From: Sofia Papagiannaki Date: Thu, 11 Aug 2011 14:39:20 +0000 (+0300) Subject: Merge branch 'master' of https://code.grnet.gr/git/pithos X-Git-Tag: pithos/v0.7.8~122^2 X-Git-Url: https://code.grnet.gr/git/pithos/commitdiff_plain/d8539f8e73d8b2b722e2b6f463e00aea588a84ce?hp=62b697f3adf9a777bd1e6d29a043256e237a71f8 Merge branch 'master' of https://code.grnet.gr/git/pithos --- diff --git a/pithos/backends/lib/node.py b/pithos/backends/lib/node.py index 8e7a7c4..2bad014 100644 --- a/pithos/backends/lib/node.py +++ b/pithos/backends/lib/node.py @@ -139,10 +139,10 @@ class Node(DBWorker): on update cascade on delete cascade ) """) execute(""" create index if not exists idx_versions_node - on nodes(node) """) + on versions(node) """) # TODO: Sort out if more indexes are needed. # execute(""" create index if not exists idx_versions_mtime - # on nodes(mtime) """) + # on versions(mtime) """) execute(""" create table if not exists attributes ( serial integer, diff --git a/pithos/backends/lib_alchemy/__init__.py b/pithos/backends/lib_alchemy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pithos/backends/lib_alchemy/dbworker.py b/pithos/backends/lib_alchemy/dbworker.py new file mode 100644 index 0000000..414163e --- /dev/null +++ b/pithos/backends/lib_alchemy/dbworker.py @@ -0,0 +1,41 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. + + +class DBWorker(object): + """Database connection handler.""" + + def __init__(self, **params): + self.params = params + self.conn = params['connection'] + self.engine = params['engine'] diff --git a/pithos/backends/lib_alchemy/groups.py b/pithos/backends/lib_alchemy/groups.py new file mode 100644 index 0000000..19d2270 --- /dev/null +++ b/pithos/backends/lib_alchemy/groups.py @@ -0,0 +1,148 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. 
Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. + +from collections import defaultdict +from sqlalchemy import Table, Column, String, MetaData +from sqlalchemy.sql import select, and_ +from sqlalchemy.schema import Index +from dbworker import DBWorker + +class Groups(DBWorker): + """Groups are named collections of members, belonging to an owner.""" + + def __init__(self, **params): + DBWorker.__init__(self, **params) + metadata = MetaData() + columns=[] + columns.append(Column('owner', String(255), primary_key=True)) + columns.append(Column('name', String(255), primary_key=True)) + columns.append(Column('member', String(255), primary_key=True)) + self.groups = Table('groups', metadata, *columns) + + # place an index on member + Index('idx_groups_member', self.groups.c.member) + + metadata.create_all(self.engine) + metadata.bind = self.engine + + def group_names(self, owner): + """List all group names belonging to owner.""" + + s = select([self.groups.c.name], + self.groups.c.owner==owner).distinct() + r = self.conn.execute(s) + l = [row[0] for row in r.fetchall()] + r.close() + return l + + def group_dict(self, owner): + """Return a dict mapping group names to member lists for owner.""" + + s = select([self.groups.c.name, self.groups.c.member], + self.groups.c.owner==owner) + r = self.conn.execute(s) + d = defaultdict(list) + for group, member in r.fetchall(): + d[group].append(member) + r.close() + return d + + def group_add(self, owner, group, member): + """Add a member to a group.""" + + s = self.groups.insert() + r = self.conn.execute(s, owner=owner, name=group, member=member) + r.close() + + def group_addmany(self, owner, group, members): + """Add members to a group.""" + + s = self.groups.insert() + values = [{'owner':owner, 'name':group, 'member':member} for member in members] + r = self.conn.execute(s, values) + r.close() + + def group_remove(self, owner, group, member): + """Remove a member from a group.""" + + s = self.groups.delete().where(and_(self.groups.c.owner==owner, + self.groups.c.name==group, + self.groups.c.member==member)) + r = self.conn.execute(s) + r.close() + + def group_delete(self, owner, group): + """Delete a group.""" + + s = self.groups.delete().where(and_(self.groups.c.owner==owner, + 
self.groups.c.name==group)) + r = self.conn.execute(s) + r.close() + + def group_destroy(self, owner): + """Delete all groups belonging to owner.""" + + s = self.groups.delete().where(self.groups.c.owner==owner) + r = self.conn.execute(s) + r.close() + + def group_members(self, owner, group): + """Return the list of members of a group.""" + + s = select([self.groups.c.member], and_(self.groups.c.owner==owner, + self.groups.c.name==group)) + r = self.conn.execute(s) + l = [row[0] for row in r.fetchall()] + r.close() + return l + + def group_check(self, owner, group, member): + """Check if a member is in a group.""" + + s = select([self.groups.c.member], and_(self.groups.c.owner==owner, + self.groups.c.name==group, + self.groups.c.member==member)) + r = self.conn.execute(s) + l = r.fetchone() + r.close() + return bool(l) + + def group_parents(self, member): + """Return all (owner, group) tuples that contain member.""" + + s = select([self.groups.c.owner, self.groups.c.name], + self.groups.c.member==member) + r = self.conn.execute(s) + l = r.fetchall() + r.close() + return l diff --git a/pithos/backends/lib_alchemy/hashfiler/__init__.py b/pithos/backends/lib_alchemy/hashfiler/__init__.py new file mode 100644 index 0000000..aa3b929 --- /dev/null +++ b/pithos/backends/lib_alchemy/hashfiler/__init__.py @@ -0,0 +1,36 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. + +from blocker import Blocker +from mapper import Mapper + diff --git a/pithos/backends/lib_alchemy/hashfiler/blocker.py b/pithos/backends/lib_alchemy/hashfiler/blocker.py new file mode 100644 index 0000000..47293bc --- /dev/null +++ b/pithos/backends/lib_alchemy/hashfiler/blocker.py @@ -0,0 +1,211 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. 
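For reference, a minimal usage sketch of the Groups API added earlier in this diff, assuming the SQLAlchemy (0.6-era) API this code was written against; the SQLite URL and the owner/group/member strings are illustrative only:

    from sqlalchemy import create_engine
    from pithos.backends.lib_alchemy.groups import Groups

    engine = create_engine('sqlite:////tmp/pithos-groups.db')   # illustrative URL
    groups = Groups(engine=engine, connection=engine.connect())

    groups.group_add('owner', 'devs', 'alice')
    groups.group_addmany('owner', 'devs', ['bob', 'carol'])
    assert groups.group_check('owner', 'devs', 'alice')
    assert sorted(groups.group_members('owner', 'devs')) == ['alice', 'bob', 'carol']
    groups.group_remove('owner', 'devs', 'carol')
    assert groups.group_names('owner') == ['devs']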
Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. + +from os import makedirs +from os.path import isdir, realpath, exists, join +from hashlib import new as newhasher +from binascii import hexlify + +from context_file import ContextFile, file_sync_read_chunks + + +class Blocker(object): + """Blocker. + Required contstructor parameters: blocksize, blockpath, hashtype. + """ + + blocksize = None + blockpath = None + hashtype = None + + def __init__(self, **params): + blocksize = params['blocksize'] + blockpath = params['blockpath'] + blockpath = realpath(blockpath) + if not isdir(blockpath): + if not exists(blockpath): + makedirs(blockpath) + else: + raise ValueError("Variable blockpath '%s' is not a directory" % (blockpath,)) + + hashtype = params['hashtype'] + try: + hasher = newhasher(hashtype) + except ValueError: + msg = "Variable hashtype '%s' is not available from hashlib" + raise ValueError(msg % (hashtype,)) + + hasher.update("") + emptyhash = hasher.digest() + + self.blocksize = blocksize + self.blockpath = blockpath + self.hashtype = hashtype + self.hashlen = len(emptyhash) + self.emptyhash = emptyhash + + def get_rear_block(self, blkhash, create=0): + filename = hexlify(blkhash) + dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6]) + if not exists(dir): + makedirs(dir) + name = join(dir, filename) + return ContextFile(name, create) + + def check_rear_block(self, blkhash): + filename = hexlify(blkhash) + dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6]) + name = join(dir, filename) + return exists(name) + + def block_hash(self, data): + """Hash a block of data""" + hasher = newhasher(self.hashtype) + hasher.update(data.rstrip('\x00')) + return hasher.digest() + + def block_ping(self, hashes): + """Check hashes for existence and + return those missing from block storage. 
+ """ + missing = [] + append = missing.append + for i, h in enumerate(hashes): + if not self.check_rear_block(h): + append(i) + return missing + + def block_retr(self, hashes): + """Retrieve blocks from storage by their hashes.""" + blocksize = self.blocksize + blocks = [] + append = blocks.append + block = None + + for h in hashes: + with self.get_rear_block(h, 0) as rbl: + if not rbl: + break + for block in rbl.sync_read_chunks(blocksize, 1, 0): + break # there should be just one block there + if not block: + break + append(block) + + return blocks + + def block_stor(self, blocklist): + """Store a bunch of blocks and return (hashes, missing). + Hashes is a list of the hashes of the blocks, + missing is a list of indices in that list indicating + which blocks were missing from the store. + """ + block_hash = self.block_hash + hashlist = [block_hash(b) for b in blocklist] + mf = None + missing = self.block_ping(hashlist) + for i in missing: + with self.get_rear_block(hashlist[i], 1) as rbl: + rbl.sync_write(blocklist[i]) #XXX: verify? + + return hashlist, missing + + def block_delta(self, blkhash, offdata=()): + """Construct and store a new block from a given block + and a list of (offset, data) 'patches'. Return: + (the hash of the new block, if the block already existed) + """ + if not offdata: + return None, None + + blocksize = self.blocksize + block = self.block_retr((blkhash,)) + if not block: + return None, None + + block = block[0] + newblock = '' + idx = 0 + size = 0 + trunc = 0 + for off, data in offdata: + if not data: + trunc = 1 + break + newblock += block[idx:off] + data + size += off - idx + len(data) + if size >= blocksize: + break + off = size + + if not trunc: + newblock += block[size:len(block)] + + h, a = self.block_stor((newblock,)) + return h[0], 1 if a else 0 + + def block_hash_file(self, openfile): + """Return the list of hashes (hashes map) + for the blocks in a buffered file. + Helper method, does not affect store. + """ + hashes = [] + append = hashes.append + block_hash = self.block_hash + + for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0): + append(block_hash(block)) + + return hashes + + def block_stor_file(self, openfile): + """Read blocks from buffered file object and store them. Return: + (bytes read, list of hashes, list of hashes that were missing) + """ + blocksize = self.blocksize + block_stor = self.block_stor + hashlist = [] + hextend = hashlist.extend + storedlist = [] + sextend = storedlist.extend + lastsize = 0 + + for block in file_sync_read_chunks(openfile, blocksize, 1, 0): + hl, sl = block_stor((block,)) + hextend(hl) + sextend(sl) + lastsize = len(block) + + size = (len(hashlist) -1) * blocksize + lastsize if hashlist else 0 + return size, hashlist, storedlist + diff --git a/pithos/backends/lib_alchemy/hashfiler/context_file.py b/pithos/backends/lib_alchemy/hashfiler/context_file.py new file mode 100644 index 0000000..0a16b2a --- /dev/null +++ b/pithos/backends/lib_alchemy/hashfiler/context_file.py @@ -0,0 +1,191 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. 
Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. + +from os import SEEK_CUR, SEEK_SET, fsync +from errno import ENOENT + + +_zeros = '' + + +def zeros(nr): + global _zeros + size = len(_zeros) + if nr == size: + return _zeros + + if nr > size: + _zeros += '\0' * (nr - size) + return _zeros + + if nr < size: + _zeros = _zeros[:nr] + return _zeros + + +def file_sync_write_chunks(openfile, chunksize, offset, chunks, size=None): + """Write given chunks to the given buffered file object. + Writes never span across chunk boundaries. + If size is given stop after or pad until size bytes have been written. + """ + fwrite = openfile.write + seek = openfile.seek + padding = 0 + + try: + seek(offset * chunksize) + except IOError, e: + seek = None + for x in xrange(offset): + fwrite(zeros(chunksize)) + + cursize = offset * chunksize + + for chunk in chunks: + if padding: + if seek: + seek(padding -1, SEEK_CUR) + fwrite("\x00") + else: + fwrite(buffer(zeros(chunksize), 0, padding)) + if size is not None and cursize + chunksize >= size: + chunk = chunk[:chunksize - (cursize - size)] + fwrite(chunk) + cursize += len(chunk) + break + fwrite(chunk) + padding = chunksize - len(chunk) + + padding = size - cursize if size is not None else 0 + if padding <= 0: + return + + q, r = divmod(padding, chunksize) + for x in xrange(q): + fwrite(zeros(chunksize)) + fwrite(buffer(zeros(chunksize), 0, r)) + + +def file_sync_read_chunks(openfile, chunksize, nr, offset=0): + """Read and yield groups of chunks from a buffered file object at offset. + Reads never span accros chunksize boundaries. 
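As a concrete illustration of the chunked read described here (the file contents below are made up): given an open file f containing '1234567890',

    list(file_sync_read_chunks(f, 4, 100, 1))   # -> ['5678', '90']

the first 4-byte chunk is skipped (offset=1) and the trailing short chunk is yielded as-is.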
+ """ + fread = openfile.read + remains = offset * chunksize + seek = openfile.seek + try: + seek(remains) + except IOError, e: + seek = None + while 1: + s = fread(remains) + remains -= len(s) + if remains <= 0: + break + + while nr: + remains = chunksize + chunk = '' + while 1: + s = fread(remains) + if not s: + if chunk: + yield chunk + return + chunk += s + remains -= len(s) + if remains <= 0: + break + yield chunk + nr -= 1 + + +class ContextFile(object): + __slots__ = ("name", "fdesc", "create") + + def __init__(self, name, create=0): + self.name = name + self.fdesc = None + self.create = create + #self.dirty = 0 + + def __enter__(self): + name = self.name + try: + fdesc = open(name, 'rb+') + except IOError, e: + if not self.create or e.errno != ENOENT: + raise + fdesc = open(name, 'w+') + + self.fdesc = fdesc + return self + + def __exit__(self, exc, arg, trace): + fdesc = self.fdesc + if fdesc is not None: + #if self.dirty: + # fsync(fdesc.fileno()) + fdesc.close() + return False # propagate exceptions + + def seek(self, offset, whence=SEEK_SET): + return self.fdesc.seek(offset, whence) + + def tell(self): + return self.fdesc.tell() + + def truncate(self, size): + self.fdesc.truncate(size) + + def sync_write(self, data): + #self.dirty = 1 + self.fdesc.write(data) + + def sync_write_chunks(self, chunksize, offset, chunks, size=None): + #self.dirty = 1 + return file_sync_write_chunks(self.fdesc, chunksize, offset, chunks, size) + + def sync_read(self, size): + read = self.fdesc.read + data = '' + while 1: + s = read(size) + if not s: + break + data += s + return data + + def sync_read_chunks(self, chunksize, nr, offset=0): + return file_sync_read_chunks(self.fdesc, chunksize, nr, offset) + diff --git a/pithos/backends/lib_alchemy/hashfiler/hashfiler/__init__.py b/pithos/backends/lib_alchemy/hashfiler/hashfiler/__init__.py new file mode 100644 index 0000000..d3cae88 --- /dev/null +++ b/pithos/backends/lib_alchemy/hashfiler/hashfiler/__init__.py @@ -0,0 +1,38 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. 
+ +from blocker import Blocker +from mapper import Mapper + +__all__ = ["Blocker", "Mapper"] + diff --git a/pithos/backends/lib_alchemy/hashfiler/hashfiler/blocker.py b/pithos/backends/lib_alchemy/hashfiler/hashfiler/blocker.py new file mode 100644 index 0000000..59fd31f --- /dev/null +++ b/pithos/backends/lib_alchemy/hashfiler/hashfiler/blocker.py @@ -0,0 +1,211 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. + +from os import makedirs +from os.path import isdir, realpath, exists, join +from hashlib import new as newhasher +from binascii import hexlify + +from context_file import ContextFile, file_sync_read_chunks + + +class Blocker(object): + """Blocker. + Required contstructor parameters: blocksize, blockpath, hashtype. 
+ """ + + blocksize = None + blockpath = None + hashtype = None + + def __init__(self, **params): + blocksize = params['blocksize'] + blockpath = params['blockpath'] + blockpath = realpath(blockpath) + if not isdir(blockpath): + if not exists(blockpath): + makedirs(blockpath) + else: + raise ValueError("Variable blockpath '%s' is not a directory" % (blockpath,)) + + hashtype = params['hashtype'] + try: + hasher = newhasher(hashtype) + except ValueError: + msg = "Variable hashtype '%s' is not available from hashlib" + raise ValueError(msg % (hashtype,)) + + hasher.update("") + emptyhash = hasher.digest() + + self.blocksize = blocksize + self.blockpath = blockpath + self.hashtype = hashtype + self.hashlen = len(emptyhash) + self.emptyhash = emptyhash + + def _get_rear_block(self, blkhash, create=0): + filename = hexlify(blkhash) + dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6]) + if not exists(dir): + makedirs(dir) + name = join(dir, filename) + return ContextFile(name, create) + + def _check_rear_block(self, blkhash): + filename = hexlify(blkhash) + dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6]) + name = join(dir, filename) + return exists(name) + + def block_hash(self, data): + """Hash a block of data""" + hasher = newhasher(self.hashtype) + hasher.update(data.rstrip('\x00')) + return hasher.digest() + + def block_ping(self, hashes): + """Check hashes for existence and + return those missing from block storage. + """ + missing = [] + append = missing.append + for i, h in enumerate(hashes): + if not self._check_rear_block(h): + append(i) + return missing + + def block_retr(self, hashes): + """Retrieve blocks from storage by their hashes.""" + blocksize = self.blocksize + blocks = [] + append = blocks.append + block = None + + for h in hashes: + with self._get_rear_block(h, 0) as rbl: + if not rbl: + break + for block in rbl.sync_read_chunks(blocksize, 1, 0): + break # there should be just one block there + if not block: + break + append(block) + + return blocks + + def block_stor(self, blocklist): + """Store a bunch of blocks and return (hashes, missing). + Hashes is a list of the hashes of the blocks, + missing is a list of indices in that list indicating + which blocks were missing from the store. + """ + block_hash = self.block_hash + hashlist = [block_hash(b) for b in blocklist] + mf = None + missing = self.block_ping(hashlist) + for i in missing: + with self._get_rear_block(hashlist[i], 1) as rbl: + rbl.sync_write(blocklist[i]) #XXX: verify? + + return hashlist, missing + + def block_delta(self, blkhash, offdata=()): + """Construct and store a new block from a given block + and a list of (offset, data) 'patches'. Return: + (the hash of the new block, if the block already existed) + """ + if not offdata: + return None, None + + blocksize = self.blocksize + block = self.block_retr((blkhash,)) + if not block: + return None, None + + block = block[0] + newblock = '' + idx = 0 + size = 0 + trunc = 0 + for off, data in offdata: + if not data: + trunc = 1 + break + newblock += block[idx:off] + data + size += off - idx + len(data) + if size >= blocksize: + break + off = size + + if not trunc: + newblock += block[size:len(block)] + + h, a = self.block_stor((newblock,)) + return h[0], 1 if a else 0 + + def block_hash_file(self, openfile): + """Return the list of hashes (hashes map) + for the blocks in a buffered file. + Helper method, does not affect store. 
+ """ + hashes = [] + append = hashes.append + block_hash = self.block_hash + + for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0): + append(block_hash(block)) + + return hashes + + def block_stor_file(self, openfile): + """Read blocks from buffered file object and store them. Return: + (bytes read, list of hashes, list of hashes that were missing) + """ + blocksize = self.blocksize + block_stor = self.block_stor + hashlist = [] + hextend = hashlist.extend + storedlist = [] + sextend = storedlist.extend + lastsize = 0 + + for block in file_sync_read_chunks(openfile, blocksize, 1, 0): + hl, sl = block_stor((block,)) + hextend(hl) + sextend(sl) + lastsize = len(block) + + size = (len(hashlist) -1) * blocksize + lastsize if hashlist else 0 + return size, hashlist, storedlist + diff --git a/pithos/backends/lib_alchemy/hashfiler/hashfiler/context_file.py b/pithos/backends/lib_alchemy/hashfiler/hashfiler/context_file.py new file mode 100644 index 0000000..0a16b2a --- /dev/null +++ b/pithos/backends/lib_alchemy/hashfiler/hashfiler/context_file.py @@ -0,0 +1,191 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. + +from os import SEEK_CUR, SEEK_SET, fsync +from errno import ENOENT + + +_zeros = '' + + +def zeros(nr): + global _zeros + size = len(_zeros) + if nr == size: + return _zeros + + if nr > size: + _zeros += '\0' * (nr - size) + return _zeros + + if nr < size: + _zeros = _zeros[:nr] + return _zeros + + +def file_sync_write_chunks(openfile, chunksize, offset, chunks, size=None): + """Write given chunks to the given buffered file object. + Writes never span across chunk boundaries. + If size is given stop after or pad until size bytes have been written. 
+ """ + fwrite = openfile.write + seek = openfile.seek + padding = 0 + + try: + seek(offset * chunksize) + except IOError, e: + seek = None + for x in xrange(offset): + fwrite(zeros(chunksize)) + + cursize = offset * chunksize + + for chunk in chunks: + if padding: + if seek: + seek(padding -1, SEEK_CUR) + fwrite("\x00") + else: + fwrite(buffer(zeros(chunksize), 0, padding)) + if size is not None and cursize + chunksize >= size: + chunk = chunk[:chunksize - (cursize - size)] + fwrite(chunk) + cursize += len(chunk) + break + fwrite(chunk) + padding = chunksize - len(chunk) + + padding = size - cursize if size is not None else 0 + if padding <= 0: + return + + q, r = divmod(padding, chunksize) + for x in xrange(q): + fwrite(zeros(chunksize)) + fwrite(buffer(zeros(chunksize), 0, r)) + + +def file_sync_read_chunks(openfile, chunksize, nr, offset=0): + """Read and yield groups of chunks from a buffered file object at offset. + Reads never span accros chunksize boundaries. + """ + fread = openfile.read + remains = offset * chunksize + seek = openfile.seek + try: + seek(remains) + except IOError, e: + seek = None + while 1: + s = fread(remains) + remains -= len(s) + if remains <= 0: + break + + while nr: + remains = chunksize + chunk = '' + while 1: + s = fread(remains) + if not s: + if chunk: + yield chunk + return + chunk += s + remains -= len(s) + if remains <= 0: + break + yield chunk + nr -= 1 + + +class ContextFile(object): + __slots__ = ("name", "fdesc", "create") + + def __init__(self, name, create=0): + self.name = name + self.fdesc = None + self.create = create + #self.dirty = 0 + + def __enter__(self): + name = self.name + try: + fdesc = open(name, 'rb+') + except IOError, e: + if not self.create or e.errno != ENOENT: + raise + fdesc = open(name, 'w+') + + self.fdesc = fdesc + return self + + def __exit__(self, exc, arg, trace): + fdesc = self.fdesc + if fdesc is not None: + #if self.dirty: + # fsync(fdesc.fileno()) + fdesc.close() + return False # propagate exceptions + + def seek(self, offset, whence=SEEK_SET): + return self.fdesc.seek(offset, whence) + + def tell(self): + return self.fdesc.tell() + + def truncate(self, size): + self.fdesc.truncate(size) + + def sync_write(self, data): + #self.dirty = 1 + self.fdesc.write(data) + + def sync_write_chunks(self, chunksize, offset, chunks, size=None): + #self.dirty = 1 + return file_sync_write_chunks(self.fdesc, chunksize, offset, chunks, size) + + def sync_read(self, size): + read = self.fdesc.read + data = '' + while 1: + s = read(size) + if not s: + break + data += s + return data + + def sync_read_chunks(self, chunksize, nr, offset=0): + return file_sync_read_chunks(self.fdesc, chunksize, nr, offset) + diff --git a/pithos/backends/lib_alchemy/hashfiler/hashfiler/mapper.py b/pithos/backends/lib_alchemy/hashfiler/hashfiler/mapper.py new file mode 100644 index 0000000..d9caaee --- /dev/null +++ b/pithos/backends/lib_alchemy/hashfiler/hashfiler/mapper.py @@ -0,0 +1,102 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. 
+# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. + +from os.path import realpath, join, exists, isdir +from os import makedirs, unlink +from errno import ENOENT + +from context_file import ContextFile + + +class Mapper(object): + """Mapper. + Required contstructor parameters: mappath, namelen. + """ + + mappath = None + namelen = None + + def __init__(self, **params): + self.params = params + self.namelen = params['namelen'] + mappath = realpath(params['mappath']) + if not isdir(mappath): + if not exists(mappath): + makedirs(mappath) + else: + raise ValueError("Variable mappath '%s' is not a directory" % (mappath,)) + self.mappath = mappath + + def _get_rear_map(self, name, create=0): + name = join(self.mappath, hex(int(name))) + return ContextFile(name, create) + + def _delete_rear_map(self, name): + name = join(self.mappath, hex(int(name))) + try: + unlink(name) + return 1 + except OSError, e: + if e.errno != ENOENT: + raise + return 0 + + def map_retr(self, name, blkoff=0, nr=100000000000000): + """Return as a list, part of the hashes map of an object + at the given block offset. + By default, return the whole hashes map. + """ + namelen = self.namelen + hashes = () + + with self._get_rear_map(name, 0) as rmap: + if rmap: + hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff)) + return hashes + + def map_stor(self, name, hashes=(), blkoff=0, create=1): + """Store hashes in the given hashes map, replacing the old ones.""" + namelen = self.namelen + with self._get_rear_map(name, 1) as rmap: + rmap.sync_write_chunks(namelen, blkoff, hashes, None) + +# def map_copy(self, src, dst): +# """Copy a hashes map to another one, replacing it.""" +# with self._get_rear_map(src, 0) as rmap: +# if rmap: +# rmap.copy_to(dst) + + def map_remv(self, name): + """Remove a hashes map. Returns true if the map was found and removed.""" + return self._delete_rear_map(name) + diff --git a/pithos/backends/lib_alchemy/hashfiler/mapper.py b/pithos/backends/lib_alchemy/hashfiler/mapper.py new file mode 100644 index 0000000..825bc9c --- /dev/null +++ b/pithos/backends/lib_alchemy/hashfiler/mapper.py @@ -0,0 +1,102 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. 
Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. + +from os.path import realpath, join, exists, isdir +from os import makedirs, unlink +from errno import ENOENT + +from context_file import ContextFile + + +class Mapper(object): + """Mapper. + Required contstructor parameters: mappath, namelen. + """ + + mappath = None + namelen = None + + def __init__(self, **params): + self.params = params + self.namelen = params['namelen'] + mappath = realpath(params['mappath']) + if not isdir(mappath): + if not exists(mappath): + makedirs(mappath) + else: + raise ValueError("Variable mappath '%s' is not a directory" % (mappath,)) + self.mappath = mappath + + def get_rear_map(self, name, create=0): + name = join(self.mappath, hex(int(name))) + return ContextFile(name, create) + + def delete_rear_map(self, name): + name = join(self.mappath, hex(int(name))) + try: + unlink(name) + return 1 + except OSError, e: + if e.errno != ENOENT: + raise + return 0 + + def map_retr(self, name, blkoff=0, nr=100000000000000): + """Return as a list, part of the hashes map of an object + at the given block offset. + By default, return the whole hashes map. + """ + namelen = self.namelen + hashes = () + + with self.get_rear_map(name, 0) as rmap: + if rmap: + hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff)) + return hashes + + def map_stor(self, name, hashes=(), blkoff=0, create=1): + """Store hashes in the given hashes map, replacing the old ones.""" + namelen = self.namelen + with self.get_rear_map(name, 1) as rmap: + rmap.sync_write_chunks(namelen, blkoff, hashes, None) + +# def map_copy(self, src, dst): +# """Copy a hashes map to another one, replacing it.""" +# with self.get_rear_map(src, 0) as rmap: +# if rmap: +# rmap.copy_to(dst) + + def map_remv(self, name): + """Remove a hashes map. Returns true if the map was found and removed.""" + return self.delete_rear_map(name) + diff --git a/pithos/backends/lib_alchemy/node.py b/pithos/backends/lib_alchemy/node.py new file mode 100644 index 0000000..3deac74 --- /dev/null +++ b/pithos/backends/lib_alchemy/node.py @@ -0,0 +1,759 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. 
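A matching sketch for the Mapper above; the map directory, the digest length and the integer used as a map name are illustrative:

    from hashlib import sha256
    from pithos.backends.lib_alchemy.hashfiler import Mapper

    mapper = Mapper(mappath='/tmp/pithos-maps', namelen=32)     # 32 == sha256 digest size
    hashes = [sha256('block-%d' % i).digest() for i in range(3)]
    mapper.map_stor(7, hashes)                  # 7 stands in for a version serial
    assert mapper.map_retr(7) == hashes
    assert mapper.map_remv(7) == 1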
Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. + +from time import time +from sqlalchemy import Table, Integer, Column, String, MetaData, ForeignKey +from sqlalchemy.schema import Index + +from dbworker import DBWorker + + +ROOTNODE = 0 + +( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7) + +inf = float('inf') + + +def strnextling(prefix): + """Return the first unicode string + greater than but not starting with given prefix. + strnextling('hello') -> 'hellp' + """ + if not prefix: + ## all strings start with the null string, + ## therefore we have to approximate strnextling('') + ## with the last unicode character supported by python + ## 0x10ffff for wide (32-bit unicode) python builds + ## 0x00ffff for narrow (16-bit unicode) python builds + ## We will not autodetect. 0xffff is safe enough. + return unichr(0xffff) + s = prefix[:-1] + c = ord(prefix[-1]) + if c >= 0xffff: + raise RuntimeError + s += unichr(c+1) + return s + +def strprevling(prefix): + """Return an approximation of the last unicode string + less than but not starting with given prefix. + strprevling(u'hello') -> u'helln\\xffff' + """ + if not prefix: + ## There is no prevling for the null string + return prefix + s = prefix[:-1] + c = ord(prefix[-1]) + if c > 0: + s += unichr(c-1) + unichr(0xffff) + return s + + +_propnames = { + 'serial' : 0, + 'node' : 1, + 'size' : 2, + 'source' : 3, + 'mtime' : 4, + 'muser' : 5, + 'cluster' : 6, +} + + +class Node(DBWorker): + """Nodes store path organization and have multiple versions. + Versions store object history and have multiple attributes. + Attributes store metadata. + """ + + # TODO: Provide an interface for included and excluded clusters. 
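A quick illustration of the prefix helpers defined above; latest_version_list further below uses them to turn a prefix match into a range scan on path (strprevling(prefix) < path < strnextling(prefix)):

    assert strnextling('hello') == 'hellp'
    assert strprevling(u'hello') == u'helln' + unichr(0xffff)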
+ + def __init__(self, **params): + DBWorker.__init__(self, **params) + metadata = MetaData() + + #create nodes table + columns=[] + columns.append(Column('node', Integer, primary_key=True)) + columns.append(Column('parent', Integer, + ForeignKey('nodes.node', + ondelete='CASCADE', + onupdate='CASCADE'), + autoincrement=False, default=0)) + #columns.append(Column('path', String(2048), default='', nullable=False)) + columns.append(Column('path', String(255), default='', nullable=False)) + self.nodes = Table('nodes', metadata, *columns) + # place an index on path + Index('idx_nodes_path', self.nodes.c.path, unique=True) + + #create statistics table + columns=[] + columns.append(Column('node', Integer, + ForeignKey('nodes.node', + ondelete='CASCADE', + onupdate='CASCADE'), + primary_key=True)) + columns.append(Column('population', Integer, nullable=False, + autoincrement=False, default=0)) + columns.append(Column('size', Integer, nullable=False, + autoincrement=False, default=0)) + columns.append(Column('mtime', Integer, autoincrement=False)) + columns.append(Column('cluster', Integer, nullable=False, + autoincrement=False, default=0, primary_key=True)) + self.statistics = Table('statistics', metadata, *columns) + + #create versions table + columns=[] + columns.append(Column('serial', Integer, autoincrement=False, + primary_key=True)) + columns.append(Column('node', Integer, + ForeignKey('nodes.node', + ondelete='CASCADE', + onupdate='CASCADE'), + autoincrement=False)) + columns.append(Column('size', Integer, nullable=False, + autoincrement=False, default=0)) + columns.append(Column('source', Integer, autoincrement=False)) + columns.append(Column('mtime', Integer, autoincrement=False)) + columns.append(Column('muser', String(255), nullable=False, default='')) + columns.append(Column('cluster', Integer, nullable=False, + autoincrement=False, default=0)) + self.versions = Table('versions', metadata, *columns) + # place an index on node + Index('idx_versions_node', self.versions.c.mtime) + # TODO: Sort out if more indexes are needed. + #Index('idx_versions_node', self.versions.c.node) + + #create attributes table + columns = [] + columns.append(Column('serial', Integer, + ForeignKey('versions.serial', + ondelete='CASCADE', + onupdate='CASCADE'), + autoincrement=False, + primary_key=True)) + columns.append(Column('key', String(255), primary_key=True)) + columns.append(Column('value', String(255))) + self.attributes = Table('attributes', metadata, *columns) + + metadata.create_all(self.engine) + + s = self.nodes.insert(node=ROOTNODE, parent=ROOTNODE) + self.conn.execute(s) + + def node_create(self, parent, path): + """Create a new node from the given properties. + Return the node identifier of the new node. + """ + + q = ("insert into nodes (parent, path) " + "values (?, ?)") + props = (parent, path) + return self.execute(q, props).lastrowid + + def node_lookup(self, path): + """Lookup the current node of the given path. + Return None if the path is not found. + """ + + q = "select node from nodes where path = ?" + self.execute(q, (path,)) + r = self.fetchone() + if r is not None: + return r[0] + return None + + def node_get_properties(self, node): + """Return the node's (parent, path). + Return None if the node is not found. + """ + + q = "select parent, path from nodes where node = ?" + self.execute(q, (node,)) + return self.fetchone() + + def node_get_versions(self, node, keys=(), propnames=_propnames): + """Return the properties of all versions at node. 
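In outline, the schema created by __init__ above (primary keys and references as declared in the Column definitions):

    nodes(node PK, parent -> nodes.node, path UNIQUE)                        -- path hierarchy
    statistics((node -> nodes.node, cluster) PK, population, size, mtime)    -- per-cluster totals
    versions(serial PK, node -> nodes.node, size, source, mtime, muser, cluster)
    attributes((serial -> versions.serial, key) PK, value)                   -- per-version metadata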
+ If keys is empty, return all properties in the order + (serial, node, size, source, mtime, muser, cluster). + """ + + q = ("select serial, node, size, source, mtime, muser, cluster " + "from versions " + "where node = ?") + self.execute(q, (node,)) + r = self.fetchall() + if r is None: + return r + + if not keys: + return r + return [[p[propnames[k]] for k in keys if k in propnames] for p in r] + + def node_count_children(self, node): + """Return node's child count.""" + + q = "select count(node) from nodes where parent = ? and node != 0" + self.execute(q, (node,)) + r = self.fetchone() + if r is None: + return 0 + return r[0] + + def node_purge_children(self, parent, before=inf, cluster=0): + """Delete all versions with the specified + parent and cluster, and return + the serials of versions deleted. + Clears out nodes with no remaining versions. + """ + + execute = self.execute + q = ("select count(serial), sum(size) from versions " + "where node in (select node " + "from nodes " + "where parent = ?) " + "and cluster = ? " + "and mtime <= ?") + args = (parent, cluster, before) + execute(q, args) + nr, size = self.fetchone() + if not nr: + return () + mtime = time() + self.statistics_update(parent, -nr, -size, mtime, cluster) + self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster) + + q = ("select serial from versions " + "where node in (select node " + "from nodes " + "where parent = ?) " + "and cluster = ? " + "and mtime <= ?") + execute(q, args) + serials = [r[SERIAL] for r in self.fetchall()] + q = ("delete from versions " + "where node in (select node " + "from nodes " + "where parent = ?) " + "and cluster = ? " + "and mtime <= ?") + execute(q, args) + q = ("delete from nodes " + "where node in (select node from nodes n " + "where (select count(serial) " + "from versions " + "where node = n.node) = 0 " + "and parent = ?)") + execute(q, (parent,)) + return serials + + def node_purge(self, node, before=inf, cluster=0): + """Delete all versions with the specified + node and cluster, and return + the serials of versions deleted. + Clears out the node if it has no remaining versions. + """ + + execute = self.execute + q = ("select count(serial), sum(size) from versions " + "where node = ? " + "and cluster = ? " + "and mtime <= ?") + args = (node, cluster, before) + execute(q, args) + nr, size = self.fetchone() + if not nr: + return () + mtime = time() + self.statistics_update_ancestors(node, -nr, -size, mtime, cluster) + + q = ("select serial from versions " + "where node = ? " + "and cluster = ? " + "and mtime <= ?") + execute(q, args) + serials = [r[SERIAL] for r in self.fetchall()] + q = ("delete from versions " + "where node = ? " + "and cluster = ? " + "and mtime <= ?") + execute(q, args) + q = ("delete from nodes " + "where node in (select node from nodes n " + "where (select count(serial) " + "from versions " + "where node = n.node) = 0 " + "and node = ?)") + execute(q, (node,)) + return serials + + def node_remove(self, node): + """Remove the node specified. + Return false if the node has children or is not found. + """ + + if self.node_count_children(node): + return False + + mtime = time() + q = ("select count(serial), sum(size), cluster " + "from versions " + "where node = ? " + "group by cluster") + self.execute(q, (node,)) + for population, size, cluster in self.fetchall(): + self.statistics_update_ancestors(node, -population, -size, mtime, cluster) + + q = "delete from nodes where node = ?" 
+ self.execute(q, (node,)) + return True + + def statistics_get(self, node, cluster=0): + """Return population, total size and last mtime + for all versions under node that belong to the cluster. + """ + + q = ("select population, size, mtime from statistics " + "where node = ? and cluster = ?") + self.execute(q, (node, cluster)) + return self.fetchone() + + def statistics_update(self, node, population, size, mtime, cluster=0): + """Update the statistics of the given node. + Statistics keep track the population, total + size of objects and mtime in the node's namespace. + May be zero or positive or negative numbers. + """ + + qs = ("select population, size from statistics " + "where node = ? and cluster = ?") + qu = ("insert or replace into statistics (node, population, size, mtime, cluster) " + "values (?, ?, ?, ?, ?)") + self.execute(qs, (node, cluster)) + r = self.fetchone() + if r is None: + prepopulation, presize = (0, 0) + else: + prepopulation, presize = r + population += prepopulation + size += presize + self.execute(qu, (node, population, size, mtime, cluster)) + + def statistics_update_ancestors(self, node, population, size, mtime, cluster=0): + """Update the statistics of the given node's parent. + Then recursively update all parents up to the root. + Population is not recursive. + """ + + while True: + if node == 0: + break + props = self.node_get_properties(node) + if props is None: + break + parent, path = props + self.statistics_update(parent, population, size, mtime, cluster) + node = parent + population = 0 # Population isn't recursive + + def statistics_latest(self, node, before=inf, except_cluster=0): + """Return population, total size and last mtime + for all latest versions under node that + do not belong to the cluster. + """ + + execute = self.execute + fetchone = self.fetchone + + # The node. + props = self.node_get_properties(node) + if props is None: + return None + parent, path = props + + # The latest version. + q = ("select serial, node, size, source, mtime, muser, cluster " + "from versions " + "where serial = (select max(serial) " + "from versions " + "where node = ? and mtime < ?) " + "and cluster != ?") + execute(q, (node, before, except_cluster)) + props = fetchone() + if props is None: + return None + mtime = props[MTIME] + + # First level, just under node (get population). + q = ("select count(serial), sum(size), max(mtime) " + "from versions v " + "where serial = (select max(serial) " + "from versions " + "where node = v.node and mtime < ?) " + "and cluster != ? " + "and node in (select node " + "from nodes " + "where parent = ?)") + execute(q, (before, except_cluster, node)) + r = fetchone() + if r is None: + return None + count = r[0] + mtime = max(mtime, r[2]) + if count == 0: + return (0, 0, mtime) + + # All children (get size and mtime). + # XXX: This is why the full path is stored. + q = ("select count(serial), sum(size), max(mtime) " + "from versions v " + "where serial = (select max(serial) " + "from versions " + "where node = v.node and mtime < ?) " + "and cluster != ? " + "and node in (select node " + "from nodes " + "where path like ?)") + execute(q, (before, except_cluster, path + '%')) + r = fetchone() + if r is None: + return None + size = r[1] - props[SIZE] + mtime = max(mtime, r[2]) + return (count, size, mtime) + + def version_create(self, node, size, source, muser, cluster=0): + """Create a new version from the given properties. + Return the (serial, mtime) of the new version. 
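To make the ancestor bookkeeping above concrete (the node names and size are made up): creating one 100-byte version under a node c whose ancestors are b, a and the root triggers

    statistics_update_ancestors(c, 1, 100, mtime)
    #   -> statistics_update(b, 1, 100, mtime)         # direct parent: population and size
    #   -> statistics_update(a, 0, 100, mtime)         # higher ancestors: size only
    #   -> statistics_update(ROOTNODE, 0, 100, mtime)  # population is not recursive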
+ """ + + q = ("insert into versions (node, size, source, mtime, muser, cluster) " + "values (?, ?, ?, ?, ?, ?)") + mtime = time() + props = (node, size, source, mtime, muser, cluster) + serial = self.execute(q, props).lastrowid + self.statistics_update_ancestors(node, 1, size, mtime, cluster) + return serial, mtime + + def version_lookup(self, node, before=inf, cluster=0): + """Lookup the current version of the given node. + Return a list with its properties: + (serial, node, size, source, mtime, muser, cluster) + or None if the current version is not found in the given cluster. + """ + + q = ("select serial, node, size, source, mtime, muser, cluster " + "from versions " + "where serial = (select max(serial) " + "from versions " + "where node = ? and mtime < ?) " + "and cluster = ?") + self.execute(q, (node, before, cluster)) + props = self.fetchone() + if props is not None: + return props + return None + + def version_get_properties(self, serial, keys=(), propnames=_propnames): + """Return a sequence of values for the properties of + the version specified by serial and the keys, in the order given. + If keys is empty, return all properties in the order + (serial, node, size, source, mtime, muser, cluster). + """ + + q = ("select serial, node, size, source, mtime, muser, cluster " + "from versions " + "where serial = ?") + self.execute(q, (serial,)) + r = self.fetchone() + if r is None: + return r + + if not keys: + return r + return [r[propnames[k]] for k in keys if k in propnames] + + def version_recluster(self, serial, cluster): + """Move the version into another cluster.""" + + props = self.version_get_properties(serial) + if not props: + return + node = props[NODE] + size = props[SIZE] + oldcluster = props[CLUSTER] + if cluster == oldcluster: + return + + mtime = time() + self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster) + self.statistics_update_ancestors(node, 1, size, mtime, cluster) + + q = "update versions set cluster = ? where serial = ?" + self.execute(q, (cluster, serial)) + + def version_remove(self, serial): + """Remove the serial specified.""" + + props = self.node_get_properties(serial) + if not props: + return + node = props[NODE] + size = props[SIZE] + cluster = props[CLUSTER] + + mtime = time() + self.statistics_update_ancestors(node, -1, -size, mtime, cluster) + + q = "delete from versions where serial = ?" + self.execute(q, (serial,)) + return True + + def attribute_get(self, serial, keys=()): + """Return a list of (key, value) pairs of the version specified by serial. + If keys is empty, return all attributes. + Othwerise, return only those specified. + """ + + execute = self.execute + if keys: + marks = ','.join('?' for k in keys) + q = ("select key, value from attributes " + "where key in (%s) and serial = ?" % (marks,)) + execute(q, keys + (serial,)) + else: + q = "select key, value from attributes where serial = ?" + execute(q, (serial,)) + return self.fetchall() + + def attribute_set(self, serial, items): + """Set the attributes of the version specified by serial. + Receive attributes as an iterable of (key, value) pairs. + """ + + q = ("insert or replace into attributes (serial, key, value) " + "values (?, ?, ?)") + self.executemany(q, ((serial, k, v) for k, v in items)) + + def attribute_del(self, serial, keys=()): + """Delete attributes of the version specified by serial. + If keys is empty, delete all attributes. + Otherwise delete those specified. + """ + + if keys: + q = "delete from attributes where serial = ? and key = ?" 
+ self.executemany(q, ((serial, key) for key in keys)) + else: + q = "delete from attributes where serial = ?" + self.execute(q, (serial,)) + + def attribute_copy(self, source, dest): + q = ("insert or replace into attributes " + "select ?, key, value from attributes " + "where serial = ?") + self.execute(q, (dest, source)) + + def _construct_filters(self, filterq): + if not filterq: + return None, None + + args = filterq.split(',') + subq = " and a.key in (" + subq += ','.join(('?' for x in args)) + subq += ")" + + return subq, args + + def _construct_paths(self, pathq): + if not pathq: + return None, None + + subq = " and (" + subq += ' or '.join(('n.path like ?' for x in pathq)) + subq += ")" + args = tuple([x + '%' for x in pathq]) + + return subq, args + + def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]): + """Return a list with all keys pairs defined + for all latest versions under parent that + do not belong to the cluster. + """ + + # TODO: Use another table to store before=inf results. + q = ("select distinct a.key " + "from attributes a, versions v, nodes n " + "where v.serial = (select max(serial) " + "from versions " + "where node = v.node and mtime < ?) " + "and v.cluster != ? " + "and v.node in (select node " + "from nodes " + "where parent = ?) " + "and a.serial = v.serial " + "and n.node = v.node") + args = (before, except_cluster, parent) + subq, subargs = self._construct_paths(pathq) + if subq is not None: + q += subq + args += subargs + self.execute(q, args) + return [r[0] for r in self.fetchall()] + + def latest_version_list(self, parent, prefix='', delimiter=None, + start='', limit=10000, before=inf, + except_cluster=0, pathq=[], filterq=None): + """Return a (list of (path, serial) tuples, list of common prefixes) + for the current versions of the paths with the given parent, + matching the following criteria. + + The property tuple for a version is returned if all + of these conditions are true: + + a. parent matches + + b. path > start + + c. path starts with prefix (and paths in pathq) + + d. version is the max up to before + + e. version is not in cluster + + f. the path does not have the delimiter occuring + after the prefix, or ends with the delimiter + + g. serial matches the attribute filter query. + + A filter query is a comma-separated list of + terms in one of these three forms: + + key + an attribute with this key must exist + + !key + an attribute with this key must not exist + + key ?op value + the attribute with this key satisfies the value + where ?op is one of ==, != <=, >=, <, >. + + The list of common prefixes includes the prefixes + matching up to the first delimiter after prefix, + and are reported only once, as "virtual directories". + The delimiter is included in the prefixes. + + If arguments are None, then the corresponding matching rule + will always match. + + Limit applies to the first list of tuples returned. + """ + + execute = self.execute + + if not start or start < prefix: + start = strprevling(prefix) + nextling = strnextling(prefix) + + q = ("select distinct n.path, v.serial " + "from attributes a, versions v, nodes n " + "where v.serial = (select max(serial) " + "from versions " + "where node = v.node and mtime < ?) " + "and v.cluster != ? " + "and v.node in (select node " + "from nodes " + "where parent = ?) " + "and a.serial = v.serial " + "and n.node = v.node " + "and n.path > ? 
and n.path < ?") + args = [before, except_cluster, parent, start, nextling] + + subq, subargs = self._construct_paths(pathq) + if subq is not None: + q += subq + args += subargs + subq, subargs = self._construct_filters(filterq) + if subq is not None: + q += subq + args += subargs + else: + q = q.replace("attributes a, ", "") + q = q.replace("and a.serial = v.serial ", "") + q += " order by n.path" + + if not delimiter: + q += " limit ?" + args.append(limit) + execute(q, args) + return self.fetchall(), () + + pfz = len(prefix) + dz = len(delimiter) + count = 0 + fetchone = self.fetchone + prefixes = [] + pappend = prefixes.append + matches = [] + mappend = matches.append + + execute(q, args) + while True: + props = fetchone() + if props is None: + break + path, serial = props + idx = path.find(delimiter, pfz) + + if idx < 0: + mappend(props) + count += 1 + if count >= limit: + break + continue + + pf = path[:idx + dz] + pappend(pf) + if idx + dz == len(path): + mappend(props) + count += 1 + if count >= limit: + break + + args[3] = strnextling(pf) # New start. + execute(q, args) + + return matches, prefixes diff --git a/pithos/backends/lib_alchemy/permissions.py b/pithos/backends/lib_alchemy/permissions.py new file mode 100644 index 0000000..1dea22f --- /dev/null +++ b/pithos/backends/lib_alchemy/permissions.py @@ -0,0 +1,149 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. + +from sqlalchemy.sql import select, literal +from sqlalchemy.sql.expression import join + +from xfeatures import XFeatures +from groups import Groups +from public import Public + + +READ = 0 +WRITE = 1 + + +class Permissions(XFeatures, Groups, Public): + + def __init__(self, **params): + XFeatures.__init__(self, **params) + Groups.__init__(self, **params) + Public.__init__(self, **params) + + def access_grant(self, path, access, members=()): + """Grant members with access to path. 
+ Members can also be '*' (all), + or some group specified as 'owner:group'.""" + + if not members: + return + feature = self.xfeature_create(path) + if feature is None: + return + self.feature_setmany(feature, access, members) + + def access_set(self, path, permissions): + """Set permissions for path. The permissions dict + maps 'read', 'write' keys to member lists.""" + + self.xfeature_destroy(path) + self.access_grant(path, READ, permissions.get('read', [])) + self.access_grant(path, WRITE, permissions.get('write', [])) + + def access_clear(self, path): + """Revoke access to path (both permissions and public).""" + + self.xfeature_destroy(path) + self.public_unset(path) + + def access_check(self, path, access, member): + """Return true if the member has this access to the path.""" + + if access == READ and self.public_check(path): + return True + + r = self.xfeature_inherit(path) + if not r: + return False + fpath, feature = r + members = self.feature_get(feature, access) + if member in members or '*' in members: + return True + for owner, group in self.group_parents(member): + if owner + ':' + group in members: + return True + return False + + def access_inherit(self, path): + """Return the inherited or assigned (path, permissions) pair for path.""" + + r = self.xfeature_inherit(path) + if not r: + return (path, {}) + fpath, feature = r + permissions = self.feature_dict(feature) + if READ in permissions: + permissions['read'] = permissions[READ] + del(permissions[READ]) + if WRITE in permissions: + permissions['write'] = permissions[WRITE] + del(permissions[WRITE]) + return (fpath, permissions) + + def access_list(self, path): + """List all permission paths inherited by or inheriting from path.""" + + return [x[0] for x in self.xfeature_list(path) if x[0] != path] + + def access_list_paths(self, member, prefix=None): + """Return the list of paths granted to member.""" + + xfeatures_xfeaturevals = self.xfeatures.join(self.xfeaturevals) + + selectable = (self.groups.c.owner + ':' + self.groups.c.name) + member_groups = select([selectable.label('value')], + self.groups.c.member == member) + + members = select([literal(member).label('value')]) + + extended_member_groups = member_groups.union(members).alias() + inner_join = join(xfeatures_xfeaturevals, + extended_member_groups, + self.xfeaturevals.c.value == extended_member_groups.c.value) + s = select([self.xfeatures.c.path], from_obj=[inner_join]).distinct() + if prefix: + s = s.where(self.xfeatures.c.path.like(prefix + '%')) + r = self.conn.execute(s) + l = [row[0] for row in r.fetchall()] + r.close() + return l + + def access_list_shared(self, prefix=''): + """Return the list of shared paths.""" + + s = select([self.xfeatures.c.path], + self.xfeatures.c.path.like(prefix + '%')) + r = self.conn.execute(s) + l = [row[0] for row in r.fetchall()] + r.close() + return l \ No newline at end of file diff --git a/pithos/backends/lib_alchemy/policy.py b/pithos/backends/lib_alchemy/policy.py new file mode 100644 index 0000000..e6dbd39 --- /dev/null +++ b/pithos/backends/lib_alchemy/policy.py @@ -0,0 +1,71 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. 
Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. + +from sqlalchemy import Table, Column, String, MetaData +from sqlalchemy.sql import select +from dbworker import DBWorker + + +class Policy(DBWorker): + """Paths can be assigned key, value pairs, representing policy.""" + + def __init__(self, **params): + DBWorker.__init__(self, **params) + metadata = MetaData() + columns=[] + #columns.append(Column('path', String(2048), primary_key=True)) + columns.append(Column('path', String(255), primary_key=True)) + columns.append(Column('key', String(255), primary_key=True)) + columns.append(Column('value', String(255))) + self.policies = Table('policy', metadata, *columns) + metadata.create_all(self.engine) + metadata.bind = self.engine + + def policy_set(self, path, policy): + s = self.policies.insert() + values = [{'path':path, 'key':k, 'value':v} for k,v in policy.iteritems()] + r = self.conn.execute(s, values) + r.close() + + def policy_unset(self, path): + s = self.policies.delete().where(self.policies.c.path==path) + r = self.conn.execute(s) + r.close() + + def policy_get(self, path): + s = select([self.policies.c.key, self.policies.c.value], + self.policies.c.path==self.policies.c.path==path) + r = self.conn.execute(s) + d = dict(r.fetchall()) + r.close() + return d \ No newline at end of file diff --git a/pithos/backends/lib_alchemy/public.py b/pithos/backends/lib_alchemy/public.py new file mode 100644 index 0000000..63aeebe --- /dev/null +++ b/pithos/backends/lib_alchemy/public.py @@ -0,0 +1,67 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. + +from dbworker import DBWorker +from sqlalchemy import Table, Column, String, MetaData +from sqlalchemy.sql import select + +class Public(DBWorker): + """Paths can be marked as public.""" + + def __init__(self, **params): + DBWorker.__init__(self, **params) + metadata = MetaData() + columns=[] + #columns.append(Column('path', String(2048), primary_key=True)) + columns.append(Column('path', String(255), primary_key=True)) + self.public = Table('public', metadata, *columns) + metadata.create_all(self.engine) + metadata.bind = self.engine + + + def public_set(self, path): + s = self.public.insert() + r = self.conn.execute(s, path = path) + r.close() + + def public_unset(self, path): + s = self.public.delete().where(self.public.c.path == path) + r = self.conn.execute(s) + r.close() + + def public_check(self, path): + s = select([self.public.c.path], self.public.c.path == path) + r = self.conn.execute(s) + l = r.fetchone() + r.close() + return bool(l) \ No newline at end of file diff --git a/pithos/backends/lib_alchemy/xfeatures.py b/pithos/backends/lib_alchemy/xfeatures.py new file mode 100644 index 0000000..8ff0d5c --- /dev/null +++ b/pithos/backends/lib_alchemy/xfeatures.py @@ -0,0 +1,197 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. 
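+# Schema sketch (created below through SQLAlchemy metadata):
+#   xfeatures:    feature_id (PK), path                      -- one feature per path, indexed on path
+#   xfeaturevals: feature_id (FK, ON DELETE CASCADE), key, value
+#                                                            -- composite PK; the (key, value) pairs of a feature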
+ +from collections import defaultdict +from sqlalchemy import Table, Column, String, Integer, MetaData, ForeignKey +from sqlalchemy.sql import select, and_ +from sqlalchemy.schema import Index +from sqlalchemy.sql.expression import desc + +from dbworker import DBWorker + + +class XFeatures(DBWorker): + """XFeatures are path properties that allow non-nested + inheritance patterns. Currently used for storing permissions. + """ + + def __init__(self, **params): + DBWorker.__init__(self, **params) + metadata = MetaData() + columns=[] + columns.append(Column('feature_id', Integer, primary_key=True)) + columns.append(Column('path', String(2048))) + self.xfeatures = Table('xfeatures', metadata, *columns) + # place an index on path + Index('idx_features_path', self.xfeatures.c.path) + + columns=[] + columns.append(Column('feature_id', Integer, + ForeignKey('xfeatures.feature_id', + ondelete='CASCADE'), + primary_key=True)) + columns.append(Column('key', Integer, autoincrement=False, + primary_key=True)) + columns.append(Column('value', String(255), primary_key=True)) + self.xfeaturevals = Table('xfeaturevals', metadata, *columns) + + metadata.create_all(self.engine) + metadata.bind = self.engine + + def xfeature_inherit(self, path): + """Return the (path, feature) inherited by the path, or None.""" + + s = select([self.xfeatures.c.path, self.xfeatures.c.feature_id]) + s = s.where(self.xfeatures.c.path <= path) + s = s.order_by(desc(self.xfeatures.c.path)).limit(1) + r = self.conn.execute(s) + row = r.fetchone() + r.close() + if row and path.startswith(row[0]): + return row + else: + return None + + def xfeature_list(self, path): + """Return the list of the (prefix, feature) pairs matching path. + A prefix matches path if either the prefix includes the path, + or the path includes the prefix. + """ + + inherited = self.xfeature_inherit(path) + if inherited: + return [inherited] + + s = select([self.xfeatures.c.path, self.xfeatures.c.feature_id]) + s = s.where(and_(self.xfeatures.c.path.like(path + '%'), + self.xfeatures.c.path != path)) + s = s.order_by(self.xfeatures.c.path) + r = self.conn.execute(s) + l = r.fetchall() + r.close() + return l + + def xfeature_create(self, path): + """Create and return a feature for path. + If the path already inherits a feature or + bestows to paths already inheriting a feature, + create no feature and return None. + If the path has a feature, return it. 
+ """ + + prefixes = self.xfeature_list(path) + pl = len(prefixes) + if (pl > 1) or (pl == 1 and prefixes[0][0] != path): + return None + if pl == 1 and prefixes[0][0] == path: + return prefixes[0][1] + s = self.xfeatures.insert() + r = self.conn.execute(s, path=path) + inserted_primary_key = r.inserted_primary_key[0] + r.close() + return inserted_primary_key + + def xfeature_destroy(self, path): + """Destroy a feature and all its key, value pairs.""" + + s = self.xfeatures.delete().where(self.xfeatures.c.path == path) + r = self.conn.execute(s) + r.close() + + def feature_dict(self, feature): + """Return a dict mapping keys to list of values for feature.""" + + s = select([self.xfeaturevals.c.key, self.xfeaturevals.c.value]) + s = s.where(self.xfeaturevals.c.feature_id == feature) + r = self.conn.execute(s) + d = defaultdict(list) + for key, value in r.fetchall(): + d[key].append(value) + r.close() + return d + + def feature_set(self, feature, key, value): + """Associate a key, value pair with a feature.""" + + s = self.xfeaturevals.insert() + r = self.conn.execute(s, feature_id=feature, key=key, value=value) + r.close() + + def feature_setmany(self, feature, key, values): + """Associate the given key, and values with a feature.""" + + s = self.xfeaturevals.insert() + values = [{'feature_id':feature, 'key':key, 'value':v} for v in values] + r = self.conn.execute(s, values) + r.close() + + def feature_unset(self, feature, key, value): + """Disassociate a key, value pair from a feature.""" + + s = self.xfeaturevals.delete() + s = s.where(and_(self.xfeaturevals.c.feature_id == feature, + self.xfeaturevals.c.key == key, + self.xfeaturevals.c.value == value)) + r = self.conn.execute(s) + r.close() + + def feature_unsetmany(self, feature, key, values): + """Disassociate the key for the values given, from a feature.""" + + for v in values: + conditional = and_(self.xfeaturevals.c.feature_id == feature, + self.xfeaturevals.c.key == key, + self.xfeaturevals.c.value == v) + s = self.xfeaturevals.delete().where(conditional) + r = self.conn.execute(s) + r.close() + + def feature_get(self, feature, key): + """Return the list of values for a key of a feature.""" + + s = select([self.xfeaturevals.c.value]) + s = s.where(and_(self.xfeaturevals.c.feature_id == feature, + self.xfeaturevals.c.key == key)) + r = self.conn.execute(s) + l = [row[0] for row in r.fetchall()] + r.close() + return l + + def feature_clear(self, feature, key): + """Delete all key, value pairs for a key of a feature.""" + + s = self.xfeaturevals.delete() + s = s.where(and_(self.xfeaturevals.c.feature_id == feature, + self.xfeaturevals.c.key == key)) + r = self.conn.execute(s) + r.close() diff --git a/pithos/backends/modular_alchemy.py b/pithos/backends/modular_alchemy.py new file mode 100644 index 0000000..0ca00cc --- /dev/null +++ b/pithos/backends/modular_alchemy.py @@ -0,0 +1,844 @@ +# Copyright 2011 GRNET S.A. All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY GRNET S.A. 
``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# The views and conclusions contained in the software and +# documentation are those of the authors and should not be +# interpreted as representing official policies, either expressed +# or implied, of GRNET S.A. + +import os +import time +import sqlite3 +import logging +import hashlib +import binascii + +from base import NotAllowedError, BaseBackend +from lib_alchemy.node import Node, ROOTNODE, SERIAL, SIZE, MTIME, MUSER, CLUSTER +from lib_alchemy.permissions import Permissions, READ, WRITE +from lib_alchemy.policy import Policy +from lib_alchemy.hashfiler import Mapper, Blocker +from sqlalchemy import create_engine + +( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3) + +inf = float('inf') + + +logger = logging.getLogger(__name__) + +def backend_method(func=None, autocommit=1): + if func is None: + def fn(func): + return backend_method(func, autocommit) + return fn + + if not autocommit: + return func + def fn(self, *args, **kw): + self.con.execute('begin deferred') + try: + ret = func(self, *args, **kw) + self.con.commit() + return ret + except: + self.con.rollback() + raise + return fn + + +class ModularBackend(BaseBackend): + """A modular backend. + + Uses modules for SQL functions and storage. 
+ """ + + def __init__(self, db): + self.hash_algorithm = 'sha256' + self.block_size = 4 * 1024 * 1024 # 4MB + + self.default_policy = {'quota': 0, 'versioning': 'auto'} + + basepath = os.path.split(db)[0] + if basepath and not os.path.exists(basepath): + os.makedirs(basepath) + if not os.path.isdir(basepath): + raise RuntimeError("Cannot open database at '%s'" % (db,)) + + dbuser = 'pithos' + dbpass = 'archipelagos' + dbhost = '62.217.112.56' + dbname = 'pithosdb' + connection_str = 'mysql://%s:%s@%s/%s' %(dbuser, dbpass, dbhost, dbname) + engine = create_engine(connection_str, echo=True) + + params = {'blocksize': self.block_size, + 'blockpath': basepath + '/blocks', + 'hashtype': self.hash_algorithm} + self.blocker = Blocker(**params) + + params = {'mappath': basepath + '/maps', + 'namelen': self.blocker.hashlen} + self.mapper = Mapper(**params) + + params = {'connection': engine.connect(), + 'engine': engine} + self.permissions = Permissions(**params) + self.policy = Policy(**params) + self.node = Node(**params) + + self.con.commit() + + @backend_method + def list_accounts(self, user, marker=None, limit=10000): + """Return a list of accounts the user can access.""" + + logger.debug("list_accounts: %s %s", user, marker, limit) + allowed = self._allowed_accounts(user) + start, limit = self._list_limits(allowed, marker, limit) + return allowed[start:start + limit] + + @backend_method + def get_account_meta(self, user, account, until=None): + """Return a dictionary with the account metadata.""" + + logger.debug("get_account_meta: %s %s", account, until) + path, node = self._lookup_account(account, user == account) + if user != account: + if until or node is None or account not in self._allowed_accounts(user): + raise NotAllowedError + try: + props = self._get_properties(node, until) + mtime = props[MTIME] + except NameError: + props = None + mtime = until + count, bytes, tstamp = self._get_statistics(node, until) + tstamp = max(tstamp, mtime) + if until is None: + modified = tstamp + else: + modified = self._get_statistics(node)[2] # Overall last modification. 
+ modified = max(modified, mtime) + + if user != account: + meta = {'name': account} + else: + meta = {} + if props is not None: + meta.update(dict(self.node.attribute_get(props[SERIAL]))) + if until is not None: + meta.update({'until_timestamp': tstamp}) + meta.update({'name': account, 'count': count, 'bytes': bytes}) + meta.update({'modified': modified}) + return meta + + @backend_method + def update_account_meta(self, user, account, meta, replace=False): + """Update the metadata associated with the account.""" + + logger.debug("update_account_meta: %s %s %s", account, meta, replace) + if user != account: + raise NotAllowedError + path, node = self._lookup_account(account, True) + self._put_metadata(user, node, meta, replace, False) + + @backend_method + def get_account_groups(self, user, account): + """Return a dictionary with the user groups defined for this account.""" + + logger.debug("get_account_groups: %s", account) + if user != account: + if account not in self._allowed_accounts(user): + raise NotAllowedError + return {} + self._lookup_account(account, True) + return self.permissions.group_dict(account) + + @backend_method + def update_account_groups(self, user, account, groups, replace=False): + """Update the groups associated with the account.""" + + logger.debug("update_account_groups: %s %s %s", account, groups, replace) + if user != account: + raise NotAllowedError + self._lookup_account(account, True) + self._check_groups(groups) + if replace: + self.permissions.group_destroy(account) + for k, v in groups.iteritems(): + if not replace: # If not already deleted. + self.permissions.group_delete(account, k) + if v: + self.permissions.group_addmany(account, k, v) + + @backend_method + def put_account(self, user, account): + """Create a new account with the given name.""" + + logger.debug("put_account: %s", account) + if user != account: + raise NotAllowedError + node = self.node.node_lookup(account) + if node is not None: + raise NameError('Account already exists') + self._put_path(user, ROOTNODE, account) + + @backend_method + def delete_account(self, user, account): + """Delete the account with the given name.""" + + logger.debug("delete_account: %s", account) + if user != account: + raise NotAllowedError + node = self.node.node_lookup(account) + if node is None: + return + if not self.node.node_remove(node): + raise IndexError('Account is not empty') + self.permissions.group_destroy(account) + + @backend_method + def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None): + """Return a list of containers existing under an account.""" + + logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until) + if user != account: + if until or account not in self._allowed_accounts(user): + raise NotAllowedError + allowed = self._allowed_containers(user, account) + start, limit = self._list_limits(allowed, marker, limit) + return allowed[start:start + limit] + if shared: + allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)] + start, limit = self._list_limits(allowed, marker, limit) + return allowed[start:start + limit] + node = self.node.node_lookup(account) + return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)] + + @backend_method + def get_container_meta(self, user, account, container, until=None): + """Return a dictionary with the container metadata.""" + + logger.debug("get_container_meta: %s %s %s", account, container, until) + if user != 
account: + if until or container not in self._allowed_containers(user, account): + raise NotAllowedError + path, node = self._lookup_container(account, container) + props = self._get_properties(node, until) + mtime = props[MTIME] + count, bytes, tstamp = self._get_statistics(node, until) + tstamp = max(tstamp, mtime) + if until is None: + modified = tstamp + else: + modified = self._get_statistics(node)[2] # Overall last modification. + modified = max(modified, mtime) + + if user != account: + meta = {'name': container} + else: + meta = dict(self.node.attribute_get(props[SERIAL])) + if until is not None: + meta.update({'until_timestamp': tstamp}) + meta.update({'name': container, 'count': count, 'bytes': bytes}) + meta.update({'modified': modified}) + return meta + + @backend_method + def update_container_meta(self, user, account, container, meta, replace=False): + """Update the metadata associated with the container.""" + + logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace) + if user != account: + raise NotAllowedError + path, node = self._lookup_container(account, container) + self._put_metadata(user, node, meta, replace, False) + + @backend_method + def get_container_policy(self, user, account, container): + """Return a dictionary with the container policy.""" + + logger.debug("get_container_policy: %s %s", account, container) + if user != account: + if container not in self._allowed_containers(user, account): + raise NotAllowedError + return {} + path = self._lookup_container(account, container)[0] + return self.policy.policy_get(path) + + @backend_method + def update_container_policy(self, user, account, container, policy, replace=False): + """Update the policy associated with the account.""" + + logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace) + if user != account: + raise NotAllowedError + path = self._lookup_container(account, container)[0] + self._check_policy(policy) + if replace: + for k, v in self.default_policy.iteritems(): + if k not in policy: + policy[k] = v + self.policy.policy_set(path, policy) + + @backend_method + def put_container(self, user, account, container, policy=None): + """Create a new container with the given name.""" + + logger.debug("put_container: %s %s %s", account, container, policy) + if user != account: + raise NotAllowedError + try: + path, node = self._lookup_container(account, container) + except NameError: + pass + else: + raise NameError('Container already exists') + if policy: + self._check_policy(policy) + path = '/'.join((account, container)) + self._put_path(user, self._lookup_account(account, True)[1], path) + for k, v in self.default_policy.iteritems(): + if k not in policy: + policy[k] = v + self.policy.policy_set(path, policy) + + @backend_method + def delete_container(self, user, account, container, until=None): + """Delete/purge the container with the given name.""" + + logger.debug("delete_container: %s %s %s", account, container, until) + if user != account: + raise NotAllowedError + path, node = self._lookup_container(account, container) + + if until is not None: + versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY) + for v in versions: + self.mapper.map_remv(v) + self.node.node_purge_children(node, until, CLUSTER_DELETED) + return + + if self._get_statistics(node)[0] > 0: + raise IndexError('Container is not empty') + versions = self.node.node_purge_children(node, inf, CLUSTER_HISTORY) + for v in versions: + self.mapper.map_remv(v) + 
self.node.node_purge_children(node, inf, CLUSTER_DELETED) + self.node.node_remove(node) + self.policy.policy_unset(path) + + @backend_method + def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None): + """Return a list of objects existing under a container.""" + + logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until) + allowed = [] + if user != account: + if until: + raise NotAllowedError + allowed = self.permissions.access_list_paths(user, '/'.join((account, container))) + if not allowed: + raise NotAllowedError + else: + if shared: + allowed = self.permissions.access_list_shared('/'.join((account, container))) + path, node = self._lookup_container(account, container) + return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed) + + @backend_method + def list_object_meta(self, user, account, container, until=None): + """Return a list with all the container's object meta keys.""" + + logger.debug("list_object_meta: %s %s %s", account, container, until) + allowed = [] + if user != account: + if until: + raise NotAllowedError + allowed = self.permissions.access_list_paths(user, '/'.join((account, container))) + if not allowed: + raise NotAllowedError + path, node = self._lookup_container(account, container) + before = until if until is not None else inf + return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed) + + @backend_method + def get_object_meta(self, user, account, container, name, version=None): + """Return a dictionary with the object metadata.""" + + logger.debug("get_object_meta: %s %s %s %s", account, container, name, version) + self._can_read(user, account, container, name) + path, node = self._lookup_object(account, container, name) + props = self._get_version(node, version) + if version is None: + modified = props[MTIME] + else: + modified = self._get_version(node)[MTIME] # Overall last modification. 
+ + meta = dict(self.node.attribute_get(props[SERIAL])) + meta.update({'name': name, 'bytes': props[SIZE]}) + meta.update({'version': props[SERIAL], 'version_timestamp': props[MTIME]}) + meta.update({'modified': modified, 'modified_by': props[MUSER]}) + return meta + + @backend_method + def update_object_meta(self, user, account, container, name, meta, replace=False): + """Update the metadata associated with the object.""" + + logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace) + self._can_write(user, account, container, name) + path, node = self._lookup_object(account, container, name) + self._put_metadata(user, node, meta, replace) + + @backend_method + def get_object_permissions(self, user, account, container, name): + """Return the path from which this object gets its permissions from,\ + along with a dictionary containing the permissions.""" + + logger.debug("get_object_permissions: %s %s %s", account, container, name) + self._can_read(user, account, container, name) + path = self._lookup_object(account, container, name)[0] + return self.permissions.access_inherit(path) + + @backend_method + def update_object_permissions(self, user, account, container, name, permissions): + """Update the permissions associated with the object.""" + + logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions) + if user != account: + raise NotAllowedError + path = self._lookup_object(account, container, name)[0] + self._check_permissions(path, permissions) + self.permissions.access_set(path, permissions) + + @backend_method + def get_object_public(self, user, account, container, name): + """Return the public URL of the object if applicable.""" + + logger.debug("get_object_public: %s %s %s", account, container, name) + self._can_read(user, account, container, name) + path = self._lookup_object(account, container, name)[0] + if self.permissions.public_check(path): + return '/public/' + path + return None + + @backend_method + def update_object_public(self, user, account, container, name, public): + """Update the public status of the object.""" + + logger.debug("update_object_public: %s %s %s %s", account, container, name, public) + self._can_write(user, account, container, name) + path = self._lookup_object(account, container, name)[0] + if not public: + self.permissions.public_unset(path) + else: + self.permissions.public_set(path) + + @backend_method + def get_object_hashmap(self, user, account, container, name, version=None): + """Return the object's size and a list with partial hashes.""" + + logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version) + self._can_read(user, account, container, name) + path, node = self._lookup_object(account, container, name) + props = self._get_version(node, version) + hashmap = self.mapper.map_retr(props[SERIAL]) + return props[SIZE], [binascii.hexlify(x) for x in hashmap] + + @backend_method + def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None): + """Create/update an object with the specified size and partial hashes.""" + + logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap) + if permissions is not None and user != account: + raise NotAllowedError + self._can_write(user, account, container, name) + missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap]) + if missing: + ie = IndexError() + ie.data = missing + raise ie + if permissions 
is not None: + self._check_permissions(path, permissions) + path, node = self._put_object_node(account, container, name) + src_version_id, dest_version_id = self._copy_version(user, node, None, node, size) + self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap]) + if not replace_meta and src_version_id is not None: + self.node.attribute_copy(src_version_id, dest_version_id) + self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems())) + if permissions is not None: + self.permissions.access_set(path, permissions) + + @backend_method + def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None): + """Copy an object's data and metadata.""" + + logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version) + if permissions is not None and user != account: + raise NotAllowedError + self._can_read(user, account, src_container, src_name) + self._can_write(user, account, dest_container, dest_name) + src_path, src_node = self._lookup_object(account, src_container, src_name) + if permissions is not None: + self._check_permissions(dest_path, permissions) + dest_path, dest_node = self._put_object_node(account, dest_container, dest_name) + src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node) + if src_version_id is not None: + self._copy_data(src_version_id, dest_version_id) + if not replace_meta and src_version_id is not None: + self.node.attribute_copy(src_version_id, dest_version_id) + self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems())) + if permissions is not None: + self.permissions.access_set(dest_path, permissions) + + @backend_method + def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None): + """Move an object's data and metadata.""" + + logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions) + self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None) + self.delete_object(user, account, src_container, src_name) + + @backend_method + def delete_object(self, user, account, container, name, until=None): + """Delete/purge an object.""" + + logger.debug("delete_object: %s %s %s %s", account, container, name, until) + if user != account: + raise NotAllowedError + + if until is not None: + path = '/'.join((account, container, name)) + node = self.node.node_lookup(path) + if node is None: + return + versions = self.node.node_purge(node, until, CLUSTER_NORMAL) + versions += self.node.node_purge(node, until, CLUSTER_HISTORY) + for v in versions: + self.mapper.map_remv(v) + self.node.node_purge_children(node, until, CLUSTER_DELETED) + try: + props = self._get_version(node) + except NameError: + pass + else: + self.permissions.access_clear(path) + return + + path, node = self._lookup_object(account, container, name) + self._copy_version(user, node, None, node, 0, CLUSTER_DELETED) + self.permissions.access_clear(path) + + @backend_method + def list_versions(self, user, account, container, name): + """Return a list of all (version, version_timestamp) tuples for an object.""" + + logger.debug("list_versions: %s %s %s", account, container, name) + 
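+        # Resolve the object's node the same way get_object_meta does;
+        # node_get_versions below operates on that node.
+        path, node = self._lookup_object(account, container, name)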
self._can_read(user, account, container, name) + return self.node.node_get_versions(node, ['serial', 'mtime']) + + @backend_method(autocommit=0) + def get_block(self, hash): + """Return a block's data.""" + + logger.debug("get_block: %s", hash) + blocks = self.blocker.block_retr((binascii.unhexlify(hash),)) + if not blocks: + raise NameError('Block does not exist') + return blocks[0] + + @backend_method(autocommit=0) + def put_block(self, data): + """Create a block and return the hash.""" + + logger.debug("put_block: %s", len(data)) + hashes, absent = self.blocker.block_stor((data,)) + return binascii.hexlify(hashes[0]) + + @backend_method(autocommit=0) + def update_block(self, hash, data, offset=0): + """Update a known block and return the hash.""" + + logger.debug("update_block: %s %s %s", hash, len(data), offset) + if offset == 0 and len(data) == self.block_size: + return self.put_block(data) + h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),)) + return binascii.hexlify(h) + + def _check_policy(self, policy): + for k in policy.keys(): + if policy[k] == '': + policy[k] = self.default_policy.get(k) + for k, v in policy.iteritems(): + if k == 'quota': + q = int(v) # May raise ValueError. + if q < 0: + raise ValueError + elif k == 'versioning': + if v not in ['auto', 'manual', 'none']: + raise ValueError + else: + raise ValueError + + def _sql_until(self, parent, until=None): + """Return the sql to get the latest versions until the timestamp given.""" + + if until is None: + until = time.time() + sql = ("select v.serial, n.path, v.mtime, v.size " + "from versions v, nodes n " + "where v.serial = (select max(serial) " + "from versions " + "where node = v.node and mtime < %s) " + "and v.cluster != %s " + "and v.node = n.node " + "and v.node in (select node " + "from nodes " + "where parent = %s)") + return sql % (until, CLUSTER_DELETED, parent) + + def _list_limits(self, listing, marker, limit): + start = 0 + if marker: + try: + start = listing.index(marker) + 1 + except ValueError: + pass + if not limit or limit > 10000: + limit = 10000 + return start, limit + + def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]): + cont_prefix = path + '/' + if keys and len(keys) > 0: +# sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and +# m.version_id = o.version_id and m.key in (%s)''' +# sql = sql % (self._sql_until(until), ', '.join('?' * len(keys))) +# param = (cont_prefix + prefix + '%',) + tuple(keys) +# if allowed: +# sql += ' and (' + ' or '.join(('o.name like ?',) * len(allowed)) + ')' +# param += tuple([x + '%' for x in allowed]) +# sql += ' order by o.name' + return [] + else: + sql = 'select path, serial from (%s) where path like ?' + sql = sql % self._sql_until(parent, until) + param = (cont_prefix + prefix + '%',) + if allowed: + sql += ' and (' + ' or '.join(('name like ?',) * len(allowed)) + ')' + param += tuple([x + '%' for x in allowed]) + sql += ' order by path' + c = self.con.execute(sql, param) + objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()] + if delimiter: + pseudo_objects = [] + for x in objects: + pseudo_name = x[0] + i = pseudo_name.find(delimiter, len(prefix)) + if not virtual: + # If the delimiter is not found, or the name ends + # with the delimiter's first occurence. 
+ if i == -1 or len(pseudo_name) == i + len(delimiter): + pseudo_objects.append(x) + else: + # If the delimiter is found, keep up to (and including) the delimiter. + if i != -1: + pseudo_name = pseudo_name[:i + len(delimiter)] + if pseudo_name not in [y[0] for y in pseudo_objects]: + if pseudo_name == x[0]: + pseudo_objects.append(x) + else: + pseudo_objects.append((pseudo_name, None)) + objects = pseudo_objects + + start, limit = self._list_limits([x[0] for x in objects], marker, limit) + return objects[start:start + limit] + + # Path functions. + + def _put_object_node(self, account, container, name): + path, parent = self._lookup_container(account, container) + path = '/'.join((path, name)) + node = self.node.node_lookup(path) + if node is None: + node = self.node.node_create(parent, path) + return path, node + + def _put_path(self, user, parent, path): + node = self.node.node_create(parent, path) + self.node.version_create(node, 0, None, user, CLUSTER_NORMAL) + return node + + def _lookup_account(self, account, create=True): + node = self.node.node_lookup(account) + if node is None and create: + node = self._put_path(account, ROOTNODE, account) # User is account. + return account, node + + def _lookup_container(self, account, container): + path = '/'.join((account, container)) + node = self.node.node_lookup(path) + if node is None: + raise NameError('Container does not exist') + return path, node + + def _lookup_object(self, account, container, name): + path = '/'.join((account, container, name)) + node = self.node.node_lookup(path) + if node is None: + raise NameError('Object does not exist') + return path, node + + def _get_properties(self, node, until=None): + """Return properties until the timestamp given.""" + + before = until if until is not None else inf + props = self.node.version_lookup(node, before, CLUSTER_NORMAL) + if props is None and until is not None: + props = self.node.version_lookup(node, before, CLUSTER_HISTORY) + if props is None: + raise NameError('Path does not exist') + return props + + def _get_statistics(self, node, until=None): + """Return count, sum of size and latest timestamp of everything under node.""" + + if until is None: + stats = self.node.statistics_get(node, CLUSTER_NORMAL) + else: + stats = self.node.statistics_latest(node, until, CLUSTER_DELETED) + if stats is None: + stats = (0, 0, 0) + return stats + + def _get_version(self, node, version=None): + if version is None: + props = self.node.version_lookup(node, inf, CLUSTER_NORMAL) + if props is None: + raise NameError('Object does not exist') + else: + props = self.node.version_get_properties(version) + if props is None or props[CLUSTER] == CLUSTER_DELETED: + raise IndexError('Version does not exist') + return props + + def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL): + + # Get source serial and size. + if src_version is not None: + src_props = self._get_version(src_node, src_version) + src_version_id = src_props[SERIAL] + size = src_props[SIZE] + else: + # Latest or create from scratch. + try: + src_props = self._get_version(src_node) + src_version_id = src_props[SERIAL] + size = src_props[SIZE] + except NameError: + src_version_id = None + size = 0 + if dest_size is not None: + size = dest_size + + # Move the latest version at destination to CLUSTER_HISTORY and create new. 
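+        # Same node and no explicit source version: the current latest version is
+        # demoted to CLUSTER_HISTORY in place; otherwise whatever is latest at the
+        # destination node is demoted before the new version is created there.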
+ if src_node == dest_node and src_version is None and src_version_id is not None: + self.node.version_recluster(src_version_id, CLUSTER_HISTORY) + else: + dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL) + if dest_props is not None: + self.node.version_recluster(dest_props[SERIAL], CLUSTER_HISTORY) + dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster) + + return src_version_id, dest_version_id + + def _copy_data(self, src_version, dest_version): + hashmap = self.mapper.map_retr(src_version) + self.mapper.map_stor(dest_version, hashmap) + + def _get_metadata(self, version): + if version is None: + return {} + return dict(self.node.attribute_get(version)) + + def _put_metadata(self, user, node, meta, replace=False, copy_data=True): + """Create a new version and store metadata.""" + + src_version_id, dest_version_id = self._copy_version(user, node, None, node) + if not replace: + if src_version_id is not None: + self.node.attribute_copy(src_version_id, dest_version_id) + self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == '')) + self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != '')) + else: + self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems())) + if copy_data and src_version_id is not None: + self._copy_data(src_version_id, dest_version_id) + + # Access control functions. + + def _check_groups(self, groups): + # raise ValueError('Bad characters in groups') + pass + + def _check_permissions(self, path, permissions): + # raise ValueError('Bad characters in permissions') + + # Check for existing permissions. + paths = self.permissions.access_list(path) + if paths: + ae = AttributeError() + ae.data = paths + raise ae + + def _can_read(self, user, account, container, name): + if user == account: + return True + path = '/'.join((account, container, name)) + if not self.permissions.access_check(path, READ, user) and not self.permissions.access_check(path, WRITE, user): + raise NotAllowedError + + def _can_write(self, user, account, container, name): + if user == account: + return True + path = '/'.join((account, container, name)) + if not self.permissions.access_check(path, WRITE, user): + raise NotAllowedError + + def _allowed_accounts(self, user): + allow = set() + for path in self.permissions.access_list_paths(user): + allow.add(path.split('/', 1)[0]) + return sorted(allow) + + def _allowed_containers(self, user, account): + allow = set() + for path in self.permissions.access_list_paths(user, account): + allow.add(path.split('/', 2)[1]) + return sorted(allow)
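
For reference, a minimal wiring sketch for the new lib_alchemy workers (not part of the patch), following the params dict built in ModularBackend.__init__ above. The sqlite URL and the absolute import path are illustrative stand-ins for the hard-coded MySQL credentials and the installed package layout:

    from sqlalchemy import create_engine
    from pithos.backends.lib_alchemy.permissions import Permissions, READ

    # Any SQLAlchemy URL works here; the workers only need a connection and an engine.
    engine = create_engine('sqlite:///pithos-test.db', echo=False)
    params = {'connection': engine.connect(), 'engine': engine}

    perms = Permissions(**params)   # mixes in XFeatures, Groups and Public
    perms.access_set('account/container/object', {'read': ['alice'], 'write': []})
    assert perms.access_check('account/container/object', READ, 'alice')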