# Copyright 2011-2012 GRNET S.A. All rights reserved.
-#
+#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
-#
+#
# 1. Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
-#
+#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
-#
+#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
-#
+#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
import time
import uuid as uuidlib
import logging
+import hashlib
import binascii
-from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
+from commissioning.clients.quotaholder import QuotaholderHTTP
-from pithos.lib.hashmap import HashMap
+from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
+ AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
+
+# Stripped-down version of the HashMap class found in tools.
+
+
+class HashMap(list):
+
+ def __init__(self, blocksize, blockhash):
+ super(HashMap, self).__init__()
+ self.blocksize = blocksize
+ self.blockhash = blockhash
+
+ def _hash_raw(self, v):
+ h = hashlib.new(self.blockhash)
+ h.update(v)
+ return h.digest()
+
+ def hash(self):
+ if len(self) == 0:
+ return self._hash_raw('')
+ if len(self) == 1:
+ return self.__getitem__(0)
+
+ h = list(self)
+ s = 2
+ while s < len(h):
+ s = s * 2
+ h += [('\x00' * len(h[0]))] * (s - len(h))
+ while len(h) > 1:
+ h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
+ return h[0]
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
+DEFAULT_BLOCK_UMASK = 0o022
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
-#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
+#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
+#DEFAULT_QUEUE_EXCHANGE = 'pithos'
-QUEUE_MESSAGE_KEY = '#'
-QUEUE_CLIENT_ID = 2 # Pithos.
+QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
+QUEUE_CLIENT_ID = 'pithos'
+QUEUE_INSTANCE_ID = '1'
-( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
+(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
inf = float('inf')
if not autocommit:
return func
+
def fn(self, *args, **kw):
self.wrapper.execute()
+ serials = []
+ self.serials = serials
+ self.messages = []
try:
ret = func(self, *args, **kw)
+ for m in self.messages:
+ self.queue.send(*m)
+ if serials:
+ self.quotaholder.accept_commission(
+ context = {},
+ clientkey = 'pithos',
+ serials = serials)
self.wrapper.commit()
return ret
except:
+ self.quotaholder.reject_commission(
+ context = {},
+ clientkey = 'pithos',
+ serials = serials)
self.wrapper.rollback()
raise
return fn
class ModularBackend(BaseBackend):
"""A modular backend.
-
+
Uses modules for SQL functions and storage.
"""
-
+
def __init__(self, db_module=None, db_connection=None,
- block_module=None, block_path=None,
- queue_module=None, queue_connection=None):
+ block_module=None, block_path=None, block_umask=None,
+ queue_module=None, queue_hosts=None,
+ queue_exchange=None, quotaholder_url=None):
db_module = db_module or DEFAULT_DB_MODULE
db_connection = db_connection or DEFAULT_DB_CONNECTION
block_module = block_module or DEFAULT_BLOCK_MODULE
block_path = block_path or DEFAULT_BLOCK_PATH
+ block_umask = block_umask or DEFAULT_BLOCK_UMASK
#queue_module = queue_module or DEFAULT_QUEUE_MODULE
- #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
-
+ #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
+ #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
+
self.hash_algorithm = 'sha256'
- self.block_size = 4 * 1024 * 1024 # 4MB
-
- self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
-
+ self.block_size = 4 * 1024 * 1024 # 4MB
+
+ self.default_policy = {'quota': DEFAULT_QUOTA,
+ 'versioning': DEFAULT_VERSIONING}
+
def load_module(m):
__import__(m)
return sys.modules[m]
-
+
self.db_module = load_module(db_module)
self.wrapper = self.db_module.DBWrapper(db_connection)
params = {'wrapper': self.wrapper}
self.permissions = self.db_module.Permissions(**params)
+ self.config = self.db_module.Config(**params)
+ self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
for x in ['READ', 'WRITE']:
setattr(self, x, getattr(self.db_module, x))
self.node = self.db_module.Node(**params)
for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
setattr(self, x, getattr(self.db_module, x))
-
+
self.block_module = load_module(block_module)
params = {'path': block_path,
'block_size': self.block_size,
- 'hash_algorithm': self.hash_algorithm}
+ 'hash_algorithm': self.hash_algorithm,
+ 'umask': block_umask}
self.store = self.block_module.Store(**params)
- if queue_module and queue_connection:
+ if queue_module and queue_hosts:
self.queue_module = load_module(queue_module)
- params = {'exchange': queue_connection,
- 'message_key': QUEUE_MESSAGE_KEY,
+ params = {'hosts': queue_hosts,
+ 'exchange': queue_exchange,
'client_id': QUEUE_CLIENT_ID}
self.queue = self.queue_module.Queue(**params)
else:
class NoQueue:
def send(self, *args):
pass
-
+
def close(self):
pass
-
+
self.queue = NoQueue()
-
+
+ self.quotaholder_url = quotaholder_url
+ self.quotaholder = QuotaholderHTTP(quotaholder_url)
+ self.serials = []
+ self.messages = []
+
def close(self):
self.wrapper.close()
self.queue.close()
-
+
@backend_method
def list_accounts(self, user, marker=None, limit=10000):
"""Return a list of accounts the user can access."""
-
+
logger.debug("list_accounts: %s %s %s", user, marker, limit)
allowed = self._allowed_accounts(user)
start, limit = self._list_limits(allowed, marker, limit)
return allowed[start:start + limit]
-
+
@backend_method
def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
"""Return a dictionary with the account metadata for the domain."""
-
- logger.debug("get_account_meta: %s %s %s", account, domain, until)
+
+ logger.debug(
+ "get_account_meta: %s %s %s %s", user, account, domain, until)
path, node = self._lookup_account(account, user == account)
if user != account:
if until or node is None or account not in self._allowed_accounts(user):
if until is None:
modified = tstamp
else:
- modified = self._get_statistics(node)[2] # Overall last modification.
+ modified = self._get_statistics(
+ node)[2] # Overall last modification.
modified = max(modified, mtime)
-
+
if user != account:
meta = {'name': account}
else:
meta = {}
if props is not None and include_user_defined:
- meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
+ meta.update(
+ dict(self.node.attribute_get(props[self.SERIAL], domain)))
if until is not None:
meta.update({'until_timestamp': tstamp})
meta.update({'name': account, 'count': count, 'bytes': bytes})
meta.update({'modified': modified})
return meta
-
+
@backend_method
def update_account_meta(self, user, account, domain, meta, replace=False):
"""Update the metadata associated with the account for the domain."""
-
- logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
+
+ logger.debug("update_account_meta: %s %s %s %s %s", user,
+ account, domain, meta, replace)
if user != account:
raise NotAllowedError
path, node = self._lookup_account(account, True)
self._put_metadata(user, node, domain, meta, replace)
-
+
@backend_method
def get_account_groups(self, user, account):
"""Return a dictionary with the user groups defined for this account."""
-
- logger.debug("get_account_groups: %s", account)
+
+ logger.debug("get_account_groups: %s %s", user, account)
if user != account:
if account not in self._allowed_accounts(user):
raise NotAllowedError
return {}
self._lookup_account(account, True)
return self.permissions.group_dict(account)
-
+
@backend_method
def update_account_groups(self, user, account, groups, replace=False):
"""Update the groups associated with the account."""
-
- logger.debug("update_account_groups: %s %s %s", account, groups, replace)
+
+ logger.debug("update_account_groups: %s %s %s %s", user,
+ account, groups, replace)
if user != account:
raise NotAllowedError
self._lookup_account(account, True)
if replace:
self.permissions.group_destroy(account)
for k, v in groups.iteritems():
- if not replace: # If not already deleted.
+ if not replace: # If not already deleted.
self.permissions.group_delete(account, k)
if v:
self.permissions.group_addmany(account, k, v)
-
+
@backend_method
def get_account_policy(self, user, account):
"""Return a dictionary with the account policy."""
-
- logger.debug("get_account_policy: %s", account)
+
+ logger.debug("get_account_policy: %s %s", user, account)
if user != account:
if account not in self._allowed_accounts(user):
raise NotAllowedError
return {}
path, node = self._lookup_account(account, True)
return self._get_policy(node)
-
+
@backend_method
def update_account_policy(self, user, account, policy, replace=False):
"""Update the policy associated with the account."""
-
- logger.debug("update_account_policy: %s %s %s", account, policy, replace)
+
+ logger.debug("update_account_policy: %s %s %s %s", user,
+ account, policy, replace)
if user != account:
raise NotAllowedError
path, node = self._lookup_account(account, True)
self._check_policy(policy)
self._put_policy(node, policy, replace)
-
+
@backend_method
def put_account(self, user, account, policy={}):
"""Create a new account with the given name."""
-
- logger.debug("put_account: %s %s", account, policy)
+
+ logger.debug("put_account: %s %s %s", user, account, policy)
if user != account:
raise NotAllowedError
node = self.node.node_lookup(account)
if node is not None:
- raise NameError('Account already exists')
+ raise AccountExists('Account already exists')
if policy:
self._check_policy(policy)
node = self._put_path(user, self.ROOTNODE, account)
self._put_policy(node, policy, True)
-
+
@backend_method
def delete_account(self, user, account):
"""Delete the account with the given name."""
-
- logger.debug("delete_account: %s", account)
+
+ logger.debug("delete_account: %s %s", user, account)
if user != account:
raise NotAllowedError
node = self.node.node_lookup(account)
if node is None:
return
if not self.node.node_remove(node):
- raise IndexError('Account is not empty')
+ raise AccountNotEmpty('Account is not empty')
self.permissions.group_destroy(account)
-
+
@backend_method
- def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
+ def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
"""Return a list of containers existing under an account."""
-
- logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
+
+ logger.debug("list_containers: %s %s %s %s %s %s %s", user,
+ account, marker, limit, shared, until, public)
if user != account:
if until or account not in self._allowed_accounts(user):
raise NotAllowedError
allowed = self._allowed_containers(user, account)
start, limit = self._list_limits(allowed, marker, limit)
return allowed[start:start + limit]
- if shared:
- allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
- allowed = list(set(allowed))
+ if shared or public:
+ allowed = set()
+ if shared:
+ allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
+ if public:
+ allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
+ allowed = sorted(allowed)
start, limit = self._list_limits(allowed, marker, limit)
return allowed[start:start + limit]
node = self.node.node_lookup(account)
- return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
-
+ containers = [x[0] for x in self._list_object_properties(
+ node, account, '', '/', marker, limit, False, None, [], until)]
+ start, limit = self._list_limits(
+ [x[0] for x in containers], marker, limit)
+ return containers[start:start + limit]
+
@backend_method
def list_container_meta(self, user, account, container, domain, until=None):
"""Return a list with all the container's object meta keys for the domain."""
-
- logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
+
+ logger.debug("list_container_meta: %s %s %s %s %s", user,
+ account, container, domain, until)
allowed = []
if user != account:
if until:
raise NotAllowedError
- allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
+ allowed = self.permissions.access_list_paths(
+ user, '/'.join((account, container)))
if not allowed:
raise NotAllowedError
path, node = self._lookup_container(account, container)
before = until if until is not None else inf
allowed = self._get_formatted_paths(allowed)
return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
-
+
@backend_method
def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
"""Return a dictionary with the container metadata for the domain."""
-
- logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
+
+ logger.debug("get_container_meta: %s %s %s %s %s", user,
+ account, container, domain, until)
if user != account:
if until or container not in self._allowed_containers(user, account):
raise NotAllowedError
if until is None:
modified = tstamp
else:
- modified = self._get_statistics(node)[2] # Overall last modification.
+ modified = self._get_statistics(
+ node)[2] # Overall last modification.
modified = max(modified, mtime)
-
+
if user != account:
meta = {'name': container}
else:
meta = {}
if include_user_defined:
- meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
+ meta.update(
+ dict(self.node.attribute_get(props[self.SERIAL], domain)))
if until is not None:
meta.update({'until_timestamp': tstamp})
meta.update({'name': container, 'count': count, 'bytes': bytes})
meta.update({'modified': modified})
return meta
-
+
@backend_method
def update_container_meta(self, user, account, container, domain, meta, replace=False):
"""Update the metadata associated with the container for the domain."""
-
- logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
+
+ logger.debug("update_container_meta: %s %s %s %s %s %s",
+ user, account, container, domain, meta, replace)
if user != account:
raise NotAllowedError
path, node = self._lookup_container(account, container)
- self._put_metadata(user, node, domain, meta, replace)
-
+ src_version_id, dest_version_id = self._put_metadata(
+ user, node, domain, meta, replace)
+ if src_version_id is not None:
+ versioning = self._get_policy(node)['versioning']
+ if versioning != 'auto':
+ self.node.version_remove(src_version_id)
+
@backend_method
def get_container_policy(self, user, account, container):
"""Return a dictionary with the container policy."""
-
- logger.debug("get_container_policy: %s %s", account, container)
+
+ logger.debug(
+ "get_container_policy: %s %s %s", user, account, container)
if user != account:
if container not in self._allowed_containers(user, account):
raise NotAllowedError
return {}
path, node = self._lookup_container(account, container)
return self._get_policy(node)
-
+
@backend_method
def update_container_policy(self, user, account, container, policy, replace=False):
"""Update the policy associated with the container."""
-
- logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
+
+ logger.debug("update_container_policy: %s %s %s %s %s",
+ user, account, container, policy, replace)
if user != account:
raise NotAllowedError
path, node = self._lookup_container(account, container)
self._check_policy(policy)
self._put_policy(node, policy, replace)
-
+
@backend_method
def put_container(self, user, account, container, policy={}):
"""Create a new container with the given name."""
-
- logger.debug("put_container: %s %s %s", account, container, policy)
+
+ logger.debug(
+ "put_container: %s %s %s %s", user, account, container, policy)
if user != account:
raise NotAllowedError
try:
except NameError:
pass
else:
- raise NameError('Container already exists')
+ raise ContainerExists('Container already exists')
if policy:
self._check_policy(policy)
path = '/'.join((account, container))
- node = self._put_path(user, self._lookup_account(account, True)[1], path)
+ node = self._put_path(
+ user, self._lookup_account(account, True)[1], path)
self._put_policy(node, policy, True)
-
+
@backend_method
- def delete_container(self, user, account, container, until=None):
+ def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
"""Delete/purge the container with the given name."""
-
- logger.debug("delete_container: %s %s %s", account, container, until)
+
+ logger.debug("delete_container: %s %s %s %s %s %s", user,
+ account, container, until, prefix, delimiter)
if user != account:
raise NotAllowedError
path, node = self._lookup_container(account, container)
-
+
if until is not None:
- hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
+ hashes, size, serials = self.node.node_purge_children(
+ node, until, CLUSTER_HISTORY)
for h in hashes:
self.store.map_delete(h)
self.node.node_purge_children(node, until, CLUSTER_DELETED)
- self._report_size_change(user, account, -size, {'action': 'container purge'})
+ self._report_size_change(user, account, -size,
+ {'action':'container purge', 'path': path,
+ 'versions': serials})
return
-
- if self._get_statistics(node)[0] > 0:
- raise IndexError('Container is not empty')
- hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
- for h in hashes:
- self.store.map_delete(h)
- self.node.node_purge_children(node, inf, CLUSTER_DELETED)
- self.node.node_remove(node)
- self._report_size_change(user, account, -size, {'action': 'container delete'})
-
- def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props):
+
+ if not delimiter:
+ if self._get_statistics(node)[0] > 0:
+ raise ContainerNotEmpty('Container is not empty')
+ hashes, size, serials = self.node.node_purge_children(
+ node, inf, CLUSTER_HISTORY)
+ for h in hashes:
+ self.store.map_delete(h)
+ self.node.node_purge_children(node, inf, CLUSTER_DELETED)
+ self.node.node_remove(node)
+ self._report_size_change(user, account, -size,
+ {'action': 'container delete',
+ 'path': path,
+ 'versions': serials})
+ else:
+ # remove only contents
+ src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
+ paths = []
+ for t in src_names:
+ path = '/'.join((account, container, t[0]))
+ node = t[2]
+ src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
+ del_size = self._apply_versioning(
+ account, container, src_version_id)
+ if del_size:
+ self._report_size_change(user, account, -del_size,
+ {'action': 'object delete',
+ 'path': path, 'versions': [dest_version_id]})
+ self._report_object_change(
+ user, account, path, details={'action': 'object delete'})
+ paths.append(path)
+ self.permissions.access_clear_bulk(paths)
+
+ def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
if user != account and until:
raise NotAllowedError
- allowed = self._list_object_permissions(user, account, container, prefix, shared)
+ if shared and public:
+ # get shared first
+ shared = self._list_object_permissions(
+ user, account, container, prefix, shared=True, public=False)
+ objects = set()
+ if shared:
+ path, node = self._lookup_container(account, container)
+ shared = self._get_formatted_paths(shared)
+ objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
+
+ # get public
+ objects |= set(self._list_public_object_properties(
+ user, account, container, prefix, all_props))
+ objects = list(objects)
+
+ objects.sort(key=lambda x: x[0])
+ start, limit = self._list_limits(
+ [x[0] for x in objects], marker, limit)
+ return objects[start:start + limit]
+ elif public:
+ objects = self._list_public_object_properties(
+ user, account, container, prefix, all_props)
+ start, limit = self._list_limits(
+ [x[0] for x in objects], marker, limit)
+ return objects[start:start + limit]
+
+ allowed = self._list_object_permissions(
+ user, account, container, prefix, shared, public)
if shared and not allowed:
return []
path, node = self._lookup_container(account, container)
allowed = self._get_formatted_paths(allowed)
- return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
-
- def _list_object_permissions(self, user, account, container, prefix, shared):
+ objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
+ start, limit = self._list_limits(
+ [x[0] for x in objects], marker, limit)
+ return objects[start:start + limit]
+
+ def _list_public_object_properties(self, user, account, container, prefix, all_props):
+ public = self._list_object_permissions(
+ user, account, container, prefix, shared=False, public=True)
+ paths, nodes = self._lookup_objects(public)
+ path = '/'.join((account, container))
+ cont_prefix = path + '/'
+ paths = [x[len(cont_prefix):] for x in paths]
+ props = self.node.version_lookup_bulk(nodes, all_props=all_props)
+ objects = [(path,) + props for path, props in zip(paths, props)]
+ return objects
+
+ def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
+ objects = []
+ while True:
+ marker = objects[-1] if objects else None
+ limit = 10000
+ l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
+ objects.extend(l)
+ if not l or len(l) < limit:
+ break
+ return objects
+
+ def _list_object_permissions(self, user, account, container, prefix, shared, public):
allowed = []
path = '/'.join((account, container, prefix)).rstrip('/')
if user != account:
if not allowed:
raise NotAllowedError
else:
+ allowed = set()
if shared:
- allowed = self.permissions.access_list_shared(path)
- if not allowed:
- return []
+ allowed.update(self.permissions.access_list_shared(path))
+ if public:
+ allowed.update(
+ [x[0] for x in self.permissions.public_list(path)])
+ allowed = sorted(allowed)
+ if not allowed:
+ return []
return allowed
-
+
@backend_method
- def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
+ def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
"""Return a list of object (name, version_id) tuples existing under a container."""
-
- logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
- return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False)
-
+
+ logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
+ return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
+
@backend_method
- def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
+ def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
"""Return a list of object metadata dicts existing under a container."""
-
- logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
- props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True)
+
+ logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
+ props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
objects = []
for p in props:
if len(p) == 2:
'uuid': p[self.UUID + 1],
'checksum': p[self.CHECKSUM + 1]})
return objects
-
+
@backend_method
def list_object_permissions(self, user, account, container, prefix=''):
"""Return a list of paths that enforce permissions under a container."""
-
- logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
- return self._list_object_permissions(user, account, container, prefix, True)
-
+
+ logger.debug("list_object_permissions: %s %s %s %s", user,
+ account, container, prefix)
+ return self._list_object_permissions(user, account, container, prefix, True, False)
+
@backend_method
def list_object_public(self, user, account, container, prefix=''):
"""Return a dict mapping paths to public ids for objects that are public under a container."""
-
- logger.debug("list_object_public: %s %s %s", account, container, prefix)
+
+ logger.debug("list_object_public: %s %s %s %s", user,
+ account, container, prefix)
public = {}
for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
public[path] = p + ULTIMATE_ANSWER
return public
-
+
@backend_method
def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
"""Return a dictionary with the object metadata for the domain."""
-
- logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
+
+ logger.debug("get_object_meta: %s %s %s %s %s %s", user,
+ account, container, name, domain, version)
self._can_read(user, account, container, name)
path, node = self._lookup_object(account, container, name)
props = self._get_version(node, version)
modified = props[self.MTIME]
else:
try:
- modified = self._get_version(node)[self.MTIME] # Overall last modification.
- except NameError: # Object may be deleted.
- del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
+ modified = self._get_version(
+ node)[self.MTIME] # Overall last modification.
+ except NameError: # Object may be deleted.
+ del_props = self.node.version_lookup(
+ node, inf, CLUSTER_DELETED)
if del_props is None:
- raise NameError('Object does not exist')
+ raise ItemNotExists('Object does not exist')
modified = del_props[self.MTIME]
-
+
meta = {}
if include_user_defined:
- meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
+ meta.update(
+ dict(self.node.attribute_get(props[self.SERIAL], domain)))
meta.update({'name': name,
'bytes': props[self.SIZE],
'type': props[self.TYPE],
'uuid': props[self.UUID],
'checksum': props[self.CHECKSUM]})
return meta
-
+
@backend_method
def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
"""Update the metadata associated with the object for the domain and return the new version."""
-
- logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
+
+ logger.debug("update_object_meta: %s %s %s %s %s %s %s",
+ user, account, container, name, domain, meta, replace)
self._can_write(user, account, container, name)
path, node = self._lookup_object(account, container, name)
- src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
+ src_version_id, dest_version_id = self._put_metadata(
+ user, node, domain, meta, replace)
self._apply_versioning(account, container, src_version_id)
return dest_version_id
-
+
@backend_method
def get_object_permissions(self, user, account, container, name):
"""Return the action allowed on the object, the path
from which the object gets its permissions from,
along with a dictionary containing the permissions."""
-
- logger.debug("get_object_permissions: %s %s %s", account, container, name)
+
+ logger.debug("get_object_permissions: %s %s %s %s", user,
+ account, container, name)
allowed = 'write'
permissions_path = self._get_permissions_path(account, container, name)
if user != account:
raise NotAllowedError
self._lookup_object(account, container, name)
return (allowed, permissions_path, self.permissions.access_get(permissions_path))
-
+
@backend_method
def update_object_permissions(self, user, account, container, name, permissions):
"""Update the permissions associated with the object."""
-
- logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
+
+ logger.debug("update_object_permissions: %s %s %s %s %s",
+ user, account, container, name, permissions)
if user != account:
raise NotAllowedError
path = self._lookup_object(account, container, name)[0]
self._check_permissions(path, permissions)
self.permissions.access_set(path, permissions)
-
+ self._report_sharing_change(user, account, path, {'members':
+ self.permissions.access_members(path)})
+
@backend_method
def get_object_public(self, user, account, container, name):
"""Return the public id of the object if applicable."""
-
- logger.debug("get_object_public: %s %s %s", account, container, name)
+
+ logger.debug(
+ "get_object_public: %s %s %s %s", user, account, container, name)
self._can_read(user, account, container, name)
path = self._lookup_object(account, container, name)[0]
p = self.permissions.public_get(path)
if p is not None:
p += ULTIMATE_ANSWER
return p
-
+
@backend_method
def update_object_public(self, user, account, container, name, public):
"""Update the public status of the object."""
-
- logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
+
+ logger.debug("update_object_public: %s %s %s %s %s", user,
+ account, container, name, public)
self._can_write(user, account, container, name)
path = self._lookup_object(account, container, name)[0]
if not public:
self.permissions.public_unset(path)
else:
self.permissions.public_set(path)
-
+
@backend_method
def get_object_hashmap(self, user, account, container, name, version=None):
"""Return the object's size and a list with partial hashes."""
-
- logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
+
+ logger.debug("get_object_hashmap: %s %s %s %s %s", user,
+ account, container, name, version)
self._can_read(user, account, container, name)
path, node = self._lookup_object(account, container, name)
props = self._get_version(node, version)
hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
-
- def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, permissions, src_node=None, is_copy=False):
+
+ def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
if permissions is not None and user != account:
raise NotAllowedError
self._can_write(user, account, container, name)
if permissions is not None:
path = '/'.join((account, container, name))
self._check_permissions(path, permissions)
-
+
account_path, account_node = self._lookup_account(account, True)
- container_path, container_node = self._lookup_container(account, container)
- path, node = self._put_object_node(container_path, container_node, name)
+ container_path, container_node = self._lookup_container(
+ account, container)
+ path, node = self._put_object_node(
+ container_path, container_node, name)
pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
-
+
+ # Handle meta.
+ if src_version_id is None:
+ src_version_id = pre_version_id
+ self._put_metadata_duplicate(
+ src_version_id, dest_version_id, domain, meta, replace_meta)
+
# Check quota.
del_size = self._apply_versioning(account, container, pre_version_id)
size_delta = size - del_size
(container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
# This must be executed in a transaction, so the version is never created if it fails.
raise QuotaError
- self._report_size_change(user, account, size_delta, {'action': 'object update'})
-
+ self._report_size_change(user, account, size_delta,
+ {'action': 'object update', 'path': path,
+ 'versions': [dest_version_id]})
+
if permissions is not None:
self.permissions.access_set(path, permissions)
- return pre_version_id, dest_version_id
-
+ self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
+
+ self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
+ return dest_version_id
+
@backend_method
def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
"""Create/update an object with the specified size and partial hashes."""
-
- logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
- if size == 0: # No such thing as an empty hashmap.
+
+ logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
+ account, container, name, size, type, hashmap, checksum)
+ if size == 0: # No such thing as an empty hashmap.
hashmap = [self.put_block('')]
map = HashMap(self.block_size, self.hash_algorithm)
map.extend([binascii.unhexlify(x) for x in hashmap])
ie = IndexError()
ie.data = [binascii.hexlify(x) for x in missing]
raise ie
-
+
hash = map.hash()
- pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, permissions)
- self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
+ dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
self.store.map_put(hash, map)
return dest_version_id
-
+
@backend_method
def update_object_checksum(self, user, account, container, name, version, checksum):
"""Update an object's checksum."""
-
- logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
+
+ logger.debug("update_object_checksum: %s %s %s %s %s %s",
+ user, account, container, name, version, checksum)
# Update objects with greater version and same hashmap and size (fix metadata updates).
self._can_write(user, account, container, name)
path, node = self._lookup_object(account, container, name)
versions = self.node.node_get_versions(node)
for x in versions:
if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
- self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
-
- def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
+ self.node.version_put_property(
+ x[self.SERIAL], 'checksum', checksum)
+
+ def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
+ dest_version_ids = []
self._can_read(user, src_account, src_container, src_name)
path, node = self._lookup_object(src_account, src_container, src_name)
# TODO: Will do another fetch of the properties in duplicate version...
- props = self._get_version(node, src_version) # Check to see if source exists.
+ props = self._get_version(
+ node, src_version) # Check to see if source exists.
src_version_id = props[self.SERIAL]
hash = props[self.HASH]
size = props[self.SIZE]
-
- is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
- pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, permissions, src_node=node, is_copy=is_copy)
- self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
- return dest_version_id
-
+ is_copy = not is_move and (src_account, src_container, src_name) != (
+ dest_account, dest_container, dest_name) # New uuid.
+ dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
+ if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
+ self._delete_object(user, src_account, src_container, src_name)
+
+ if delimiter:
+ prefix = src_name + \
+ delimiter if not src_name.endswith(delimiter) else src_name
+ src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
+ src_names.sort(key=lambda x: x[2]) # order by nodes
+ paths = [elem[0] for elem in src_names]
+ nodes = [elem[2] for elem in src_names]
+ # TODO: Will do another fetch of the properties in duplicate version...
+ props = self._get_versions(nodes) # Check to see if source exists.
+
+ for prop, path, node in zip(props, paths, nodes):
+ src_version_id = prop[self.SERIAL]
+ hash = prop[self.HASH]
+ vtype = prop[self.TYPE]
+ size = prop[self.SIZE]
+ dest_prefix = dest_name + delimiter if not dest_name.endswith(
+ delimiter) else dest_name
+ vdest_name = path.replace(prefix, dest_prefix, 1)
+ dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
+ if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
+ self._delete_object(user, src_account, src_container, path)
+ return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
+
@backend_method
- def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
+ def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
"""Copy an object's data and metadata."""
-
- logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
- dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
+
+ logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
+ dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
return dest_version_id
-
+
@backend_method
- def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
+ def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
"""Move an object's data and metadata."""
-
- logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
+
+ logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
if user != src_account:
raise NotAllowedError
- dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
- if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
- self._delete_object(user, src_account, src_container, src_name)
+ dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
return dest_version_id
-
- def _delete_object(self, user, account, container, name, until=None):
+
+ def _delete_object(self, user, account, container, name, until=None, delimiter=None):
if user != account:
raise NotAllowedError
-
+
if until is not None:
path = '/'.join((account, container, name))
node = self.node.node_lookup(path)
return
hashes = []
size = 0
- h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
+ serials = []
+ h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
hashes += h
size += s
- h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
+ serials += v
+ h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
hashes += h
size += s
+ serials += v
for h in hashes:
self.store.map_delete(h)
self.node.node_purge(node, until, CLUSTER_DELETED)
props = self._get_version(node)
except NameError:
self.permissions.access_clear(path)
- self._report_size_change(user, account, -size, {'action': 'object purge'})
+ self._report_size_change(user, account, -size,
+ {'action': 'object purge', 'path': path,
+ 'versions': serials})
return
-
+
path, node = self._lookup_object(account, container, name)
src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
del_size = self._apply_versioning(account, container, src_version_id)
if del_size:
- self._report_size_change(user, account, -del_size, {'action': 'object delete'})
+ self._report_size_change(user, account, -del_size,
+ {'action': 'object delete', 'path': path,
+ 'versions': [dest_version_id]})
+ self._report_object_change(
+ user, account, path, details={'action': 'object delete'})
self.permissions.access_clear(path)
-
+
+ if delimiter:
+ prefix = name + delimiter if not name.endswith(delimiter) else name
+ src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
+ paths = []
+ for t in src_names:
+ path = '/'.join((account, container, t[0]))
+ node = t[2]
+ src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
+ del_size = self._apply_versioning(
+ account, container, src_version_id)
+ if del_size:
+ self._report_size_change(user, account, -del_size,
+ {'action': 'object delete',
+ 'path': path,
+ 'versions': [dest_version_id]})
+ self._report_object_change(
+ user, account, path, details={'action': 'object delete'})
+ paths.append(path)
+ self.permissions.access_clear_bulk(paths)
+
@backend_method
- def delete_object(self, user, account, container, name, until=None):
+ def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
"""Delete/purge an object."""
-
- logger.debug("delete_object: %s %s %s %s", account, container, name, until)
- self._delete_object(user, account, container, name, until)
-
+
+ logger.debug("delete_object: %s %s %s %s %s %s %s", user,
+ account, container, name, until, prefix, delimiter)
+ self._delete_object(user, account, container, name, until, delimiter)
+
@backend_method
def list_versions(self, user, account, container, name):
"""Return a list of all (version, version_timestamp) tuples for an object."""
-
- logger.debug("list_versions: %s %s %s", account, container, name)
+
+ logger.debug(
+ "list_versions: %s %s %s %s", user, account, container, name)
self._can_read(user, account, container, name)
path, node = self._lookup_object(account, container, name)
versions = self.node.node_get_versions(node)
return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
-
+
@backend_method
def get_uuid(self, user, uuid):
"""Return the (account, container, name) for the UUID given."""
-
- logger.debug("get_uuid: %s", uuid)
+
+ logger.debug("get_uuid: %s %s", user, uuid)
info = self.node.latest_uuid(uuid)
if info is None:
raise NameError
account, container, name = path.split('/', 2)
self._can_read(user, account, container, name)
return (account, container, name)
-
+
@backend_method
def get_public(self, user, public):
"""Return the (account, container, name) for the public id given."""
-
- logger.debug("get_public: %s", public)
+
+ logger.debug("get_public: %s %s", user, public)
if public is None or public < ULTIMATE_ANSWER:
raise NameError
path = self.permissions.public_path(public - ULTIMATE_ANSWER)
account, container, name = path.split('/', 2)
self._can_read(user, account, container, name)
return (account, container, name)
-
+
@backend_method(autocommit=0)
def get_block(self, hash):
"""Return a block's data."""
-
+
logger.debug("get_block: %s", hash)
block = self.store.block_get(binascii.unhexlify(hash))
if not block:
- raise NameError('Block does not exist')
+ raise ItemNotExists('Block does not exist')
return block
-
+
@backend_method(autocommit=0)
def put_block(self, data):
"""Store a block and return the hash."""
-
+
logger.debug("put_block: %s", len(data))
return binascii.hexlify(self.store.block_put(data))
-
+
@backend_method(autocommit=0)
def update_block(self, hash, data, offset=0):
"""Update a known block and return the hash."""
-
+
logger.debug("update_block: %s %s %s", hash, len(data), offset)
if offset == 0 and len(data) == self.block_size:
return self.put_block(data)
h = self.store.block_update(binascii.unhexlify(hash), offset, data)
return binascii.hexlify(h)
-
+
# Path functions.
-
+
def _generate_uuid(self):
return str(uuidlib.uuid4())
-
+
def _put_object_node(self, path, parent, name):
path = '/'.join((path, name))
node = self.node.node_lookup(path)
if node is None:
node = self.node.node_create(parent, path)
return path, node
-
+
def _put_path(self, user, parent, path):
node = self.node.node_create(parent, path)
- self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
+ self.node.version_create(node, None, 0, '', None, user,
+ self._generate_uuid(), '', CLUSTER_NORMAL)
return node
-
+
def _lookup_account(self, account, create=True):
node = self.node.node_lookup(account)
if node is None and create:
- node = self._put_path(account, self.ROOTNODE, account) # User is account.
+ node = self._put_path(
+ account, self.ROOTNODE, account) # User is account.
return account, node
-
+
def _lookup_container(self, account, container):
path = '/'.join((account, container))
node = self.node.node_lookup(path)
if node is None:
- raise NameError('Container does not exist')
+ raise ItemNotExists('Container does not exist')
return path, node
-
+
def _lookup_object(self, account, container, name):
path = '/'.join((account, container, name))
node = self.node.node_lookup(path)
if node is None:
- raise NameError('Object does not exist')
+ raise ItemNotExists('Object does not exist')
return path, node
-
+
+ def _lookup_objects(self, paths):
+ nodes = self.node.node_lookup_bulk(paths)
+ return paths, nodes
+
def _get_properties(self, node, until=None):
"""Return properties until the timestamp given."""
-
+
before = until if until is not None else inf
props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
if props is None and until is not None:
props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
if props is None:
- raise NameError('Path does not exist')
+ raise ItemNotExists('Path does not exist')
return props
-
+
def _get_statistics(self, node, until=None):
"""Return count, sum of size and latest timestamp of everything under node."""
-
+
if until is None:
stats = self.node.statistics_get(node, CLUSTER_NORMAL)
else:
if stats is None:
stats = (0, 0, 0)
return stats
-
+
def _get_version(self, node, version=None):
if version is None:
props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
if props is None:
- raise NameError('Object does not exist')
+ raise ItemNotExists('Object does not exist')
else:
try:
version = int(version)
except ValueError:
- raise IndexError('Version does not exist')
+ raise VersionNotExists('Version does not exist')
props = self.node.version_get_properties(version)
if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
- raise IndexError('Version does not exist')
+ raise VersionNotExists('Version does not exist')
return props
-
+
+ def _get_versions(self, nodes):
+ return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
+
def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
"""Create a new version of the node."""
-
- props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
+
+ props = self.node.version_lookup(
+ node if src_node is None else src_node, inf, CLUSTER_NORMAL)
if props is not None:
src_version_id = props[self.SERIAL]
src_hash = props[self.HASH]
src_size = 0
src_type = ''
src_checksum = ''
- if size is None: # Set metadata.
- hash = src_hash # This way hash can be set to None (account or container).
+ if size is None: # Set metadata.
+ hash = src_hash # This way hash can be set to None (account or container).
size = src_size
if type is None:
type = src_type
if checksum is None:
checksum = src_checksum
- uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
-
+ uuid = self._generate_uuid(
+ ) if (is_copy or src_version_id is None) else props[self.UUID]
+
if src_node is None:
pre_version_id = src_version_id
else:
pre_version_id = props[self.SERIAL]
if pre_version_id is not None:
self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
-
+
dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
return pre_version_id, dest_version_id
-
+
def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
if src_version_id is not None:
self.node.attribute_copy(src_version_id, dest_version_id)
if not replace:
- self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
- self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
+ self.node.attribute_del(dest_version_id, domain, (
+ k for k, v in meta.iteritems() if v == ''))
+ self.node.attribute_set(dest_version_id, domain, (
+ (k, v) for k, v in meta.iteritems() if v != ''))
else:
self.node.attribute_del(dest_version_id, domain)
- self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
-
+ self.node.attribute_set(dest_version_id, domain, ((
+ k, v) for k, v in meta.iteritems()))
+
def _put_metadata(self, user, node, domain, meta, replace=False):
"""Create a new version and store metadata."""
-
- src_version_id, dest_version_id = self._put_version_duplicate(user, node)
- self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
+
+ src_version_id, dest_version_id = self._put_version_duplicate(
+ user, node)
+ self._put_metadata_duplicate(
+ src_version_id, dest_version_id, domain, meta, replace)
return src_version_id, dest_version_id
-
+
def _list_limits(self, listing, marker, limit):
start = 0
if marker:
if not limit or limit > 10000:
limit = 10000
return start, limit
-
+
def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
cont_prefix = path + '/'
prefix = cont_prefix + prefix
before = until if until is not None else inf
filterq = keys if domain else []
sizeq = size_range
-
+
objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
objects.extend([(p, None) for p in prefixes] if virtual else [])
objects.sort(key=lambda x: x[0])
objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
-
- start, limit = self._list_limits([x[0] for x in objects], marker, limit)
- return objects[start:start + limit]
-
+ return objects
+
# Reporting functions.
-
+
def _report_size_change(self, user, account, size, details={}):
- logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
account_node = self._lookup_account(account, True)[1]
total = self._get_statistics(account_node)[1]
details.update({'user': user, 'total': total})
- self.queue.send(account, 'diskspace', size, details)
-
+ logger.debug(
+ "_report_size_change: %s %s %s %s", user, account, size, details)
+ self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
+ account, QUEUE_INSTANCE_ID, 'diskspace',
+ float(size), details))
+
+ serial = self.quotaholder.issue_commission(
+ context = {},
+ target = account,
+ key = '1',
+ clientkey = 'pithos',
+ ownerkey = '',
+ name = details['path'] if 'path' in details else '',
+ provisions = (('pithos+', 'pithos+.diskspace', size),)
+ )
+ self.serials.append(serial)
+
+ def _report_object_change(self, user, account, path, details={}):
+ details.update({'user': user})
+ logger.debug("_report_object_change: %s %s %s %s", user,
+ account, path, details)
+ self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
+ account, QUEUE_INSTANCE_ID, 'object', path, details))
+
+ def _report_sharing_change(self, user, account, path, details={}):
+ logger.debug("_report_permissions_change: %s %s %s %s",
+ user, account, path, details)
+ details.update({'user': user})
+ self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
+ account, QUEUE_INSTANCE_ID, 'sharing', path, details))
+
# Policy functions.
-
+
def _check_policy(self, policy):
for k in policy.keys():
if policy[k] == '':
policy[k] = self.default_policy.get(k)
for k, v in policy.iteritems():
if k == 'quota':
- q = int(v) # May raise ValueError.
+ q = int(v) # May raise ValueError.
if q < 0:
raise ValueError
elif k == 'versioning':
raise ValueError
else:
raise ValueError
-
+
def _put_policy(self, node, policy, replace):
if replace:
for k, v in self.default_policy.iteritems():
if k not in policy:
policy[k] = v
self.node.policy_set(node, policy)
-
+
def _get_policy(self, node):
policy = self.default_policy.copy()
policy.update(self.node.policy_get(node))
return policy
-
+
def _apply_versioning(self, account, container, version_id):
"""Delete the provided version if such is the policy.
Return size of object removed.
"""
-
+
if version_id is None:
return 0
path, node = self._lookup_container(account, container)
self.store.map_delete(hash)
return size
return 0
-
+
# Access control functions.
-
+
def _check_groups(self, groups):
# raise ValueError('Bad characters in groups')
pass
-
+
def _check_permissions(self, path, permissions):
# raise ValueError('Bad characters in permissions')
pass
-
+
def _get_formatted_paths(self, paths):
formatted = []
for p in paths:
if node is not None:
props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
if props is not None:
- if props[self.TYPE] in ('application/directory', 'application/folder'):
+ if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
formatted.append((p, self.MATCH_EXACT))
return formatted
-
+
def _get_permissions_path(self, account, container, name):
path = '/'.join((account, container, name))
permission_paths = self.permissions.access_inherit(path)
if node is not None:
props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
if props is not None:
- if props[self.TYPE] in ('application/directory', 'application/folder'):
+ if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
return p
return None
-
+
def _can_read(self, user, account, container, name):
if user == account:
return True
raise NotAllowedError
if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
raise NotAllowedError
-
+
def _can_write(self, user, account, container, name):
if user == account:
return True
raise NotAllowedError
if not self.permissions.access_check(path, self.WRITE, user):
raise NotAllowedError
-
+
def _allowed_accounts(self, user):
allow = set()
for path in self.permissions.access_list_paths(user):
allow.add(path.split('/', 1)[0])
return sorted(allow)
-
+
def _allowed_containers(self, user, account):
allow = set()
for path in self.permissions.access_list_paths(user, account):