# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
+import sys
import os
import time
-import sqlite3
import logging
import hashlib
import binascii
-from base import NotAllowedError, BaseBackend
-from lib.node import Node, ROOTNODE, SERIAL, SIZE, MTIME, MUSER, CLUSTER
-from lib.permissions import Permissions, READ, WRITE
-from lib.policy import Policy
+from base import NotAllowedError, QuotaError, BaseBackend
from lib.hashfiler import Mapper, Blocker
# Version cluster identifiers, numbered 0, 1 and 2 in this order.
CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED = range(3)

# Module-level logger shared by every backend method.
logger = logging.getLogger(__name__)
+
class HashMap(list):
    """A list of raw block digests that can be folded into one top hash.

    ``blocksize`` is stored as given; ``blockhash`` is the ``hashlib``
    algorithm name used to combine digests.
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        """Return the raw (binary) digest of ``v``."""
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        """Return the top hash of the stored digests.

        * empty map: the digest of the empty byte string;
        * one entry: that entry, unchanged;
        * otherwise: the entries are padded with all-zero digests up to the
          next power of two and hashed pairwise until one digest remains.

        The map itself is not modified.
        """
        # Bytes literals keep this working where hashlib only accepts bytes;
        # on Python 2 they are the very same str objects as before.
        if not self:
            return self._hash_raw(b'')
        if len(self) == 1:
            return self[0]

        level = list(self)
        width = 2
        while width < len(level):
            width *= 2
        level += [b'\x00' * len(level[0])] * (width - len(level))
        while len(level) > 1:
            level = [self._hash_raw(level[i] + level[i + 1])
                     for i in range(0, len(level), 2)]
        return level[0]
+
+
def backend_method(func=None, autocommit=1):
if func is None:
def fn(func):
if not autocommit:
return func
def fn(self, *args, **kw):
- self.con.execute('begin deferred')
+ self.wrapper.execute()
try:
ret = func(self, *args, **kw)
- self.con.commit()
+ self.wrapper.commit()
return ret
except:
- self.con.rollback()
+ self.wrapper.rollback()
raise
return fn
class ModularBackend(BaseBackend):
"""A modular backend.
- Uses SQLite for storage.
+ Uses modules for SQL functions and hashfiler for storage.
"""
- # TODO: Create account if not present in all functions.
-
- def __init__(self, db):
+ def __init__(self, mod, path, db):
self.hash_algorithm = 'sha256'
self.block_size = 4 * 1024 * 1024 # 4MB
- self.default_policy = {'quota': 0, 'versioning': 'auto'}
+ self.default_policy = {'quota': 0, 'versioning': 'manual'}
- basepath = os.path.split(db)[0]
- if basepath and not os.path.exists(basepath):
- os.makedirs(basepath)
- if not os.path.isdir(basepath):
- raise RuntimeError("Cannot open database at '%s'" % (db,))
+ if path and not os.path.exists(path):
+ os.makedirs(path)
+ if not os.path.isdir(path):
+ raise RuntimeError("Cannot open path '%s'" % (path,))
- self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)
+ __import__(mod)
+ self.mod = sys.modules[mod]
+ self.db = db
+ self.wrapper = self.mod.dbwrapper.DBWrapper(db)
params = {'blocksize': self.block_size,
- 'blockpath': basepath + '/blocks',
+ 'blockpath': os.path.join(path + '/blocks'),
'hashtype': self.hash_algorithm}
self.blocker = Blocker(**params)
- params = {'mappath': basepath + '/maps',
+ params = {'mappath': os.path.join(path + '/maps'),
'namelen': self.blocker.hashlen}
self.mapper = Mapper(**params)
- params = {'connection': self.con,
- 'cursor': self.con.cursor()}
- self.permissions = Permissions(**params)
- self.policy = Policy(**params)
- self.node = Node(**params)
-
- self.con.commit()
+ params = {'wrapper': self.wrapper}
+ self.permissions = self.mod.permissions.Permissions(**params)
+ for x in ['READ', 'WRITE']:
+ setattr(self, x, getattr(self.mod.permissions, x))
+ self.node = self.mod.node.Node(**params)
+ for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
+ setattr(self, x, getattr(self.mod.node, x))
+
+ def close(self):
+ self.wrapper.close()
@backend_method
def list_accounts(self, user, marker=None, limit=10000):
"""Return a list of accounts the user can access."""
+ logger.debug("list_accounts: %s %s %s", user, marker, limit)
allowed = self._allowed_accounts(user)
start, limit = self._list_limits(allowed, marker, limit)
return allowed[start:start + limit]
"""Return a dictionary with the account metadata."""
logger.debug("get_account_meta: %s %s", account, until)
- node = self._lookup_account(account, user == account)
+ path, node = self._lookup_account(account, user == account)
if user != account:
if until or node is None or account not in self._allowed_accounts(user):
raise NotAllowedError
try:
props = self._get_properties(node, until)
- version_id = props[SERIAL]
- mtime = props[MTIME]
+ mtime = props[self.MTIME]
except NameError:
- # Account does not exist before until.
- version_id = None
+ props = None
mtime = until
- object_count, bytes, tstamp = self._get_statistics(node, account, until)
- if mtime > tstamp:
- tstamp = mtime
+ count, bytes, tstamp = self._get_statistics(node, until)
+ tstamp = max(tstamp, mtime)
if until is None:
modified = tstamp
else:
- modified = self._get_statistics(node, account)[2] # Overall last modification.
- if mtime > modified:
- modified = mtime
-
- # Proper count.
- count = self.node.node_children(node)
- object_count -= count
+ modified = self._get_statistics(node)[2] # Overall last modification.
+ modified = max(modified, mtime)
if user != account:
meta = {'name': account}
else:
meta = {}
- if version_id is not None:
- meta.update(dict(self.node.attribute_get(version_id)))
+ if props is not None:
+ meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
if until is not None:
meta.update({'until_timestamp': tstamp})
meta.update({'name': account, 'count': count, 'bytes': bytes})
logger.debug("update_account_meta: %s %s %s", account, meta, replace)
if user != account:
raise NotAllowedError
- node = self._lookup_account(account, True)
- self._put_metadata(user, node, meta, replace, False)
+ path, node = self._lookup_account(account, True)
+ self._put_metadata(user, node, meta, replace)
@backend_method
def get_account_groups(self, user, account):
self.permissions.group_addmany(account, k, v)
@backend_method
- def put_account(self, user, account):
+ def get_account_policy(self, user, account):
+ """Return a dictionary with the account policy."""
+
+ logger.debug("get_account_policy: %s", account)
+ if user != account:
+ if account not in self._allowed_accounts(user):
+ raise NotAllowedError
+ return {}
+ path, node = self._lookup_account(account, True)
+ return self._get_policy(node)
+
+ @backend_method
+ def update_account_policy(self, user, account, policy, replace=False):
+ """Update the policy associated with the account."""
+
+ logger.debug("update_account_policy: %s %s %s", account, policy, replace)
+ if user != account:
+ raise NotAllowedError
+ path, node = self._lookup_account(account, True)
+ self._check_policy(policy)
+ self._put_policy(node, policy, replace)
+
+ @backend_method
+ def put_account(self, user, account, policy={}):
"""Create a new account with the given name."""
- logger.debug("put_account: %s", account)
+ logger.debug("put_account: %s %s", account, policy)
if user != account:
raise NotAllowedError
node = self.node.node_lookup(account)
if node is not None:
raise NameError('Account already exists')
- node = self.node.node_create(ROOTNODE, account)
- self.node.version_create(node, 0, None, account, CLUSTER_NORMAL)
-
-
-
-
-
-
-
-
-
-
-
-
+ if policy:
+ self._check_policy(policy)
+ node = self._put_path(user, self.ROOTNODE, account)
+ self._put_policy(node, policy, True)
@backend_method
def delete_account(self, user, account):
logger.debug("delete_account: %s", account)
if user != account:
raise NotAllowedError
- count = self._get_pathstats(account)[0]
- if count > 0:
+ node = self.node.node_lookup(account)
+ if node is None:
+ return
+ if not self.node.node_remove(node):
raise IndexError('Account is not empty')
- sql = 'delete from versions where name = ?'
- self.con.execute(sql, (account,))
self.permissions.group_destroy(account)
@backend_method
def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
"""Return a list of containers existing under an account."""
- logger.debug("list_containers: %s %s %s %s", account, marker, limit, until)
+ logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
if user != account:
if until or account not in self._allowed_accounts(user):
raise NotAllowedError
allowed = self._allowed_containers(user, account)
start, limit = self._list_limits(allowed, marker, limit)
return allowed[start:start + limit]
- else:
- if shared:
- allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
- start, limit = self._list_limits(allowed, marker, limit)
- return allowed[start:start + limit]
- return [x[0] for x in self._list_objects(account, '', '/', marker, limit, False, [], until)]
+ if shared:
+ allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
+ allowed = list(set(allowed))
+ start, limit = self._list_limits(allowed, marker, limit)
+ return allowed[start:start + limit]
+ node = self.node.node_lookup(account)
+ return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
@backend_method
def get_container_meta(self, user, account, container, until=None):
if user != account:
if until or container not in self._allowed_containers(user, account):
raise NotAllowedError
- path, version_id, mtime = self._get_containerinfo(account, container, until)
- count, bytes, tstamp = self._get_pathstats(path, until)
- if mtime > tstamp:
- tstamp = mtime
+ path, node = self._lookup_container(account, container)
+ props = self._get_properties(node, until)
+ mtime = props[self.MTIME]
+ count, bytes, tstamp = self._get_statistics(node, until)
+ tstamp = max(tstamp, mtime)
if until is None:
modified = tstamp
else:
- modified = self._get_pathstats(path)[2] # Overall last modification
- if mtime > modified:
- modified = mtime
+ modified = self._get_statistics(node)[2] # Overall last modification.
+ modified = max(modified, mtime)
if user != account:
- meta = {'name': container, 'modified': modified}
+ meta = {'name': container}
else:
- meta = self._get_metadata(path, version_id)
- meta.update({'name': container, 'count': count, 'bytes': bytes, 'modified': modified})
+ meta = dict(self.node.attribute_get(props[self.SERIAL]))
if until is not None:
meta.update({'until_timestamp': tstamp})
+ meta.update({'name': container, 'count': count, 'bytes': bytes})
+ meta.update({'modified': modified})
return meta
@backend_method
logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
if user != account:
raise NotAllowedError
- path, version_id, mtime = self._get_containerinfo(account, container)
- self._put_metadata(user, path, meta, replace, False)
+ path, node = self._lookup_container(account, container)
+ self._put_metadata(user, node, meta, replace)
@backend_method
def get_container_policy(self, user, account, container):
if container not in self._allowed_containers(user, account):
raise NotAllowedError
return {}
- path = self._get_containerinfo(account, container)[0]
- return self.policy.policy_get(path)
+ path, node = self._lookup_container(account, container)
+ return self._get_policy(node)
@backend_method
def update_container_policy(self, user, account, container, policy, replace=False):
- """Update the policy associated with the account."""
+ """Update the policy associated with the container."""
logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
if user != account:
raise NotAllowedError
- path = self._get_containerinfo(account, container)[0]
+ path, node = self._lookup_container(account, container)
self._check_policy(policy)
- if replace:
- for k, v in self.default_policy.iteritems():
- if k not in policy:
- policy[k] = v
- self.policy.policy_set(path, policy)
+ self._put_policy(node, policy, replace)
@backend_method
- def put_container(self, user, account, container, policy=None):
+ def put_container(self, user, account, container, policy={}):
"""Create a new container with the given name."""
logger.debug("put_container: %s %s %s", account, container, policy)
if user != account:
raise NotAllowedError
try:
- path, version_id, mtime = self._get_containerinfo(account, container)
+ path, node = self._lookup_container(account, container)
except NameError:
pass
else:
if policy:
self._check_policy(policy)
path = '/'.join((account, container))
- version_id = self._put_version(path, user)[0]
- for k, v in self.default_policy.iteritems():
- if k not in policy:
- policy[k] = v
- self.policy.policy_set(path, policy)
+ node = self._put_path(user, self._lookup_account(account, True)[1], path)
+ self._put_policy(node, policy, True)
@backend_method
def delete_container(self, user, account, container, until=None):
logger.debug("delete_container: %s %s %s", account, container, until)
if user != account:
raise NotAllowedError
- path, version_id, mtime = self._get_containerinfo(account, container)
+ path, node = self._lookup_container(account, container)
if until is not None:
- sql = '''select version_id from versions where name like ? and tstamp <= ?
- and version_id not in (select version_id from (%s))'''
- sql = sql % self._sql_until() # Do not delete current versions.
- c = self.con.execute(sql, (path + '/%', until))
- for v in [x[0] for x in c.fetchall()]:
- self._del_version(v)
+ self.node.node_purge_children(node, until, CLUSTER_HISTORY)
+ self.node.node_purge_children(node, until, CLUSTER_DELETED)
return
- count = self._get_pathstats(path)[0]
- if count > 0:
+ if self._get_statistics(node)[0] > 0:
raise IndexError('Container is not empty')
- sql = 'delete from versions where name = ? or name like ?' # May contain hidden items.
- self.con.execute(sql, (path, path + '/%',))
- self.policy.policy_unset(path)
- self._copy_version(user, account, account, True, False) # New account version (for timestamp update).
+ self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
+ self.node.node_purge_children(node, inf, CLUSTER_DELETED)
+ self.node.node_remove(node)
@backend_method
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
else:
if shared:
allowed = self.permissions.access_list_shared('/'.join((account, container)))
- path, version_id, mtime = self._get_containerinfo(account, container, until)
- return self._list_objects(path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
+ if not allowed:
+ return []
+ path, node = self._lookup_container(account, container)
+ return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
@backend_method
def list_object_meta(self, user, account, container, until=None):
allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
if not allowed:
raise NotAllowedError
- path, version_id, mtime = self._get_containerinfo(account, container, until)
- sql = '''select distinct m.key from (%s) o, metadata m
- where m.version_id = o.version_id and o.name like ?'''
- sql = sql % self._sql_until(until)
- param = (path + '/%',)
- if allowed:
- for x in allowed:
- sql += ' and o.name like ?'
- param += (x,)
- c = self.con.execute(sql, param)
- return [x[0] for x in c.fetchall()]
+ path, node = self._lookup_container(account, container)
+ before = until if until is not None else inf
+ return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
@backend_method
def get_object_meta(self, user, account, container, name, version=None):
logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
self._can_read(user, account, container, name)
- path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
+ path, node = self._lookup_object(account, container, name)
+ props = self._get_version(node, version)
if version is None:
- modified = mtime
+ modified = props[self.MTIME]
else:
- modified = self._get_version(path, version)[2] # Overall last modification
-
- meta = self._get_metadata(path, version_id)
- meta.update({'name': name, 'bytes': size})
- meta.update({'version': version_id, 'version_timestamp': mtime})
- meta.update({'modified': modified, 'modified_by': muser})
+ try:
+ modified = self._get_version(node)[self.MTIME] # Overall last modification.
+ except NameError: # Object may be deleted.
+ del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
+ if del_props is None:
+ raise NameError('Object does not exist')
+ modified = del_props[self.MTIME]
+
+ meta = dict(self.node.attribute_get(props[self.SERIAL]))
+ meta.update({'name': name, 'bytes': props[self.SIZE]})
+ meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
+ meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
return meta
@backend_method
logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
self._can_write(user, account, container, name)
- path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name)
- self._put_metadata(user, path, meta, replace)
+ path, node = self._lookup_object(account, container, name)
+ return self._put_metadata(user, node, meta, replace)
@backend_method
def get_object_permissions(self, user, account, container, name):
- """Return the path from which this object gets its permissions from,\
+ """Return the action allowed on the object, the path
+ from which the object gets its permissions from,
along with a dictionary containing the permissions."""
logger.debug("get_object_permissions: %s %s %s", account, container, name)
- self._can_read(user, account, container, name)
- path = self._get_objectinfo(account, container, name)[0]
- return self.permissions.access_inherit(path)
+ allowed = 'write'
+ if user != account:
+ path = '/'.join((account, container, name))
+ if self.permissions.access_check(path, self.WRITE, user):
+ allowed = 'write'
+ elif self.permissions.access_check(path, self.READ, user):
+ allowed = 'read'
+ else:
+ raise NotAllowedError
+ path = self._lookup_object(account, container, name)[0]
+ return (allowed,) + self.permissions.access_inherit(path)
@backend_method
def update_object_permissions(self, user, account, container, name, permissions):
logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
if user != account:
raise NotAllowedError
- path = self._get_objectinfo(account, container, name)[0]
+ path = self._lookup_object(account, container, name)[0]
self._check_permissions(path, permissions)
self.permissions.access_set(path, permissions)
logger.debug("get_object_public: %s %s %s", account, container, name)
self._can_read(user, account, container, name)
- path = self._get_objectinfo(account, container, name)[0]
+ path = self._lookup_object(account, container, name)[0]
if self.permissions.public_check(path):
return '/public/' + path
return None
logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
self._can_write(user, account, container, name)
- path = self._get_objectinfo(account, container, name)[0]
+ path = self._lookup_object(account, container, name)[0]
if not public:
self.permissions.public_unset(path)
else:
logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
self._can_read(user, account, container, name)
- path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
- hashmap = self.mapper.map_retr(version_id)
- return size, [binascii.hexlify(x) for x in hashmap]
+ path, node = self._lookup_object(account, container, name)
+ props = self._get_version(node, version)
+ hashmap = self.mapper.map_retr(binascii.unhexlify(props[self.HASH]))
+ return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
- @backend_method
- def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
- """Create/update an object with the specified size and partial hashes."""
-
- logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
+ def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
if permissions is not None and user != account:
raise NotAllowedError
self._can_write(user, account, container, name)
- missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
- if missing:
- ie = IndexError()
- ie.data = missing
- raise ie
- path = self._get_containerinfo(account, container)[0]
- path = '/'.join((path, name))
if permissions is not None:
+ path = '/'.join((account, container, name))
self._check_permissions(path, permissions)
- src_version_id, dest_version_id = self._copy_version(user, path, path, not replace_meta, False)
- sql = 'update versions set size = ? where version_id = ?'
- self.con.execute(sql, (size, dest_version_id))
- self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
- for k, v in meta.iteritems():
- sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
- self.con.execute(sql, (dest_version_id, k, v))
+
+ account_path, account_node = self._lookup_account(account, True)
+ container_path, container_node = self._lookup_container(account, container)
+ path, node = self._put_object_node(container_path, container_node, name)
+ src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)
+
+ # Check quota.
+ size_delta = size # Change with versioning.
+ if size_delta > 0:
+ account_quota = long(self._get_policy(account_node)['quota'])
+ container_quota = long(self._get_policy(container_node)['quota'])
+ if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
+ (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
+ # This must be executed in a transaction, so the version is never created if it fails.
+ raise QuotaError
+
+ if not replace_meta and src_version_id is not None:
+ self.node.attribute_copy(src_version_id, dest_version_id)
+ self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
if permissions is not None:
self.permissions.access_set(path, permissions)
+ return dest_version_id
@backend_method
- def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
- """Copy an object's data and metadata."""
+ def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
+ """Create/update an object with the specified size and partial hashes."""
- logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
- if permissions is not None and user != account:
- raise NotAllowedError
- self._can_read(user, account, src_container, src_name)
- self._can_write(user, account, dest_container, dest_name)
- self._get_containerinfo(account, src_container)
- if src_version is None:
- src_path = self._get_objectinfo(account, src_container, src_name)[0]
+ logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
+ if size == 0: # No such thing as an empty hashmap.
+ hashmap = [self.put_block('')]
+ map = HashMap(self.block_size, self.hash_algorithm)
+ map.extend([binascii.unhexlify(x) for x in hashmap])
+ missing = self.blocker.block_ping(map)
+ if missing:
+ ie = IndexError()
+ ie.data = [binascii.hexlify(x) for x in missing]
+ raise ie
+
+ hash = map.hash()
+ dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions)
+ self.mapper.map_stor(hash, map)
+ return dest_version_id
+
+ def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
+ self._can_read(user, src_account, src_container, src_name)
+ path, node = self._lookup_object(src_account, src_container, src_name)
+ props = self._get_version(node, src_version)
+ src_version_id = props[self.SERIAL]
+ hash = props[self.HASH]
+ size = props[self.SIZE]
+
+ if replace_meta:
+ meta = dest_meta
else:
- src_path = '/'.join((account, src_container, src_name))
- dest_path = self._get_containerinfo(account, dest_container)[0]
- dest_path = '/'.join((dest_path, dest_name))
- if permissions is not None:
- self._check_permissions(dest_path, permissions)
- src_version_id, dest_version_id = self._copy_version(user, src_path, dest_path, not replace_meta, True, src_version)
- for k, v in dest_meta.iteritems():
- sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
- self.con.execute(sql, (dest_version_id, k, v))
- if permissions is not None:
- self.permissions.access_set(dest_path, permissions)
+ meta = {}
+ dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
+ if not replace_meta:
+ self.node.attribute_copy(src_version_id, dest_version_id)
+ self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
+ return dest_version_id
@backend_method
- def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
- """Move an object's data and metadata."""
+ def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
+ """Copy an object's data and metadata."""
- logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
- self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
- self.delete_object(user, account, src_container, src_name)
+ logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
+ return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
@backend_method
- def delete_object(self, user, account, container, name, until=None):
- """Delete/purge an object."""
+ def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
+ """Move an object's data and metadata."""
- logger.debug("delete_object: %s %s %s %s", account, container, name, until)
+ logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
+ if user != src_account:
+ raise NotAllowedError
+ dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
+ self._delete_object(user, src_account, src_container, src_name)
+ return dest_version_id
+
+ def _delete_object(self, user, account, container, name, until=None):
if user != account:
raise NotAllowedError
if until is not None:
path = '/'.join((account, container, name))
- sql = '''select version_id from versions where name = ? and tstamp <= ?'''
- c = self.con.execute(sql, (path, until))
- for v in [x[0] in c.fetchall()]:
- self._del_version(v)
+ node = self.node.node_lookup(path)
+ if node is None:
+ return
+ self.node.node_purge(node, until, CLUSTER_NORMAL)
+ self.node.node_purge(node, until, CLUSTER_HISTORY)
+ self.node.node_purge_children(node, until, CLUSTER_DELETED)
try:
- version_id = self._get_version(path)[0]
+ props = self._get_version(node)
except NameError:
pass
else:
self.permissions.access_clear(path)
return
- path = self._get_objectinfo(account, container, name)[0]
- self._put_version(path, user, 0, 1)
+ path, node = self._lookup_object(account, container, name)
+ src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
self.permissions.access_clear(path)
@backend_method
+ def delete_object(self, user, account, container, name, until=None):
+ """Delete/purge an object."""
+
+ logger.debug("delete_object: %s %s %s %s", account, container, name, until)
+ self._delete_object(user, account, container, name, until)
+
+ @backend_method
def list_versions(self, user, account, container, name):
"""Return a list of all (version, version_timestamp) tuples for an object."""
logger.debug("list_versions: %s %s %s", account, container, name)
self._can_read(user, account, container, name)
- # This will even show deleted versions.
- path = '/'.join((account, container, name))
- sql = '''select distinct version_id, tstamp from versions where name = ? and hide = 0'''
- c = self.con.execute(sql, (path,))
- return [(int(x[0]), int(x[1])) for x in c.fetchall()]
+ path, node = self._lookup_object(account, container, name)
+ versions = self.node.node_get_versions(node)
+ return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
@backend_method(autocommit=0)
def get_block(self, hash):
@backend_method(autocommit=0)
def put_block(self, data):
- """Create a block and return the hash."""
+ """Store a block and return the hash."""
logger.debug("put_block: %s", len(data))
hashes, absent = self.blocker.block_stor((data,))
h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
return binascii.hexlify(h)
+ # Path functions.
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- def _sql_until(self, until=None):
- """Return the sql to get the latest versions until the timestamp given."""
- if until is None:
- until = int(time.time())
- sql = '''select version_id, name, tstamp, size from versions v
- where version_id = (select max(version_id) from versions
- where v.name = name and tstamp <= %s)
- and hide = 0'''
- return sql % (until,)
-
- def _put_version(self, path, user, size=0, hide=0):
- tstamp = int(time.time())
- sql = 'insert into versions (name, user, tstamp, size, hide) values (?, ?, ?, ?, ?)'
- id = self.con.execute(sql, (path, user, tstamp, size, hide)).lastrowid
- return str(id), tstamp
-
- def _get_versioninfo(self, account, container, name, until=None):
- """Return path, latest version, associated timestamp and size until the timestamp given."""
-
- p = (account, container, name)
- try:
- p = p[:p.index(None)]
- except ValueError:
- pass
- path = '/'.join(p)
+ def _put_object_node(self, path, parent, name):
+ path = '/'.join((path, name))
node = self.node.node_lookup(path)
- if node is not None:
- props = self.node.version_lookup(node, until, CLUSTER_NORMAL)
- # TODO: Do one lookup.
- if props is None and until is not None:
- props = self.node.version_lookup(node, until, CLUSTER_HISTORY)
- if props is not None:
- return path, props[SERIAL], props[MTIME], props[SIZE]
- raise NameError('Path does not exist')
-
- def _get_accountinfo(self, account, until=None):
- try:
- path, version_id, mtime, size = self._get_versioninfo(account, None, None, until)
- return version_id, mtime
- except:
- raise NameError('Account does not exist')
-
- def _get_containerinfo(self, account, container, until=None):
- try:
- path, version_id, mtime, size = self._get_versioninfo(account, container, None, until)
- return path, version_id, mtime
- except:
- raise NameError('Container does not exist')
-
- def _get_objectinfo(self, account, container, name, version=None):
- path = '/'.join((account, container, name))
- version_id, muser, mtime, size = self._get_version(path, version)
- return path, version_id, muser, mtime, size
-
- def _create_account(self, user, account):
- try:
- self._get_accountinfo(account)
- except NameError:
- self._put_version(account, user)
-
- def _check_policy(self, policy):
- for k in policy.keys():
- if policy[k] == '':
- policy[k] = self.default_policy.get(k)
- for k, v in policy.iteritems():
- if k == 'quota':
- q = int(v) # May raise ValueError.
- if q < 0:
- raise ValueError
- elif k == 'versioning':
- if v not in ['auto', 'manual', 'none']:
- raise ValueError
- else:
- raise ValueError
-
- def _list_limits(self, listing, marker, limit):
- start = 0
- if marker:
- try:
- start = listing.index(marker) + 1
- except ValueError:
- pass
- if not limit or limit > 10000:
- limit = 10000
- return start, limit
-
- def _list_objects(self, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
- cont_prefix = path + '/'
- if keys and len(keys) > 0:
- sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
- m.version_id = o.version_id and m.key in (%s)'''
- sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
- param = (cont_prefix + prefix + '%',) + tuple(keys)
- if allowed:
- sql += ' and (' + ' or '.join(('o.name like ?',) * len(allowed)) + ')'
- param += tuple([x + '%' for x in allowed])
- sql += ' order by o.name'
- else:
- sql = 'select name, version_id from (%s) where name like ?'
- sql = sql % self._sql_until(until)
- param = (cont_prefix + prefix + '%',)
- if allowed:
- sql += ' and (' + ' or '.join(('name like ?',) * len(allowed)) + ')'
- param += tuple([x + '%' for x in allowed])
- sql += ' order by name'
- c = self.con.execute(sql, param)
- objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
- if delimiter:
- pseudo_objects = []
- for x in objects:
- pseudo_name = x[0]
- i = pseudo_name.find(delimiter, len(prefix))
- if not virtual:
- # If the delimiter is not found, or the name ends
- # with the delimiter's first occurence.
- if i == -1 or len(pseudo_name) == i + len(delimiter):
- pseudo_objects.append(x)
- else:
- # If the delimiter is found, keep up to (and including) the delimiter.
- if i != -1:
- pseudo_name = pseudo_name[:i + len(delimiter)]
- if pseudo_name not in [y[0] for y in pseudo_objects]:
- if pseudo_name == x[0]:
- pseudo_objects.append(x)
- else:
- pseudo_objects.append((pseudo_name, None))
- objects = pseudo_objects
-
- start, limit = self._list_limits([x[0] for x in objects], marker, limit)
- return objects[start:start + limit]
-
- def _del_version(self, version):
- self.mapper.map_remv(version)
- sql = 'delete from versions where version_id = ?'
- self.con.execute(sql, (version,))
+ if node is None:
+ node = self.node.node_create(parent, path)
+ return path, node
- # Path functions.
+ def _put_path(self, user, parent, path):
+ node = self.node.node_create(parent, path)
+ self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
+ return node
def _lookup_account(self, account, create=True):
node = self.node.node_lookup(account)
if node is None and create:
- node = self.node.node_create(ROOTNODE, account)
- self.node.version_create(node, 0, None, account, CLUSTER_NORMAL)
- return node
+ node = self._put_path(account, self.ROOTNODE, account) # User is account.
+ return account, node
def _lookup_container(self, account, container):
- node = self.node.node_lookup('/'.join((account, container)))
+ path = '/'.join((account, container))
+ node = self.node.node_lookup(path)
if node is None:
raise NameError('Container does not exist')
- return node
+ return path, node
def _lookup_object(self, account, container, name):
- node = self.node.node_lookup('/'.join((account, container, name)))
+ path = '/'.join((account, container, name))
+ node = self.node.node_lookup(path)
if node is None:
raise NameError('Object does not exist')
- return node
+ return path, node
def _get_properties(self, node, until=None):
    """Return the version properties of node up to the given timestamp.

    Without until, only CLUSTER_NORMAL is consulted (with no upper
    bound). With until, CLUSTER_NORMAL is tried first and
    CLUSTER_HISTORY is used as a fallback. Raises NameError when no
    properties are found.
    """
    if until is None:
        found = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
    else:
        found = self.node.version_lookup(node, until, CLUSTER_NORMAL)
        if found is None:
            found = self.node.version_lookup(node, until, CLUSTER_HISTORY)
    if found is None:
        raise NameError('Path does not exist')
    return found
- def _get_statistics(self, node, path, until=None):
- """Return count, sum of size and latest timestamp of everything under node/path."""
def _get_statistics(self, node, until=None):
    """Return count, sum of size and latest timestamp of everything under node.

    Without until the figures come from statistics_get for
    CLUSTER_NORMAL; with until they come from statistics_latest, which
    is passed CLUSTER_DELETED. A missing result is reported as
    (0, 0, 0).
    """
    if until is not None:
        result = self.node.statistics_latest(node, until, CLUSTER_DELETED)
    else:
        result = self.node.statistics_get(node, CLUSTER_NORMAL)
    return (0, 0, 0) if result is None else result
def _get_version(self, node, version=None):
if version is None:
if props is None:
raise NameError('Object does not exist')
else:
+ try:
+ version = int(version)
+ except ValueError:
+ raise IndexError('Version does not exist')
props = self.node.version_get_properties(version)
- if props is None or props[CLUSTER] == CLUSTER_DELETE:
+ if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
raise IndexError('Version does not exist')
return props
- def _copy_version(self, user, src_node, dest_node, copy_meta=True, copy_data=True, src_version=None):
- # Get source serial and size.
- if src_version is not None:
- src_props = self._get_version(src_node, src_version)
- src_version_id = src_props[SERIAL]
- size = src_props[SIZE]
- else:
- # Latest or create from scratch.
- try:
- src_props = self._get_version(src_node)
- src_version_id = src_props[SERIAL]
- size = src_props[SIZE]
- except NameError:
- src_version_id = None
- size = 0
- if not copy_data:
- size = 0
def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
    """Create a new version of the node.

    The latest CLUSTER_NORMAL version, if any, becomes the source: it is
    moved to CLUSTER_HISTORY and recorded as the source of the new
    version. When size is None, both hash and size are inherited from
    the source (None and 0 when there is no source).

    Returns (src_version_id, dest_version_id); src_version_id is None
    when the node had no previous version.
    """
    latest = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
    if latest is None:
        src_version_id, src_hash, src_size = None, None, 0
    else:
        src_version_id = latest[self.SERIAL]
        src_hash = latest[self.HASH]
        src_size = latest[self.SIZE]

    # Only an omitted size triggers inheritance; this way hash can be
    # set to None explicitly by passing a size.
    if size is None:
        hash, size = src_hash, src_size

    if src_version_id is not None:
        self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
    dest_version_id, _mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
    return src_version_id, dest_version_id
- def _get_metadata(self, version):
- if version is None:
- return {}
- return dict(self.node.attribute_get(version))
-
- def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
+ def _put_metadata(self, user, node, meta, replace=False):
"""Create a new version and store metadata."""
- src_version_id, dest_version_id = self._copy_version(user, node, node, not replace, copy_data)
+ src_version_id, dest_version_id = self._put_version_duplicate(user, node)
+
+ # TODO: Merge with other functions that update metadata...
if not replace:
+ if src_version_id is not None:
+ self.node.attribute_copy(src_version_id, dest_version_id)
self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
else:
self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
+ return dest_version_id
+
+ def _list_limits(self, listing, marker, limit):
+ start = 0
+ if marker:
+ try:
+ start = listing.index(marker) + 1
+ except ValueError:
+ pass
+ if not limit or limit > 10000:
+ limit = 10000
+ return start, limit
+
def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=None, until=None, allowed=None):
    """List (name, version) pairs for the objects stored under path.

    Names are returned relative to path (the 'path/' prefix is
    stripped) and sorted. When virtual is true, the prefixes reported by
    the node layer are included as (prefix, None) entries. keys, when
    non-empty, is joined with ',' and passed on as a filter; allowed is
    passed through unchanged. The result is finally cut according to
    marker and limit via _list_limits.

    keys and allowed default to None instead of a shared mutable list,
    so state can never leak between calls through the defaults; passing
    an explicit list behaves exactly as before.
    """
    if allowed is None:
        allowed = []
    cont_prefix = path + '/'
    prefix = cont_prefix + prefix
    start = cont_prefix + marker if marker else None
    before = until if until is not None else inf
    filterq = ','.join(keys) if keys else None

    objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
    if virtual:
        # Virtual entries carry no version.
        objects.extend((p, None) for p in prefixes)
    objects.sort(key=lambda x: x[0])
    objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]

    start, limit = self._list_limits([x[0] for x in objects], marker, limit)
    return objects[start:start + limit]
+
+ # Policy functions.
+
+ def _check_policy(self, policy):
+ for k in policy.keys():
+ if policy[k] == '':
+ policy[k] = self.default_policy.get(k)
+ for k, v in policy.iteritems():
+ if k == 'quota':
+ q = int(v) # May raise ValueError.
+ if q < 0:
+ raise ValueError
+ elif k == 'versioning':
+ if v not in ['auto', 'manual', 'none']:
+ raise ValueError
+ else:
+ raise ValueError
+
+ def _put_policy(self, node, policy, replace):
+ if replace:
+ for k, v in self.default_policy.iteritems():
+ if k not in policy:
+ policy[k] = v
+ self.node.policy_set(node, policy)
+
+ def _get_policy(self, node):
+ policy = self.default_policy.copy()
+ policy.update(self.node.policy_get(node))
+ return policy
# Access control functions.
if user == account:
return True
path = '/'.join((account, container, name))
- if not self.permissions.access_check(path, READ, user) and not self.permissions.access_check(path, WRITE, user):
+ if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
raise NotAllowedError
def _can_write(self, user, account, container, name):
if user == account:
return True
path = '/'.join((account, container, name))
- if not self.permissions.access_check(path, WRITE, user):
+ if not self.permissions.access_check(path, self.WRITE, user):
raise NotAllowedError
def _allowed_accounts(self, user):