From 44ad586024a6b14a0e3d1c8b3a5a06aa93656f8e Mon Sep 17 00:00:00 2001 From: Antony Chazapis Date: Tue, 2 Aug 2011 19:43:25 +0300 Subject: [PATCH] Modular backend progress. --- pithos/backends/lib/node.py | 218 ++++++++++++++++++-------------- pithos/backends/modular.py | 295 +++++++++++++++++++++++++------------------ 2 files changed, 296 insertions(+), 217 deletions(-) diff --git a/pithos/backends/lib/node.py b/pithos/backends/lib/node.py index 6e31e1b..ef9f555 100644 --- a/pithos/backends/lib/node.py +++ b/pithos/backends/lib/node.py @@ -38,7 +38,7 @@ from dbworker import DBWorker ROOTNODE = 0 -( SERIAL, NODE, SIZE, SOURCE, MTIME, CLUSTER ) = range(6) +( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7) inf = float('inf') @@ -87,13 +87,14 @@ _propnames = { 'size' : 2, 'source' : 3, 'mtime' : 4, - 'cluster' : 5, + 'muser' : 5, + 'cluster' : 6, } class Node(DBWorker): """Nodes store path organization. - Versions store obejct history. + Versions store object history. Attributes store metadata. """ @@ -105,7 +106,7 @@ class Node(DBWorker): execute(""" create table if not exists nodes ( node integer primary key, parent integer not null default 0, - path text not null default '' + path text not null default '', foreign key (parent) references nodes(node) on update cascade @@ -118,6 +119,7 @@ class Node(DBWorker): population integer not null default 0, size integer not null default 0, mtime integer, + muser text not null default '', cluster integer not null default 0, primary key (node, cluster) foreign key (node) @@ -154,6 +156,28 @@ class Node(DBWorker): q = "insert or ignore into nodes(node, parent) values (?, ?)" execute(q, (ROOTNODE, ROOTNODE)) + def node_create(self, parent, path): + """Create a new node from the given properties. + Return the node identifier of the new node. + """ + + q = ("insert into nodes (parent, path) " + "values (?, ?)") + props = (parent, path) + return self.execute(q, props).lastrowid + + def node_lookup(self, path): + """Lookup the current node of the given path. + Return None if the path is not found. + """ + + q = ("select node from nodes where path = ?") + self.execute(q, (path,)) + r = self.fetchone() + if r is not None: + return r[0] + return None + def node_update_ancestors(self, node, population, size, mtime, cluster=0): """Update the population properties of the given node. Population properties keep track the population and total @@ -202,18 +226,15 @@ class Node(DBWorker): return (0, 0, 0) return r - def version_create(self, node, size, source, cluster=0): - """Create a new version from the given properties. - Return the (serial, mtime) of the new version. - """ + def node_children(self, node): + """Return node's child count.""" - q = ("insert into nodes (node, size, source, mtime, cluster) " - "values (?, ?, ?, ?, ?)") - mtime = time() - props = (node, path, size, source, mtime, cluster) - serial = self.execute(q, props).lastrowid - self.node_update_ancestors(node, 1, size, mtime, cluster) - return serial, mtime + q = "select count(node) from nodes where parent = ?" + self.execute(q, (node,)) + r = fetchone() + if r is None: + return 0 + return r # def node_remove(self, serial, recursive=0): # """Remove the node specified by serial. @@ -236,14 +257,46 @@ class Node(DBWorker): # self.node_update_ancestors(parent, -pop-1, -size-popsize) # return True + def version_create(self, node, size, source, muser, cluster=0): + """Create a new version from the given properties. + Return the (serial, mtime) of the new version. + """ + + q = ("insert into nodes (node, size, source, mtime, muser, cluster) " + "values (?, ?, ?, ?, ?)") + mtime = time() + props = (node, path, size, source, mtime, muser, cluster) + serial = self.execute(q, props).lastrowid + self.node_update_ancestors(node, 1, size, mtime, cluster) + return serial, mtime + + def version_lookup(self, node, before=inf, cluster=0): + """Lookup the current version of the given node. + Return a list with its properties: + (serial, node, size, source, mtime, muser, cluster) + or None if the current version is not found in the given cluster. + """ + + q = ("select serial, node, size, source, mtime, muser, cluster " + "from versions " + "where serial = (select max(serial) " + "from versions " + "where node = ? and mtime < ?) " + "and cluster = ?") + self.execute(q, (node, before, cluster)) + props = self.fetchone() + if props is not None: + return props + return None + def version_get_properties(self, serial, keys=(), propnames=_propnames): """Return a sequence of values for the properties of the version specified by serial and the keys, in the order given. If keys is empty, return all properties in the order - (serial, node, size, source, mtime, cluster). + (serial, node, size, source, mtime, muser, cluster). """ - q = ("select serial, node, path, size, source, mtime, cluster " + q = ("select serial, node, path, size, source, mtime, muser, cluster " "from nodes " "where serial = ?") self.execute(q, (serial,)) @@ -273,45 +326,59 @@ class Node(DBWorker): # vals += (serial,) # self.execute(q, vals) - def version_lookup(self, node, before=inf, cluster=0): - """Lookup the current version of the given node. - Return a list with its properties: - (serial, node, size, source, mtime, cluster) - or None if the version is not found. - """ + def version_recluster(self, serial, cluster): + """Move the version into another cluster.""" - q = ("select serial, node, size, source, mtime, cluster " - "from nodes " - "where serial = (select max(serial) " - "from nodes " - "where node = ? and cluster = ? and mtime < ?)") - self.execute(q, (node, cluster, before)) - props = self.fetchone() - if props is not None: - return props - return None - - def node_lookup(self, path): - """Lookup the current node of the given path. - Return None if the path is not found. - """ + props = self.node_get_properties(source) + node = props[NODE] + size = props[SIZE] + mtime = props[MTIME] + oldcluster = props[CLUSTER] + if cluster == oldcluster: + return - q = ("select node from nodes where path = ?") - self.execute(q, (path,)) - r = self.fetchone() - if r is not None: - return r[0] - return None + self.node_update_ancestors(node, -1, -size, mtime, oldcluster) + self.node_update_ancestors(node, 1, size, mtime, cluster) + + q = "update nodes set parent = ?, path = ? where serial = ?" + self.execute(q, (parent, path, source)) - def node_create(self, parent, path): - """Create a new node from the given properties. - Return the node identifier of the new node. +# def version_copy(self, serial, node, muser, copy_attr=True): +# """Copy the version specified by serial into +# a new version of node. Optionally copy attributes. +# Return the (serial, mtime) of the new version. +# """ +# +# props = self.version_get_properties(serial) +# if props is None: +# return None +# size = props[SIZE] +# cluster = props[CLUSTER] +# new_serial, mtime = self.version_create(node, size, serial, muser, cluster) +# if copy_attr: +# self.attribute_copy(serial, new_serial) +# return (new_serial, mtime) + + def path_statistics(self, prefix, before=inf, except_cluster=0): + """Return population, total size and last mtime + for all latest versions under prefix that + do not belong to the cluster. """ - q = ("insert into nodes (parent, path) " - "values (?, ?)") - props = (parent, path) - return self.execute(q, props).lastrowid + q = ("select count(serial), sum(size), max(mtime) " + "from versions v " + "where serial = (select max(serial) " + "from versions " + "where node = v.node and mtime < ?) " + "and cluster != ? " + "and node in (select node " + "from nodes " + "where path like ?") + self.execute(q, (before, except_cluster, prefix + '%')) + r = fetchone() + if r is None: + return (0, 0, 0) + return r def parse_filters(self, filterq): preterms = filterq.split(',') @@ -538,44 +605,9 @@ class Node(DBWorker): # "and path = ? and mtime between ? and ?") # execute(q, args) # return serials - -# def node_move(self, source, parent, path): -# """Move the source node into another path, -# possibly, in another parent's namespace. -# The node is moved with its namespace. -# """ -# props = self.node_get_properties(source) -# -# oldparent = props[PARENT] -# size = props[SIZE] -# population = props[POPULATION] -# popsize = props[POPSIZE] -# -# sizedelta = size + popsize -# popdelta = population + 1 -# node_update_ancestors = self.node_update_ancestors -# node_update_ancestors(oldparent, -popdelta, -sizedelta) -# node_update_ancestors(parent, popdelta, sizedelta) -# -# q = "update nodes set parent = ?, path = ? where serial = ?" -# self.execute(q, (parent, path, source)) - - def version_copy(self, serial, node=None, copy_attr=True): - """Copy the version specified by serial into - a new version of node. Optionally copy attributes. - Return the (serial, mtime) of the new version. - """ - - props = self.version_get_properties(serial) - size = props[SIZE] - cluster = props[CLUSTER] - new_serial, mtime = self.version_create(node, path, size, serial, cluster) - if copy_attr: - self.attr_copy(serial, new_serial) - return (new_serial, mtime) - def node_get_attributes(self, serial, keys=()): - """Return a list of (key, value) pairs of the node specified by serial. + def attribute_get(self, serial, keys=()): + """Return a list of (key, value) pairs of the version specified by serial. If keys is empty, return all attributes. Othwerise, return only those specified. """ @@ -591,8 +623,8 @@ class Node(DBWorker): execute(q, (serial,)) return self.fetchall() - def attr_set(self, serial, items): - """Set the attributes of the node specified by serial. + def attribute_set(self, serial, items): + """Set the attributes of the version specified by serial. Receive attributes as an iterable of (key, value) pairs. """ @@ -600,8 +632,8 @@ class Node(DBWorker): "values (?, ?, ?)") self.executemany(q, ((serial, k, v) for k, v in items)) - def attr_del(self, serial, keys=()): - """Delete attributes of the node specified by serial. + def attribute_del(self, serial, keys=()): + """Delete attributes of the version specified by serial. If keys is empty, delete all attributes. Otherwise delete those specified. """ @@ -623,7 +655,7 @@ class Node(DBWorker): # self.execute(q, (parent,)) # return [r[0] for r in self.fetchall()] - def attr_copy(self, source, dest): + def attribute_copy(self, source, dest): q = ("insert or replace into attributes " "select ?, key, value from attributes " "where serial = ?") diff --git a/pithos/backends/modular.py b/pithos/backends/modular.py index ea68234..94da72c 100644 --- a/pithos/backends/modular.py +++ b/pithos/backends/modular.py @@ -39,10 +39,15 @@ import hashlib import binascii from base import NotAllowedError, BaseBackend +from lib.node import Node, ROOTNODE, SERIAL, SIZE, MTIME, MUSER, CLUSTER from lib.permissions import Permissions, READ, WRITE from lib.policy import Policy from lib.hashfiler import Mapper, Blocker +( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3) + +inf = float('inf') + logger = logging.getLogger(__name__) @@ -86,29 +91,7 @@ class ModularBackend(BaseBackend): if not os.path.isdir(basepath): raise RuntimeError("Cannot open database at '%s'" % (db,)) - self.con = sqlite3.connect(basepath + '/db', check_same_thread=False) - - sql = '''pragma foreign_keys = on''' - self.con.execute(sql) - - sql = '''create table if not exists versions ( - version_id integer primary key, - name text, - user text, - tstamp integer not null, - size integer default 0, - hide integer default 0)''' - self.con.execute(sql) - sql = '''create table if not exists metadata ( - version_id integer, - key text, - value text, - primary key (version_id, key) - foreign key (version_id) references versions(version_id) - on delete cascade)''' - self.con.execute(sql) - - self.con.commit() + self.con = sqlite3.connect(basepath + '/db', check_same_thread=False) params = {'blocksize': self.block_size, 'blockpath': basepath + '/blocks', @@ -123,6 +106,9 @@ class ModularBackend(BaseBackend): 'cursor': self.con.cursor()} self.permissions = Permissions(**params) self.policy = Policy(**params) + self.node = Node(**params) + + self.con.commit() @backend_method def list_accounts(self, user, marker=None, limit=10000): @@ -137,41 +123,41 @@ class ModularBackend(BaseBackend): """Return a dictionary with the account metadata.""" logger.debug("get_account_meta: %s %s", account, until) + node = self._lookup_account(account, user == account) if user != account: - if until or account not in self._allowed_accounts(user): + if until or node is None or account not in self._allowed_accounts(user): raise NotAllowedError - else: - self._create_account(user, account) try: - version_id, mtime = self._get_accountinfo(account, until) + props = self._get_properties(node, until) + version_id = props[SERIAL] + mtime = props[MTIME] except NameError: # Account does not exist before until. version_id = None mtime = until - count, bytes, tstamp = self._get_pathstats(account, until) + object_count, bytes, tstamp = self._get_statistics(node, account, until) if mtime > tstamp: tstamp = mtime if until is None: modified = tstamp else: - modified = self._get_pathstats(account)[2] # Overall last modification + modified = self._get_statistics(node, account)[2] # Overall last modification. if mtime > modified: modified = mtime # Proper count. - sql = 'select count(name) from (%s) where name glob ? and not name glob ?' - sql = sql % self._sql_until(until) - c = self.con.execute(sql, (account + '/*', account + '/*/*')) - row = c.fetchone() - count = row[0] + count = self.node.node_children(node) + object_count -= count if user != account: meta = {'name': account} else: - meta = self._get_metadata(account, version_id) - meta.update({'name': account, 'count': count, 'bytes': bytes}) + meta = {} + if version_id is not None: + meta.update(dict(self.node.attribute_get(version_id))) if until is not None: meta.update({'until_timestamp': tstamp}) + meta.update({'name': account, 'count': count, 'bytes': bytes}) meta.update({'modified': modified}) return meta @@ -182,7 +168,8 @@ class ModularBackend(BaseBackend): logger.debug("update_account_meta: %s %s %s", account, meta, replace) if user != account: raise NotAllowedError - self._put_metadata(user, account, meta, replace, False) + node = self._lookup_account(account, True) + self._put_metadata(user, node, meta, replace, False) @backend_method def get_account_groups(self, user, account): @@ -193,7 +180,7 @@ class ModularBackend(BaseBackend): if account not in self._allowed_accounts(user): raise NotAllowedError return {} - self._create_account(user, account) + self._lookup_account(account, True) return self.permissions.group_dict(account) @backend_method @@ -203,7 +190,7 @@ class ModularBackend(BaseBackend): logger.debug("update_account_groups: %s %s %s", account, groups, replace) if user != account: raise NotAllowedError - self._create_account(user, account) + self._lookup_account(account, True) self._check_groups(groups) if replace: self.permissions.group_destroy(account) @@ -220,13 +207,23 @@ class ModularBackend(BaseBackend): logger.debug("put_account: %s", account) if user != account: raise NotAllowedError - try: - version_id, mtime = self._get_accountinfo(account) - except NameError: - pass - else: + node = self.node.node_lookup(account) + if node is not None: raise NameError('Account already exists') - self._put_version(account, user) + node = self.node.node_create(ROOTNODE, account) + self.node.version_create(node, 0, None, account, CLUSTER_NORMAL) + + + + + + + + + + + + @backend_method def delete_account(self, user, account): @@ -308,7 +305,7 @@ class ModularBackend(BaseBackend): raise NotAllowedError return {} path = self._get_containerinfo(account, container)[0] - return self._get_policy(path) + return self.policy.policy_get(path) @backend_method def update_container_policy(self, user, account, container, policy, replace=False): @@ -622,6 +619,22 @@ class ModularBackend(BaseBackend): h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),)) return binascii.hexlify(h) + + + + + + + + + + + + + + + + def _sql_until(self, until=None): """Return the sql to get the latest versions until the timestamp given.""" if until is None: @@ -632,63 +645,12 @@ class ModularBackend(BaseBackend): and hide = 0''' return sql % (until,) - def _get_pathstats(self, path, until=None): - """Return count and sum of size of everything under path and latest timestamp.""" - - sql = 'select count(version_id), total(size), max(tstamp) from (%s) where name like ?' - sql = sql % self._sql_until(until) - c = self.con.execute(sql, (path + '/%',)) - row = c.fetchone() - tstamp = row[2] if row[2] is not None else 0 - return int(row[0]), int(row[1]), int(tstamp) - - def _get_version(self, path, version=None): - if version is None: - sql = '''select version_id, user, tstamp, size, hide from versions where name = ? - order by version_id desc limit 1''' - c = self.con.execute(sql, (path,)) - row = c.fetchone() - if not row or int(row[4]): - raise NameError('Object does not exist') - else: - # The database (sqlite) will not complain if the version is not an integer. - sql = '''select version_id, user, tstamp, size from versions where name = ? - and version_id = ?''' - c = self.con.execute(sql, (path, version)) - row = c.fetchone() - if not row: - raise IndexError('Version does not exist') - return str(row[0]), str(row[1]), int(row[2]), int(row[3]) - def _put_version(self, path, user, size=0, hide=0): tstamp = int(time.time()) sql = 'insert into versions (name, user, tstamp, size, hide) values (?, ?, ?, ?, ?)' id = self.con.execute(sql, (path, user, tstamp, size, hide)).lastrowid return str(id), tstamp - def _copy_version(self, user, src_path, dest_path, copy_meta=True, copy_data=True, src_version=None): - if src_version is not None: - src_version_id, muser, mtime, size = self._get_version(src_path, src_version) - else: - # Latest or create from scratch. - try: - src_version_id, muser, mtime, size = self._get_version(src_path) - except NameError: - src_version_id = None - size = 0 - if not copy_data: - size = 0 - dest_version_id = self._put_version(dest_path, user, size)[0] - if copy_meta and src_version_id is not None: - sql = 'insert into metadata select %s, key, value from metadata where version_id = ?' - sql = sql % dest_version_id - self.con.execute(sql, (src_version_id,)) - if copy_data and src_version_id is not None: - # TODO: Copy properly. - hashmap = self.mapper.map_retr(src_version_id) - self.mapper.map_stor(dest_version_id, hashmap) - return src_version_id, dest_version_id - def _get_versioninfo(self, account, container, name, until=None): """Return path, latest version, associated timestamp and size until the timestamp given.""" @@ -698,13 +660,15 @@ class ModularBackend(BaseBackend): except ValueError: pass path = '/'.join(p) - sql = '''select version_id, tstamp, size from (%s) where name = ?''' - sql = sql % self._sql_until(until) - c = self.con.execute(sql, (path,)) - row = c.fetchone() - if row is None: - raise NameError('Path does not exist') - return path, str(row[0]), int(row[1]), int(row[2]) + node = self.node.node_lookup(path) + if node is not None: + props = self.node.version_lookup(node, until, CLUSTER_NORMAL) + # TODO: Do one lookup. + if props is None and until is not None: + props = self.node.version_lookup(node, until, CLUSTER_HISTORY) + if props is not None: + return path, props[SERIAL], props[MTIME], props[SIZE] + raise NameError('Path does not exist') def _get_accountinfo(self, account, until=None): try: @@ -731,23 +695,6 @@ class ModularBackend(BaseBackend): except NameError: self._put_version(account, user) - def _get_metadata(self, path, version): - sql = 'select key, value from metadata where version_id = ?' - c = self.con.execute(sql, (version,)) - return dict(c.fetchall()) - - def _put_metadata(self, user, path, meta, replace=False, copy_data=True): - """Create a new version and store metadata.""" - - src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, copy_data) - for k, v in meta.iteritems(): - if not replace and v == '': - sql = 'delete from metadata where version_id = ? and key = ?' - self.con.execute(sql, (dest_version_id, k)) - else: - sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)' - self.con.execute(sql, (dest_version_id, k, v)) - def _check_policy(self, policy): for k in policy.keys(): if policy[k] == '': @@ -763,9 +710,6 @@ class ModularBackend(BaseBackend): else: raise ValueError - def _get_policy(self, path): - return self.policy.policy_get(path) - def _list_limits(self, listing, marker, limit): start = 0 if marker: @@ -827,6 +771,109 @@ class ModularBackend(BaseBackend): sql = 'delete from versions where version_id = ?' self.con.execute(sql, (version,)) + # Path functions. + + def _lookup_account(self, account, create=True): + node = self.node.node_lookup(account) + if node is None and create: + node = self.node.node_create(ROOTNODE, account) + self.node.version_create(node, 0, None, account, CLUSTER_NORMAL) + return node + + def _lookup_container(self, account, container): + node = self.node.node_lookup('/'.join((account, container))) + if node is None: + raise NameError('Container does not exist') + return node + + def _lookup_object(self, account, container, name): + node = self.node.node_lookup('/'.join((account, container, name))) + if node is None: + raise NameError('Object does not exist') + return node + + def _get_properties(self, node, until=None): + """Return properties until the timestamp given.""" + + before = until if until is not None else inf + props = self.node.version_lookup(node, before, CLUSTER_NORMAL) + # TODO: Do one lookup. + if props is None and until is not None: + props = self.node.version_lookup(node, before, CLUSTER_HISTORY) + if props is None: + raise NameError('Path does not exist') + return props + + def _get_statistics(self, node, path, until=None): + """Return count, sum of size and latest timestamp of everything under node/path.""" + + if until is None: + return self.node.node_statistics(node, CLUSTER_NORMAL) + else: + return self.node.path_statistics(path + '/', until, CLUSTER_DELETED) + + def _get_version(self, node, version=None): + if version is None: + props = self.node.version_lookup(node, inf, CLUSTER_NORMAL) + if props is None: + raise NameError('Object does not exist') + else: + props = self.node.version_get_properties(version) + if props is None or props[CLUSTER] == CLUSTER_DELETE: + raise IndexError('Version does not exist') + return props + + def _copy_version(self, user, src_node, dest_node, copy_meta=True, copy_data=True, src_version=None): + # Get source serial and size. + if src_version is not None: + src_props = self._get_version(src_node, src_version) + src_version_id = src_props[SERIAL] + size = src_props[SIZE] + else: + # Latest or create from scratch. + try: + src_props = self._get_version(src_node) + src_version_id = src_props[SERIAL] + size = src_props[SIZE] + except NameError: + src_version_id = None + size = 0 + if not copy_data: + size = 0 + + # Move the latest version at destination to CLUSTER_HISTORY and create new. + if src_node == dest_node and src_version is None and src_version_id is not None: + self.node.version_recluster(src_version_id, CLUSTER_HISTORY) + else: + dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL) + if dest_props is not None: + self.node.version_recluster(dest_props[SERIAL], CLUSTER_HISTORY) + dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, CLUSTER_NORMAL) + + # Copy meta and data. + if copy_meta and src_version_id is not None: + self.attribute_copy(src_version_id, dest_version_id) + if copy_data and src_version_id is not None: + hashmap = self.mapper.map_retr(src_version_id) + self.mapper.map_stor(dest_version_id, hashmap) + + return src_version_id, dest_version_id + + def _get_metadata(self, version): + if version is None: + return {} + return dict(self.node.attribute_get(version)) + + def _put_metadata(self, user, node, meta, replace=False, copy_data=True): + """Create a new version and store metadata.""" + + src_version_id, dest_version_id = self._copy_version(user, node, node, not replace, copy_data) + if not replace: + self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == '')) + self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != '')) + else: + self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems())) + # Access control functions. def _check_groups(self, groups): -- 1.7.10.4