+# Identifier of the root node; the root row is inserted as its own parent.
ROOTNODE = 0
-( SERIAL, NODE, SIZE, SOURCE, MTIME, CLUSTER ) = range(6)
+# Positions of the columns in a version row, in the order returned by
+# version_lookup and version_get_properties:
+# (serial, node, size, source, mtime, muser, cluster).
+( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
inf = float('inf')
'size' : 2,
'source' : 3,
'mtime' : 4,
- 'cluster' : 5,
+ 'muser' : 5,
+ 'cluster' : 6,
}
class Node(DBWorker):
"""Nodes store path organization.
- Versions store obejct history.
+ Versions store object history.
Attributes store metadata.
"""
execute(""" create table if not exists nodes
( node integer primary key,
parent integer not null default 0,
- path text not null default ''
+ path text not null default '',
foreign key (parent)
references nodes(node)
on update cascade
population integer not null default 0,
size integer not null default 0,
mtime integer,
+ muser text not null default '',
cluster integer not null default 0,
primary key (node, cluster)
foreign key (node)
q = "insert or ignore into nodes(node, parent) values (?, ?)"
execute(q, (ROOTNODE, ROOTNODE))
+    def node_create(self, parent, path):
+        """Insert a node with the given parent and path.
+
+        Return the identifier (rowid) assigned to the new node.
+        """
+
+        cursor = self.execute("insert into nodes (parent, path) values (?, ?)",
+                              (parent, path))
+        return cursor.lastrowid
+
+    def node_lookup(self, path):
+        """Return the node identifier stored for `path`, or None if absent."""
+
+        self.execute("select node from nodes where path = ?", (path,))
+        row = self.fetchone()
+        return None if row is None else row[0]
+
def node_update_ancestors(self, node, population, size, mtime, cluster=0):
"""Update the population properties of the given node.
Population properties keep track the population and total
return (0, 0, 0)
return r
- def version_create(self, node, size, source, cluster=0):
- """Create a new version from the given properties.
- Return the (serial, mtime) of the new version.
- """
+    def node_children(self, node):
+        """Return the number of nodes whose parent is `node`, as an int."""
- q = ("insert into nodes (node, size, source, mtime, cluster) "
- "values (?, ?, ?, ?, ?)")
- mtime = time()
- props = (node, path, size, source, mtime, cluster)
- serial = self.execute(q, props).lastrowid
- self.node_update_ancestors(node, 1, size, mtime, cluster)
- return serial, mtime
+
+        # The root row is inserted as (ROOTNODE, ROOTNODE), i.e. it is its
+        # own parent; never count a node as its own child.
+        q = ("select count(node) from nodes "
+             "where parent = ? and node != parent")
+        self.execute(q, (node,))
+        r = self.fetchone()
+        # count() yields a single row; unwrap it to a plain integer.
+        return r[0] if r is not None else 0
# def node_remove(self, serial, recursive=0):
# """Remove the node specified by serial.
# self.node_update_ancestors(parent, -pop-1, -size-popsize)
# return True
+    def version_create(self, node, size, source, muser, cluster=0):
+        """Create a new version of `node` from the given properties.
+
+        `source` is the serial the version was derived from (callers pass
+        None when there is none) and `muser` the modifying user.  The
+        population statistics are updated through node_update_ancestors.
+        Return the (serial, mtime) of the new version.
+        """
+
+        # Versions live in the versions table; one placeholder per column.
+        q = ("insert into versions (node, size, source, mtime, muser, cluster) "
+             "values (?, ?, ?, ?, ?, ?)")
+        mtime = time()
+        props = (node, size, source, mtime, muser, cluster)
+        serial = self.execute(q, props).lastrowid
+        self.node_update_ancestors(node, 1, size, mtime, cluster)
+        return serial, mtime
+
+    def version_lookup(self, node, before=inf, cluster=0):
+        """Lookup the current version of the given node.
+
+        The current version is the one with the highest serial among the
+        node's versions with mtime < `before`; it is returned only if it
+        belongs to `cluster`.  `before` may be None, meaning no time limit.
+        Return a row with its properties:
+        (serial, node, size, source, mtime, muser, cluster)
+        or None if the current version is not found in the given cluster.
+        """
+
+        if before is None:
+            # Callers pass `until` straight through; binding NULL would make
+            # "mtime < ?" match nothing, so treat None as "no limit".
+            before = inf
+        q = ("select serial, node, size, source, mtime, muser, cluster "
+             "from versions "
+             "where serial = (select max(serial) "
+                             "from versions "
+                             "where node = ? and mtime < ?) "
+             "and cluster = ?")
+        self.execute(q, (node, before, cluster))
+        return self.fetchone()
+
def version_get_properties(self, serial, keys=(), propnames=_propnames):
"""Return a sequence of values for the properties of
the version specified by serial and the keys, in the order given.
If keys is empty, return all properties in the order
- (serial, node, size, source, mtime, cluster).
+ (serial, node, size, source, mtime, muser, cluster).
"""
- q = ("select serial, node, path, size, source, mtime, cluster "
- "from nodes "
+        # Column order must match the SERIAL..CLUSTER indexes; the nodes
+        # table defines no serial column, versions are stored in `versions`.
+        q = ("select serial, node, size, source, mtime, muser, cluster "
+             "from versions "
"where serial = ?")
self.execute(q, (serial,))
# vals += (serial,)
# self.execute(q, vals)
- def version_lookup(self, node, before=inf, cluster=0):
- """Lookup the current version of the given node.
- Return a list with its properties:
- (serial, node, size, source, mtime, cluster)
- or None if the version is not found.
- """
+    def version_recluster(self, serial, cluster):
+        """Move the version specified by serial into another cluster.
+
+        The population statistics are moved from the old cluster to the new
+        one.  Do nothing if the version is not found or already in `cluster`.
+        """
- q = ("select serial, node, size, source, mtime, cluster "
- "from nodes "
- "where serial = (select max(serial) "
- "from nodes "
- "where node = ? and cluster = ? and mtime < ?)")
- self.execute(q, (node, cluster, before))
- props = self.fetchone()
- if props is not None:
- return props
- return None
-
- def node_lookup(self, path):
- """Lookup the current node of the given path.
- Return None if the path is not found.
- """
- q = ("select node from nodes where path = ?")
- self.execute(q, (path,))
- r = self.fetchone()
- if r is not None:
- return r[0]
- return None
+
+        props = self.version_get_properties(serial)
+        if props is None:
+            return
+        node = props[NODE]
+        size = props[SIZE]
+        mtime = props[MTIME]
+        oldcluster = props[CLUSTER]
+        if cluster == oldcluster:
+            return
+
+        self.node_update_ancestors(node, -1, -size, mtime, oldcluster)
+        self.node_update_ancestors(node, 1, size, mtime, cluster)
+
+        q = "update versions set cluster = ? where serial = ?"
+        self.execute(q, (cluster, serial))
- def node_create(self, parent, path):
- """Create a new node from the given properties.
- Return the node identifier of the new node.
+# def version_copy(self, serial, node, muser, copy_attr=True):
+# """Copy the version specified by serial into
+# a new version of node. Optionally copy attributes.
+# Return the (serial, mtime) of the new version.
+# """
+#
+# props = self.version_get_properties(serial)
+# if props is None:
+# return None
+# size = props[SIZE]
+# cluster = props[CLUSTER]
+# new_serial, mtime = self.version_create(node, size, serial, muser, cluster)
+# if copy_attr:
+# self.attribute_copy(serial, new_serial)
+# return (new_serial, mtime)
+
+    def path_statistics(self, prefix, before=inf, except_cluster=0):
+        """Return population, total size and last mtime
+        for all latest versions under prefix that
+        do not belong to the cluster.
+
+        `prefix` is matched literally (LIKE wildcards in it are escaped);
+        `before` may be None, meaning no time limit.  Missing sums/maxima
+        are reported as 0.
"""
- q = ("insert into nodes (parent, path) "
- "values (?, ?)")
- props = (parent, path)
- return self.execute(q, props).lastrowid
+
+        if before is None:
+            before = inf
+        # Escape LIKE wildcards so that e.g. '_' in a name matches only '_'.
+        escaped = (prefix.replace('\\', '\\\\')
+                         .replace('%', '\\%')
+                         .replace('_', '\\_'))
+        q = ("select count(serial), sum(size), max(mtime) "
+             "from versions v "
+             "where serial = (select max(serial) "
+                             "from versions "
+                             "where node = v.node and mtime < ?) "
+             "and cluster != ? "
+             "and node in (select node "
+                          "from nodes "
+                          "where path like ? escape '\\')")
+        self.execute(q, (before, except_cluster, escaped + '%'))
+        r = self.fetchone()
+        if r is None:
+            return (0, 0, 0)
+        # sum()/max() yield NULL when nothing matches.
+        count, size, mtime = r
+        return (count or 0, size or 0, mtime or 0)
def parse_filters(self, filterq):
preterms = filterq.split(',')
# "and path = ? and mtime between ? and ?")
# execute(q, args)
# return serials
-
-# def node_move(self, source, parent, path):
-# """Move the source node into another path,
-# possibly, in another parent's namespace.
-# The node is moved with its namespace.
-# """
-# props = self.node_get_properties(source)
-#
-# oldparent = props[PARENT]
-# size = props[SIZE]
-# population = props[POPULATION]
-# popsize = props[POPSIZE]
-#
-# sizedelta = size + popsize
-# popdelta = population + 1
-# node_update_ancestors = self.node_update_ancestors
-# node_update_ancestors(oldparent, -popdelta, -sizedelta)
-# node_update_ancestors(parent, popdelta, sizedelta)
-#
-# q = "update nodes set parent = ?, path = ? where serial = ?"
-# self.execute(q, (parent, path, source))
-
- def version_copy(self, serial, node=None, copy_attr=True):
- """Copy the version specified by serial into
- a new version of node. Optionally copy attributes.
- Return the (serial, mtime) of the new version.
- """
-
- props = self.version_get_properties(serial)
- size = props[SIZE]
- cluster = props[CLUSTER]
- new_serial, mtime = self.version_create(node, path, size, serial, cluster)
- if copy_attr:
- self.attr_copy(serial, new_serial)
- return (new_serial, mtime)
- def node_get_attributes(self, serial, keys=()):
- """Return a list of (key, value) pairs of the node specified by serial.
+ def attribute_get(self, serial, keys=()):
+ """Return a list of (key, value) pairs of the version specified by serial.
If keys is empty, return all attributes.
Othwerise, return only those specified.
"""
execute(q, (serial,))
return self.fetchall()
- def attr_set(self, serial, items):
- """Set the attributes of the node specified by serial.
+ def attribute_set(self, serial, items):
+ """Set the attributes of the version specified by serial.
Receive attributes as an iterable of (key, value) pairs.
"""
"values (?, ?, ?)")
self.executemany(q, ((serial, k, v) for k, v in items))
- def attr_del(self, serial, keys=()):
- """Delete attributes of the node specified by serial.
+ def attribute_del(self, serial, keys=()):
+ """Delete attributes of the version specified by serial.
If keys is empty, delete all attributes.
Otherwise delete those specified.
"""
# self.execute(q, (parent,))
# return [r[0] for r in self.fetchall()]
- def attr_copy(self, source, dest):
+ def attribute_copy(self, source, dest):
q = ("insert or replace into attributes "
"select ?, key, value from attributes "
"where serial = ?")
import binascii
from base import NotAllowedError, BaseBackend
+from lib.node import Node, ROOTNODE, SERIAL, NODE, SIZE, MTIME, MUSER, CLUSTER
from lib.permissions import Permissions, READ, WRITE
from lib.policy import Policy
from lib.hashfiler import Mapper, Blocker
+( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
+
+inf = float('inf')
+
logger = logging.getLogger(__name__)
if not os.path.isdir(basepath):
raise RuntimeError("Cannot open database at '%s'" % (db,))
- self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)
-
- sql = '''pragma foreign_keys = on'''
- self.con.execute(sql)
-
- sql = '''create table if not exists versions (
- version_id integer primary key,
- name text,
- user text,
- tstamp integer not null,
- size integer default 0,
- hide integer default 0)'''
- self.con.execute(sql)
- sql = '''create table if not exists metadata (
- version_id integer,
- key text,
- value text,
- primary key (version_id, key)
- foreign key (version_id) references versions(version_id)
- on delete cascade)'''
- self.con.execute(sql)
-
- self.con.commit()
+ self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)
params = {'blocksize': self.block_size,
'blockpath': basepath + '/blocks',
'cursor': self.con.cursor()}
self.permissions = Permissions(**params)
self.policy = Policy(**params)
+ self.node = Node(**params)
+
+ self.con.commit()
@backend_method
def list_accounts(self, user, marker=None, limit=10000):
"""Return a dictionary with the account metadata."""
logger.debug("get_account_meta: %s %s", account, until)
+ node = self._lookup_account(account, user == account)
if user != account:
- if until or account not in self._allowed_accounts(user):
+ if until or node is None or account not in self._allowed_accounts(user):
raise NotAllowedError
- else:
- self._create_account(user, account)
try:
- version_id, mtime = self._get_accountinfo(account, until)
+ props = self._get_properties(node, until)
+ version_id = props[SERIAL]
+ mtime = props[MTIME]
except NameError:
# Account does not exist before until.
version_id = None
mtime = until
- count, bytes, tstamp = self._get_pathstats(account, until)
+ object_count, bytes, tstamp = self._get_statistics(node, account, until)
if mtime > tstamp:
tstamp = mtime
if until is None:
modified = tstamp
else:
- modified = self._get_pathstats(account)[2] # Overall last modification
+ modified = self._get_statistics(node, account)[2] # Overall last modification.
if mtime > modified:
modified = mtime
# Proper count.
- sql = 'select count(name) from (%s) where name glob ? and not name glob ?'
- sql = sql % self._sql_until(until)
- c = self.con.execute(sql, (account + '/*', account + '/*/*'))
- row = c.fetchone()
- count = row[0]
+ count = self.node.node_children(node)
+ object_count -= count
if user != account:
meta = {'name': account}
else:
- meta = self._get_metadata(account, version_id)
- meta.update({'name': account, 'count': count, 'bytes': bytes})
+ meta = {}
+ if version_id is not None:
+ meta.update(dict(self.node.attribute_get(version_id)))
if until is not None:
meta.update({'until_timestamp': tstamp})
+ meta.update({'name': account, 'count': count, 'bytes': bytes})
meta.update({'modified': modified})
return meta
logger.debug("update_account_meta: %s %s %s", account, meta, replace)
if user != account:
raise NotAllowedError
- self._put_metadata(user, account, meta, replace, False)
+ node = self._lookup_account(account, True)
+ self._put_metadata(user, node, meta, replace, False)
@backend_method
def get_account_groups(self, user, account):
if account not in self._allowed_accounts(user):
raise NotAllowedError
return {}
- self._create_account(user, account)
+ self._lookup_account(account, True)
return self.permissions.group_dict(account)
@backend_method
logger.debug("update_account_groups: %s %s %s", account, groups, replace)
if user != account:
raise NotAllowedError
- self._create_account(user, account)
+ self._lookup_account(account, True)
self._check_groups(groups)
if replace:
self.permissions.group_destroy(account)
logger.debug("put_account: %s", account)
if user != account:
raise NotAllowedError
- try:
- version_id, mtime = self._get_accountinfo(account)
- except NameError:
- pass
- else:
+ node = self.node.node_lookup(account)
+ if node is not None:
raise NameError('Account already exists')
- self._put_version(account, user)
+ node = self.node.node_create(ROOTNODE, account)
+ self.node.version_create(node, 0, None, account, CLUSTER_NORMAL)
+
+
+
+
+
+
+
+
+
+
+
+
@backend_method
def delete_account(self, user, account):
raise NotAllowedError
return {}
path = self._get_containerinfo(account, container)[0]
- return self._get_policy(path)
+ return self.policy.policy_get(path)
@backend_method
def update_container_policy(self, user, account, container, policy, replace=False):
h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
return binascii.hexlify(h)
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
def _sql_until(self, until=None):
"""Return the sql to get the latest versions until the timestamp given."""
if until is None:
and hide = 0'''
return sql % (until,)
- def _get_pathstats(self, path, until=None):
- """Return count and sum of size of everything under path and latest timestamp."""
-
- sql = 'select count(version_id), total(size), max(tstamp) from (%s) where name like ?'
- sql = sql % self._sql_until(until)
- c = self.con.execute(sql, (path + '/%',))
- row = c.fetchone()
- tstamp = row[2] if row[2] is not None else 0
- return int(row[0]), int(row[1]), int(tstamp)
-
- def _get_version(self, path, version=None):
- if version is None:
- sql = '''select version_id, user, tstamp, size, hide from versions where name = ?
- order by version_id desc limit 1'''
- c = self.con.execute(sql, (path,))
- row = c.fetchone()
- if not row or int(row[4]):
- raise NameError('Object does not exist')
- else:
- # The database (sqlite) will not complain if the version is not an integer.
- sql = '''select version_id, user, tstamp, size from versions where name = ?
- and version_id = ?'''
- c = self.con.execute(sql, (path, version))
- row = c.fetchone()
- if not row:
- raise IndexError('Version does not exist')
- return str(row[0]), str(row[1]), int(row[2]), int(row[3])
-
def _put_version(self, path, user, size=0, hide=0):
tstamp = int(time.time())
sql = 'insert into versions (name, user, tstamp, size, hide) values (?, ?, ?, ?, ?)'
id = self.con.execute(sql, (path, user, tstamp, size, hide)).lastrowid
return str(id), tstamp
- def _copy_version(self, user, src_path, dest_path, copy_meta=True, copy_data=True, src_version=None):
- if src_version is not None:
- src_version_id, muser, mtime, size = self._get_version(src_path, src_version)
- else:
- # Latest or create from scratch.
- try:
- src_version_id, muser, mtime, size = self._get_version(src_path)
- except NameError:
- src_version_id = None
- size = 0
- if not copy_data:
- size = 0
- dest_version_id = self._put_version(dest_path, user, size)[0]
- if copy_meta and src_version_id is not None:
- sql = 'insert into metadata select %s, key, value from metadata where version_id = ?'
- sql = sql % dest_version_id
- self.con.execute(sql, (src_version_id,))
- if copy_data and src_version_id is not None:
- # TODO: Copy properly.
- hashmap = self.mapper.map_retr(src_version_id)
- self.mapper.map_stor(dest_version_id, hashmap)
- return src_version_id, dest_version_id
-
def _get_versioninfo(self, account, container, name, until=None):
"""Return path, latest version, associated timestamp and size until the timestamp given."""
except ValueError:
pass
path = '/'.join(p)
- sql = '''select version_id, tstamp, size from (%s) where name = ?'''
- sql = sql % self._sql_until(until)
- c = self.con.execute(sql, (path,))
- row = c.fetchone()
- if row is None:
- raise NameError('Path does not exist')
- return path, str(row[0]), int(row[1]), int(row[2])
+ node = self.node.node_lookup(path)
+ if node is not None:
+ props = self.node.version_lookup(node, until, CLUSTER_NORMAL)
+ # TODO: Do one lookup.
+ if props is None and until is not None:
+ props = self.node.version_lookup(node, until, CLUSTER_HISTORY)
+ if props is not None:
+ return path, props[SERIAL], props[MTIME], props[SIZE]
+ raise NameError('Path does not exist')
def _get_accountinfo(self, account, until=None):
try:
except NameError:
self._put_version(account, user)
- def _get_metadata(self, path, version):
- sql = 'select key, value from metadata where version_id = ?'
- c = self.con.execute(sql, (version,))
- return dict(c.fetchall())
-
- def _put_metadata(self, user, path, meta, replace=False, copy_data=True):
- """Create a new version and store metadata."""
-
- src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, copy_data)
- for k, v in meta.iteritems():
- if not replace and v == '':
- sql = 'delete from metadata where version_id = ? and key = ?'
- self.con.execute(sql, (dest_version_id, k))
- else:
- sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
- self.con.execute(sql, (dest_version_id, k, v))
-
def _check_policy(self, policy):
for k in policy.keys():
if policy[k] == '':
else:
raise ValueError
- def _get_policy(self, path):
- return self.policy.policy_get(path)
-
def _list_limits(self, listing, marker, limit):
start = 0
if marker:
sql = 'delete from versions where version_id = ?'
self.con.execute(sql, (version,))
+ # Path functions.
+
+    def _lookup_account(self, account, create=True):
+        """Return the node of `account`.
+
+        When the account has no node and `create` is true, create one under
+        ROOTNODE with an initial empty version whose modifier is the account
+        itself.  Otherwise a missing account yields None.
+        """
+        node = self.node.node_lookup(account)
+        if node is not None or not create:
+            return node
+        node = self.node.node_create(ROOTNODE, account)
+        self.node.version_create(node, 0, None, account, CLUSTER_NORMAL)
+        return node
+
+    def _lookup_container(self, account, container):
+        """Return the node of account/container; raise NameError if missing."""
+        path = '/'.join((account, container))
+        found = self.node.node_lookup(path)
+        if found is not None:
+            return found
+        raise NameError('Container does not exist')
+
+    def _lookup_object(self, account, container, name):
+        """Return the node of account/container/name; raise NameError if missing."""
+        path = '/'.join((account, container, name))
+        found = self.node.node_lookup(path)
+        if found is not None:
+            return found
+        raise NameError('Object does not exist')
+
+    def _get_properties(self, node, until=None):
+        """Return the properties of the node's version current at `until`.
+
+        CLUSTER_NORMAL is searched first; for historical queries (`until`
+        given) CLUSTER_HISTORY is searched as a fallback.
+        Raise NameError if no version is found.
+        """
+        before = inf if until is None else until
+        lookup = self.node.version_lookup
+        props = lookup(node, before, CLUSTER_NORMAL)
+        # TODO: Do one lookup.
+        if until is not None and props is None:
+            props = lookup(node, before, CLUSTER_HISTORY)
+        if props is None:
+            raise NameError('Path does not exist')
+        return props
+
+    def _get_statistics(self, node, path, until=None):
+        """Return count, sum of size and latest timestamp of everything under node/path.
+
+        Without `until`, the node's CLUSTER_NORMAL statistics are returned via
+        node_statistics; with `until`, they are recomputed over path + '/'
+        ignoring deleted versions.
+        """
+        if until is not None:
+            return self.node.path_statistics(path + '/', until, CLUSTER_DELETED)
+        return self.node.node_statistics(node, CLUSTER_NORMAL)
+
+    def _get_version(self, node, version=None):
+        """Return the properties of a version of `node`.
+
+        With no `version`, return the latest CLUSTER_NORMAL version and raise
+        NameError if there is none.  Otherwise return the properties of the
+        version with that serial, raising IndexError if it does not exist,
+        belongs to another node, or has been deleted.
+        """
+        if version is None:
+            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
+            if props is None:
+                raise NameError('Object does not exist')
+            return props
+        props = self.node.version_get_properties(version)
+        # A serial is global; make sure it is a version of *this* node.
+        if (props is None or props[NODE] != node
+                or props[CLUSTER] == CLUSTER_DELETED):
+            raise IndexError('Version does not exist')
+        return props
+
+    def _copy_version(self, user, src_node, dest_node, copy_meta=True, copy_data=True, src_version=None):
+        """Create a new CLUSTER_NORMAL version of dest_node from a source version.
+
+        The source is `src_version` of src_node when given, otherwise the
+        latest version of src_node (or none, if it has no version yet).  The
+        current latest version at the destination is moved to
+        CLUSTER_HISTORY.  Attributes and the block map are copied when
+        requested and a source exists.
+        Return (src_version_id, dest_version_id); src_version_id may be None.
+        """
+        # Get source serial and size.
+        if src_version is not None:
+            src_props = self._get_version(src_node, src_version)
+            src_version_id = src_props[SERIAL]
+            size = src_props[SIZE]
+        else:
+            # Latest or create from scratch.
+            try:
+                src_props = self._get_version(src_node)
+            except NameError:
+                src_version_id = None
+                size = 0
+            else:
+                src_version_id = src_props[SERIAL]
+                size = src_props[SIZE]
+        if not copy_data:
+            size = 0
+
+        # Move the latest version at destination to CLUSTER_HISTORY and create new.
+        if src_node == dest_node and src_version is None and src_version_id is not None:
+            # The source already is the destination's latest version.
+            self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
+        else:
+            dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
+            if dest_props is not None:
+                self.node.version_recluster(dest_props[SERIAL], CLUSTER_HISTORY)
+        dest_version_id = self.node.version_create(dest_node, size, src_version_id, user, CLUSTER_NORMAL)[0]
+
+        # Copy meta and data.
+        if copy_meta and src_version_id is not None:
+            # attribute_copy is a method of the Node store.
+            self.node.attribute_copy(src_version_id, dest_version_id)
+        if copy_data and src_version_id is not None:
+            hashmap = self.mapper.map_retr(src_version_id)
+            self.mapper.map_stor(dest_version_id, hashmap)
+
+        return src_version_id, dest_version_id
+
+    def _get_metadata(self, version):
+        """Return the attributes of `version` as a dict ({} when version is None)."""
+        return {} if version is None else dict(self.node.attribute_get(version))
+
+    def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
+        """Create a new version of `node` and store `meta` on it.
+
+        With replace=False the previous attributes are carried over, keys
+        whose value is '' are deleted and the other items are set.  With
+        replace=True the previous attributes are not copied and every item
+        of `meta` is set as given.
+        """
+        src_version_id, dest_version_id = self._copy_version(user, node, node, not replace, copy_data)
+        if replace:
+            self.node.attribute_set(dest_version_id, list(meta.items()))
+            return
+        deleted = [k for k, v in meta.items() if v == '']
+        # attribute_del deletes *all* attributes when given no keys, so only
+        # call it when there is something to delete.
+        if deleted:
+            self.node.attribute_del(dest_version_id, deleted)
+        self.node.attribute_set(dest_version_id, [(k, v) for k, v in meta.items() if v != ''])
+
# Access control functions.
def _check_groups(self, groups):