-# Copyright 2011 GRNET S.A. All rights reserved.
+# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
import sys
import os
import time
-import sqlite3
+import uuid as uuidlib
import logging
-import hashlib
import binascii
-from base import NotAllowedError, BaseBackend
-from lib.hashfiler import Mapper, Blocker
+from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
+
+from pithos.lib.hashmap import HashMap
+
+# Default modules and settings.
+DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
+DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
+DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
+DEFAULT_BLOCK_PATH = 'data/'
+#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
+#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
+
+QUEUE_MESSAGE_KEY = '#'
+QUEUE_CLIENT_ID = 2 # Pithos.
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
inf = float('inf')
+ULTIMATE_ANSWER = 42
+
logger = logging.getLogger(__name__)
+
def backend_method(func=None, autocommit=1):
if func is None:
def fn(func):
class ModularBackend(BaseBackend):
"""A modular backend.
- Uses modules for SQL functions and hashfiler for storage.
+ Uses modules for SQL functions and storage.
"""
-    def __init__(self, mod, path, db):
+    def __init__(self, db_module=None, db_connection=None,
+                 block_module=None, block_path=None,
+                 queue_module=None, queue_connection=None):
+        # Fall back to the module-level defaults for anything left unset.
+        db_module = db_module or DEFAULT_DB_MODULE
+        db_connection = db_connection or DEFAULT_DB_CONNECTION
+        block_module = block_module or DEFAULT_BLOCK_MODULE
+        block_path = block_path or DEFAULT_BLOCK_PATH
+        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
+        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
+
        self.hash_algorithm = 'sha256'
        self.block_size = 4 * 1024 * 1024 # 4MB
-        self.default_policy = {'quota': 0, 'versioning': 'auto'}
-
-        if path and not os.path.exists(path):
-            os.makedirs(path)
-        if not os.path.isdir(path):
-            raise RuntimeError("Cannot open path '%s'" % (path,))
-
-        __import__(mod)
-        self.mod = sys.modules[mod]
-        self.wrapper = self.mod.dbwrapper.DBWrapper(db)
-
-        params = {'blocksize': self.block_size,
-                  'blockpath': os.path.join(path + '/blocks'),
-                  'hashtype': self.hash_algorithm}
-        self.blocker = Blocker(**params)
+        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
-        params = {'mappath': os.path.join(path + '/maps'),
-                  'namelen': self.blocker.hashlen}
-        self.mapper = Mapper(**params)
+        # Import a module given its dotted path and return the module object.
+        def load_module(m):
+            __import__(m)
+            return sys.modules[m]
-        params = {'connection': self.wrapper.conn,
-                  'cursor': self.wrapper.conn.cursor()}
-        self.permissions = self.mod.permissions.Permissions(**params)
+        self.db_module = load_module(db_module)
+        self.wrapper = self.db_module.DBWrapper(db_connection)
+        params = {'wrapper': self.wrapper}
+        self.permissions = self.db_module.Permissions(**params)
        for x in ['READ', 'WRITE']:
-            setattr(self, x, getattr(self.mod.permissions, x))
-        self.policy = self.mod.policy.Policy(**params)
-        self.node = self.mod.node.Node(**params)
-        for x in ['ROOTNODE', 'SERIAL', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
-            setattr(self, x, getattr(self.mod.node, x))
+            setattr(self, x, getattr(self.db_module, x))
+        self.node = self.db_module.Node(**params)
+        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
+            setattr(self, x, getattr(self.db_module, x))
+
+        self.block_module = load_module(block_module)
+        params = {'path': block_path,
+                  'block_size': self.block_size,
+                  'hash_algorithm': self.hash_algorithm}
+        self.store = self.block_module.Store(**params)
+
+        # The queue is optional; without one, messages are silently dropped.
+        if queue_module and queue_connection:
+            self.queue_module = load_module(queue_module)
+            params = {'exchange': queue_connection,
+                      'message_key': QUEUE_MESSAGE_KEY,
+                      'client_id': QUEUE_CLIENT_ID}
+            self.queue = self.queue_module.Queue(**params)
+        else:
+            # No-op stand-in implementing the only queue method used (send).
+            class NoQueue:
+                def send(self, *args):
+                    pass
+
+            self.queue = NoQueue()
+
+    def close(self):
+        # Release the underlying database connection.
+        self.wrapper.close()
@backend_method
def list_accounts(self, user, marker=None, limit=10000):
return allowed[start:start + limit]
@backend_method
- def get_account_meta(self, user, account, until=None):
- """Return a dictionary with the account metadata."""
+ def get_account_meta(self, user, account, domain, until=None):
+ """Return a dictionary with the account metadata for the domain."""
- logger.debug("get_account_meta: %s %s", account, until)
+ logger.debug("get_account_meta: %s %s %s", account, domain, until)
path, node = self._lookup_account(account, user == account)
if user != account:
if until or node is None or account not in self._allowed_accounts(user):
else:
meta = {}
if props is not None:
- meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
+ meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
if until is not None:
meta.update({'until_timestamp': tstamp})
meta.update({'name': account, 'count': count, 'bytes': bytes})
return meta
@backend_method
- def update_account_meta(self, user, account, meta, replace=False):
- """Update the metadata associated with the account."""
+ def update_account_meta(self, user, account, domain, meta, replace=False):
+ """Update the metadata associated with the account for the domain."""
- logger.debug("update_account_meta: %s %s %s", account, meta, replace)
+ logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
if user != account:
raise NotAllowedError
path, node = self._lookup_account(account, True)
- self._put_metadata(user, node, meta, replace, False)
+ self._put_metadata(user, node, domain, meta, replace)
@backend_method
def get_account_groups(self, user, account):
self.permissions.group_addmany(account, k, v)
@backend_method
- def put_account(self, user, account):
+    def get_account_policy(self, user, account):
+        """Return a dictionary with the account policy."""
+
+        logger.debug("get_account_policy: %s", account)
+        if user != account:
+            # Non-owners who have been granted access see an empty policy.
+            if account not in self._allowed_accounts(user):
+                raise NotAllowedError
+            return {}
+        path, node = self._lookup_account(account, True)
+        return self._get_policy(node)
+
+    @backend_method
+    def update_account_policy(self, user, account, policy, replace=False):
+        """Update the policy associated with the account."""
+
+        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
+        if user != account:
+            raise NotAllowedError
+        path, node = self._lookup_account(account, True)
+        # Validate the given policy values before storing them.
+        self._check_policy(policy)
+        self._put_policy(node, policy, replace)
+
+    @backend_method
+    def put_account(self, user, account, policy={}):
        """Create a new account with the given name."""
-        logger.debug("put_account: %s", account)
+        logger.debug("put_account: %s %s", account, policy)
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is not None:
            raise NameError('Account already exists')
-        self._put_path(user, self.ROOTNODE, account)
+        # Validate any explicit policy before creating the account node.
+        if policy:
+            self._check_policy(policy)
+        node = self._put_path(user, self.ROOTNODE, account)
+        self._put_policy(node, policy, True)
@backend_method
def delete_account(self, user, account):
return allowed[start:start + limit]
if shared:
allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
+ allowed = list(set(allowed))
start, limit = self._list_limits(allowed, marker, limit)
return allowed[start:start + limit]
node = self.node.node_lookup(account)
- return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
+ return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
@backend_method
- def get_container_meta(self, user, account, container, until=None):
- """Return a dictionary with the container metadata."""
+ def get_container_meta(self, user, account, container, domain, until=None):
+ """Return a dictionary with the container metadata for the domain."""
- logger.debug("get_container_meta: %s %s %s", account, container, until)
+ logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
if user != account:
if until or container not in self._allowed_containers(user, account):
raise NotAllowedError
if user != account:
meta = {'name': container}
else:
- meta = dict(self.node.attribute_get(props[self.SERIAL]))
+ meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
if until is not None:
meta.update({'until_timestamp': tstamp})
meta.update({'name': container, 'count': count, 'bytes': bytes})
return meta
@backend_method
- def update_container_meta(self, user, account, container, meta, replace=False):
- """Update the metadata associated with the container."""
+ def update_container_meta(self, user, account, container, domain, meta, replace=False):
+ """Update the metadata associated with the container for the domain."""
- logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
+ logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
if user != account:
raise NotAllowedError
path, node = self._lookup_container(account, container)
- self._put_metadata(user, node, meta, replace, False)
+ self._put_metadata(user, node, domain, meta, replace)
@backend_method
def get_container_policy(self, user, account, container):
if container not in self._allowed_containers(user, account):
raise NotAllowedError
return {}
- path = self._lookup_container(account, container)[0]
- return self.policy.policy_get(path)
+ path, node = self._lookup_container(account, container)
+ return self._get_policy(node)
@backend_method
def update_container_policy(self, user, account, container, policy, replace=False):
- """Update the policy associated with the account."""
+ """Update the policy associated with the container."""
logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
if user != account:
raise NotAllowedError
- path = self._lookup_container(account, container)[0]
+ path, node = self._lookup_container(account, container)
self._check_policy(policy)
- if replace:
- for k, v in self.default_policy.iteritems():
- if k not in policy:
- policy[k] = v
- self.policy.policy_set(path, policy)
+ self._put_policy(node, policy, replace)
@backend_method
- def put_container(self, user, account, container, policy=None):
+ def put_container(self, user, account, container, policy={}):
"""Create a new container with the given name."""
logger.debug("put_container: %s %s %s", account, container, policy)
if policy:
self._check_policy(policy)
path = '/'.join((account, container))
- self._put_path(user, self._lookup_account(account, True)[1], path)
- for k, v in self.default_policy.iteritems():
- if k not in policy:
- policy[k] = v
- self.policy.policy_set(path, policy)
+ node = self._put_path(user, self._lookup_account(account, True)[1], path)
+ self._put_policy(node, policy, True)
@backend_method
def delete_container(self, user, account, container, until=None):
path, node = self._lookup_container(account, container)
if until is not None:
- versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
- for v in versions:
- self.mapper.map_remv(v)
+ hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
+ for h in hashes:
+ self.store.map_delete(h)
self.node.node_purge_children(node, until, CLUSTER_DELETED)
+ self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
return
if self._get_statistics(node)[0] > 0:
raise IndexError('Container is not empty')
- versions = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
- for v in versions:
- self.mapper.map_remv(v)
+ hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
+ for h in hashes:
+ self.store.map_delete(h)
self.node.node_purge_children(node, inf, CLUSTER_DELETED)
self.node.node_remove(node)
- self.policy.policy_unset(path)
+ self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
+
+ # XXX: Up to here...
@backend_method
- def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
+ def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
"""Return a list of objects existing under a container."""
- logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
+ logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
allowed = []
if user != account:
if until:
if not allowed:
return []
path, node = self._lookup_container(account, container)
- return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
+ return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed)
@backend_method
- def list_object_meta(self, user, account, container, until=None):
- """Return a list with all the container's object meta keys."""
+ def list_object_meta(self, user, account, container, domain, until=None):
+ """Return a list with all the container's object meta keys for the domain."""
- logger.debug("list_object_meta: %s %s %s", account, container, until)
+ logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
allowed = []
if user != account:
if until:
raise NotAllowedError
path, node = self._lookup_container(account, container)
before = until if until is not None else inf
- return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
+ return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
@backend_method
- def get_object_meta(self, user, account, container, name, version=None):
- """Return a dictionary with the object metadata."""
+ def get_object_meta(self, user, account, container, name, domain, version=None):
+ """Return a dictionary with the object metadata for the domain."""
- logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
+ logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
self._can_read(user, account, container, name)
path, node = self._lookup_object(account, container, name)
props = self._get_version(node, version)
if version is None:
modified = props[self.MTIME]
else:
- modified = self._get_version(node)[self.MTIME] # Overall last modification.
-
- meta = dict(self.node.attribute_get(props[self.SERIAL]))
- meta.update({'name': name, 'bytes': props[self.SIZE]})
+ try:
+ modified = self._get_version(node)[self.MTIME] # Overall last modification.
+ except NameError: # Object may be deleted.
+ del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
+ if del_props is None:
+ raise NameError('Object does not exist')
+ modified = del_props[self.MTIME]
+
+ meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
+ meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
- meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
+ meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
return meta
@backend_method
- def update_object_meta(self, user, account, container, name, meta, replace=False):
- """Update the metadata associated with the object."""
+ def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
+ """Update the metadata associated with the object for the domain and return the new version."""
- logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
+ logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
self._can_write(user, account, container, name)
path, node = self._lookup_object(account, container, name)
- return self._put_metadata(user, node, meta, replace)
+ src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
+ self._apply_versioning(account, container, src_version_id)
+ return dest_version_id
 @backend_method
 def get_object_permissions(self, user, account, container, name):
-        """Return the path from which this object gets its permissions from,\
+        """Return the action allowed on the object, the path
+        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
-        self._can_read(user, account, container, name)
+        # The owner always has 'write'; others get the strongest access
+        # they hold, or NotAllowedError if they hold none.
+        allowed = 'write'
+        if user != account:
+            path = '/'.join((account, container, name))
+            if self.permissions.access_check(path, self.WRITE, user):
+                allowed = 'write'
+            elif self.permissions.access_check(path, self.READ, user):
+                allowed = 'read'
+            else:
+                raise NotAllowedError
        path = self._lookup_object(account, container, name)[0]
-        return self.permissions.access_inherit(path)
+        return (allowed,) + self.permissions.access_inherit(path)
@backend_method
def update_object_permissions(self, user, account, container, name, permissions):
 @backend_method
 def get_object_public(self, user, account, container, name):
-        """Return the public URL of the object if applicable."""
+        """Return the public id of the object if applicable."""
        logger.debug("get_object_public: %s %s %s", account, container, name)
        self._can_read(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
-        if self.permissions.public_check(path):
-            return '/public/' + path
-        return None
+        p = self.permissions.public_get(path)
+        if p is not None:
+            # Expose the id offset by ULTIMATE_ANSWER; get_public reverses this.
+            p += ULTIMATE_ANSWER
+        return p
@backend_method
def update_object_public(self, user, account, container, name, public):
self._can_read(user, account, container, name)
path, node = self._lookup_object(account, container, name)
props = self._get_version(node, version)
- hashmap = self.mapper.map_retr(props[self.SERIAL])
+ hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
+    def _update_object_hash(self, user, account, container, name, size, hash, permissions, src_node=None, is_copy=False):
+        # Create a new version of the object pointing at the given hash/size,
+        # enforcing write access, permission validity and quota.
+        # Returns (pre_version_id, dest_version_id).
+        if permissions is not None and user != account:
+            raise NotAllowedError
+        self._can_write(user, account, container, name)
+        if permissions is not None:
+            path = '/'.join((account, container, name))
+            self._check_permissions(path, permissions)
+
+        account_path, account_node = self._lookup_account(account, True)
+        container_path, container_node = self._lookup_container(account, container)
+        path, node = self._put_object_node(container_path, container_node, name)
+        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
+
+        # Check quota.
+        versioning = self._get_policy(container_node)['versioning']
+        if versioning != 'auto':
+            size_delta = size - 0 # TODO: Get previous size.
+        else:
+            size_delta = size
+        if size_delta > 0:
+            # A quota of 0 means unlimited; only positive quotas are enforced.
+            account_quota = long(self._get_policy(account_node)['quota'])
+            container_quota = long(self._get_policy(container_node)['quota'])
+            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
+               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
+                # This must be executed in a transaction, so the version is never created if it fails.
+                raise QuotaError
+
+        if permissions is not None:
+            self.permissions.access_set(path, permissions)
+        self._apply_versioning(account, container, pre_version_id)
+        return pre_version_id, dest_version_id
+
+
 @backend_method
-    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
+    def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
        """Create/update an object with the specified size and partial hashes."""
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
-        if permissions is not None and user != account:
-            raise NotAllowedError
-        self._can_write(user, account, container, name)
-        missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
+        if size == 0: # No such thing as an empty hashmap.
+            hashmap = [self.put_block('')]
+        map = HashMap(self.block_size, self.hash_algorithm)
+        map.extend([binascii.unhexlify(x) for x in hashmap])
+        # Blocks not yet in the store must be uploaded by the client first.
+        missing = self.store.block_search(map)
        if missing:
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie
-        path = '/'.join((account, container, name))
-        if permissions is not None:
-            self._check_permissions(path, permissions)
-        path, node = self._put_object_node(account, container, name)
-        src_version_id, dest_version_id = self._copy_version(user, node, None, node, size)
-        self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
-        if not replace_meta and src_version_id is not None:
-            self.node.attribute_copy(src_version_id, dest_version_id)
-        self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
-        if permissions is not None:
-            self.permissions.access_set(path, permissions)
+
+        # The object's hash is the hash of its hashmap.
+        hash = map.hash()
+        pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
+        self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
+        self.store.map_put(hash, map)
+        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
+        return dest_version_id
+
+    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
+        # Copy the source object's data and metadata onto the destination
+        # and return the destination's new version id.
+        self._can_read(user, src_account, src_container, src_name)
+        path, node = self._lookup_object(src_account, src_container, src_name)
+        # TODO: Will do another fetch of the properties in duplicate version...
+        props = self._get_version(node, src_version) # Check to see if source exists.
+        src_version_id = props[self.SERIAL]
+        hash = props[self.HASH]
+        size = props[self.SIZE]
+
+        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
+        pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions, src_node=node, is_copy=is_copy)
+        self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
+        return dest_version_id
@backend_method
- def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
+ def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
"""Copy an object's data and metadata."""
- logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
- if permissions is not None and user != account:
- raise NotAllowedError
- self._can_read(user, account, src_container, src_name)
- self._can_write(user, account, dest_container, dest_name)
- src_path, src_node = self._lookup_object(account, src_container, src_name)
- if permissions is not None:
- self._check_permissions(dest_path, permissions)
- dest_path, dest_node = self._put_object_node(account, dest_container, dest_name)
- src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node)
- if src_version_id is not None:
- self._copy_data(src_version_id, dest_version_id)
- if not replace_meta and src_version_id is not None:
- self.node.attribute_copy(src_version_id, dest_version_id)
- self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
- if permissions is not None:
- dest_path = '/'.join((account, container, name))
- self.permissions.access_set(dest_path, permissions)
+ logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
+ dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
+ self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
return dest_version_id
@backend_method
- def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
+ def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
"""Move an object's data and metadata."""
- logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
- dest_version_id = self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
- self.delete_object(user, account, src_container, src_name)
+ logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
+ if user != src_account:
+ raise NotAllowedError
+ dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
+ if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
+ self._delete_object(user, src_account, src_container, src_name)
+ self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
return dest_version_id
- @backend_method
- def delete_object(self, user, account, container, name, until=None):
- """Delete/purge an object."""
-
- logger.debug("delete_object: %s %s %s %s", account, container, name, until)
+ def _delete_object(self, user, account, container, name, until=None):
if user != account:
raise NotAllowedError
node = self.node.node_lookup(path)
if node is None:
return
- versions = self.node.node_purge(node, until, CLUSTER_NORMAL)
- versions += self.node.node_purge(node, until, CLUSTER_HISTORY)
- for v in versions:
- self.mapper.map_remv(v)
- self.node.node_purge_children(node, until, CLUSTER_DELETED)
+ hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
+ hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
+ for h in hashes:
+ self.store.map_delete(h)
+ self.node.node_purge(node, until, CLUSTER_DELETED)
try:
props = self._get_version(node)
except NameError:
- pass
- else:
self.permissions.access_clear(path)
+ self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
return
path, node = self._lookup_object(account, container, name)
- self._copy_version(user, node, None, node, 0, CLUSTER_DELETED)
+ src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
+ self._apply_versioning(account, container, src_version_id)
self.permissions.access_clear(path)
@backend_method
+ def delete_object(self, user, account, container, name, until=None):
+ """Delete/purge an object."""
+
+ logger.debug("delete_object: %s %s %s %s", account, container, name, until)
+ self._delete_object(user, account, container, name, until)
+
+ @backend_method
def list_versions(self, user, account, container, name):
"""Return a list of all (version, version_timestamp) tuples for an object."""
logger.debug("list_versions: %s %s %s", account, container, name)
self._can_read(user, account, container, name)
path, node = self._lookup_object(account, container, name)
- return self.node.node_get_versions(node, ['serial', 'mtime'])
+ versions = self.node.node_get_versions(node)
+ return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
+
+    @backend_method
+    def get_uuid(self, user, uuid):
+        """Return the (account, container, name) for the UUID given."""
+
+        logger.debug("get_uuid: %s", uuid)
+        # latest_uuid resolves the UUID to its latest (path, serial) pair.
+        info = self.node.latest_uuid(uuid)
+        if info is None:
+            raise NameError
+        path, serial = info
+        account, container, name = path.split('/', 2)
+        self._can_read(user, account, container, name)
+        return (account, container, name)
+
+    @backend_method
+    def get_public(self, user, public):
+        """Return the (account, container, name) for the public id given."""
+
+        logger.debug("get_public: %s", public)
+        # Public ids are handed out offset by ULTIMATE_ANSWER
+        # (see get_object_public); undo the offset before the lookup.
+        if public is None or public < ULTIMATE_ANSWER:
+            raise NameError
+        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
+        if path is None:
+            raise NameError
+        account, container, name = path.split('/', 2)
+        self._can_read(user, account, container, name)
+        return (account, container, name)
 @backend_method(autocommit=0)
 def get_block(self, hash):
        """Return a block's data."""
        logger.debug("get_block: %s", hash)
-        blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
-        if not blocks:
+        # Hashes cross the API hex-encoded; the store expects raw bytes.
+        block = self.store.block_get(binascii.unhexlify(hash))
+        if not block:
            raise NameError('Block does not exist')
-        return blocks[0]
+        return block
@backend_method(autocommit=0)
def put_block(self, data):
"""Store a block and return the hash."""
logger.debug("put_block: %s", len(data))
- hashes, absent = self.blocker.block_stor((data,))
- return binascii.hexlify(hashes[0])
+ return binascii.hexlify(self.store.block_put(data))
@backend_method(autocommit=0)
def update_block(self, hash, data, offset=0):
logger.debug("update_block: %s %s %s", hash, len(data), offset)
if offset == 0 and len(data) == self.block_size:
return self.put_block(data)
- h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
+ h = self.store.block_update(binascii.unhexlify(hash), offset, data)
return binascii.hexlify(h)
# Path functions.
- def _put_object_node(self, account, container, name):
- path, parent = self._lookup_container(account, container)
+    def _generate_uuid(self):
+        # Random (version 4) UUID, rendered as a string.
+        return str(uuidlib.uuid4())
+
+ def _put_object_node(self, path, parent, name):
path = '/'.join((path, name))
node = self.node.node_lookup(path)
if node is None:
def _put_path(self, user, parent, path):
node = self.node.node_create(parent, path)
- self.node.version_create(node, 0, None, user, CLUSTER_NORMAL)
+ self.node.version_create(node, None, 0, None, user, self._generate_uuid(), CLUSTER_NORMAL)
return node
def _lookup_account(self, account, create=True):
if props is None:
raise NameError('Object does not exist')
else:
+ try:
+ version = int(version)
+ except ValueError:
+ raise IndexError('Version does not exist')
props = self.node.version_get_properties(version)
if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
raise IndexError('Version does not exist')
return props
- def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL):
+ def _put_version_duplicate(self, user, node, src_node=None, size=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
+ """Create a new version of the node."""
- # Get source serial and size.
- if src_version is not None:
- src_props = self._get_version(src_node, src_version)
- src_version_id = src_props[self.SERIAL]
- size = src_props[self.SIZE]
+ props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
+ if props is not None:
+ src_version_id = props[self.SERIAL]
+ src_hash = props[self.HASH]
+ src_size = props[self.SIZE]
else:
- # Latest or create from scratch.
- try:
- src_props = self._get_version(src_node)
- src_version_id = src_props[self.SERIAL]
- size = src_props[self.SIZE]
- except NameError:
- src_version_id = None
- size = 0
- if dest_size is not None:
- size = dest_size
-
- # Move the latest version at destination to CLUSTER_HISTORY and create new.
- if src_node == dest_node and src_version is None and src_version_id is not None:
- self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
+ src_version_id = None
+ src_hash = None
+ src_size = 0
+ if size is None:
+ hash = src_hash # This way hash can be set to None.
+ size = src_size
+ uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
+
+ if src_node is None:
+ pre_version_id = src_version_id
else:
- dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
- if dest_props is not None:
- self.node.version_recluster(dest_props[self.SERIAL], CLUSTER_HISTORY)
- dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster)
+ pre_version_id = None
+ props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
+ if props is not None:
+ pre_version_id = props[self.SERIAL]
+ if pre_version_id is not None:
+ self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
- return src_version_id, dest_version_id
-
- def _copy_data(self, src_version, dest_version):
- hashmap = self.mapper.map_retr(src_version)
- self.mapper.map_stor(dest_version, hashmap)
+ dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, uuid, cluster)
+ return pre_version_id, dest_version_id
- def _get_metadata(self, version):
- if version is None:
- return {}
- return dict(self.node.attribute_get(version))
+ def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
+ if src_version_id is not None:
+ self.node.attribute_copy(src_version_id, dest_version_id)
+ if not replace:
+ self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
+ self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
+ else:
+ self.node.attribute_del(dest_version_id, domain)
+ self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
- def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
+ def _put_metadata(self, user, node, domain, meta, replace=False):
"""Create a new version and store metadata."""
- src_version_id, dest_version_id = self._copy_version(user, node, None, node)
- if not replace:
- if src_version_id is not None:
- self.node.attribute_copy(src_version_id, dest_version_id)
- self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
- self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
- else:
- self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
- if copy_data and src_version_id is not None:
- self._copy_data(src_version_id, dest_version_id)
- return dest_version_id
+ src_version_id, dest_version_id = self._put_version_duplicate(user, node)
+ self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
+ return src_version_id, dest_version_id
def _list_limits(self, listing, marker, limit):
start = 0
limit = 10000
return start, limit
- def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
+ def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[]):
cont_prefix = path + '/'
prefix = cont_prefix + prefix
start = cont_prefix + marker if marker else None
before = until if until is not None else inf
- filterq = ','.join(keys) if keys else None
+ filterq = keys if domain else []
+ sizeq = size_range
- objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
+ objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq)
objects.extend([(p, None) for p in prefixes] if virtual else [])
- objects.sort()
+ objects.sort(key=lambda x: x[0])
objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
start, limit = self._list_limits([x[0] for x in objects], marker, limit)
if q < 0:
raise ValueError
elif k == 'versioning':
- if v not in ['auto', 'manual', 'none']:
+ if v not in ['auto', 'none']:
raise ValueError
else:
raise ValueError
+ def _put_policy(self, node, policy, replace):
+ if replace:
+ for k, v in self.default_policy.iteritems():
+ if k not in policy:
+ policy[k] = v
+ self.node.policy_set(node, policy)
+
+ def _get_policy(self, node):
+ policy = self.default_policy.copy()
+ policy.update(self.node.policy_get(node))
+ return policy
+
+ def _apply_versioning(self, account, container, version_id):
+ if version_id is None:
+ return
+ path, node = self._lookup_container(account, container)
+ versioning = self._get_policy(node)['versioning']
+ if versioning != 'auto':
+ hash = self.node.version_remove(version_id)
+ self.store.map_delete(hash)
+            self.queue.send(account, 'diskspace', 0, {'action': 'delete', 'total': 0})
+
# Access control functions.
def _check_groups(self, groups):
def _check_permissions(self, path, permissions):
# raise ValueError('Bad characters in permissions')
-
- # Check for existing permissions.
- paths = self.permissions.access_list(path)
- if paths:
- ae = AttributeError()
- ae.data = paths
- raise ae
+ pass
+
+ def _get_permissions_path(self, account, container, name):
+ path = '/'.join((account, container, name))
+ permission_paths = self.permissions.access_inherit(path)
+ permission_paths.sort()
+ permission_paths.reverse()
+ for p in permission_paths:
+ if p == path:
+ return p
+ else:
+            try:
+                parts = p.split('/', 2)
+                if len(parts) != 3:
+                    continue
+                _, node = self._lookup_object(*parts)
+ props = self._get_version(node)
+ # XXX: Put type in properties...
+ meta = dict(self.node.attribute_get(props[self.SERIAL], 'pithos'))
+ if meta['Content-Type'] == 'application/directory':
+ return p
+ except NameError:
+ pass
+ return None
def _can_read(self, user, account, container, name):
if user == account:
return True
- path = '/'.join((account, container, name))
+ path = self._get_permissions_path(account, container, name)
if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
raise NotAllowedError