Merge branch 'master' of https://code.grnet.gr/git/pithos
authorSofia Papagiannaki <papagian@gmail.com>
Thu, 11 Aug 2011 14:39:20 +0000 (17:39 +0300)
committerSofia Papagiannaki <papagian@gmail.com>
Thu, 11 Aug 2011 14:39:20 +0000 (17:39 +0300)
18 files changed:
pithos/backends/lib/node.py
pithos/backends/lib_alchemy/__init__.py [new file with mode: 0644]
pithos/backends/lib_alchemy/dbworker.py [new file with mode: 0644]
pithos/backends/lib_alchemy/groups.py [new file with mode: 0644]
pithos/backends/lib_alchemy/hashfiler/__init__.py [new file with mode: 0644]
pithos/backends/lib_alchemy/hashfiler/blocker.py [new file with mode: 0644]
pithos/backends/lib_alchemy/hashfiler/context_file.py [new file with mode: 0644]
pithos/backends/lib_alchemy/hashfiler/hashfiler/__init__.py [new file with mode: 0644]
pithos/backends/lib_alchemy/hashfiler/hashfiler/blocker.py [new file with mode: 0644]
pithos/backends/lib_alchemy/hashfiler/hashfiler/context_file.py [new file with mode: 0644]
pithos/backends/lib_alchemy/hashfiler/hashfiler/mapper.py [new file with mode: 0644]
pithos/backends/lib_alchemy/hashfiler/mapper.py [new file with mode: 0644]
pithos/backends/lib_alchemy/node.py [new file with mode: 0644]
pithos/backends/lib_alchemy/permissions.py [new file with mode: 0644]
pithos/backends/lib_alchemy/policy.py [new file with mode: 0644]
pithos/backends/lib_alchemy/public.py [new file with mode: 0644]
pithos/backends/lib_alchemy/xfeatures.py [new file with mode: 0644]
pithos/backends/modular_alchemy.py [new file with mode: 0644]

index 8e7a7c4..2bad014 100644 (file)
@@ -139,10 +139,10 @@ class Node(DBWorker):
                             on update cascade
                             on delete cascade ) """)
         execute(""" create index if not exists idx_versions_node
-                    on nodes(node) """)
+                    on versions(node) """)
         # TODO: Sort out if more indexes are needed.
         # execute(""" create index if not exists idx_versions_mtime
-        #             on nodes(mtime) """)
+        #             on versions(mtime) """)
         
         execute(""" create table if not exists attributes
                           ( serial integer,
diff --git a/pithos/backends/lib_alchemy/__init__.py b/pithos/backends/lib_alchemy/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/pithos/backends/lib_alchemy/dbworker.py b/pithos/backends/lib_alchemy/dbworker.py
new file mode 100644 (file)
index 0000000..414163e
--- /dev/null
@@ -0,0 +1,41 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+
+class DBWorker(object):
+    """Database connection handler.
+
+    Keeps the keyword arguments it was built with and exposes the
+    'connection' and 'engine' entries as attributes.
+    """
+    
+    def __init__(self, **params):
+        """Store params; a missing 'connection' or 'engine' raises KeyError."""
+        self.params = params
+        # Both keys are mandatory: plain subscripts are used, not .get().
+        self.conn = params['connection']
+        self.engine = params['engine']
diff --git a/pithos/backends/lib_alchemy/groups.py b/pithos/backends/lib_alchemy/groups.py
new file mode 100644 (file)
index 0000000..19d2270
--- /dev/null
@@ -0,0 +1,148 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from collections import defaultdict
+from sqlalchemy import Table, Column, String, MetaData
+from sqlalchemy.sql import select, and_
+from sqlalchemy.schema import Index
+from dbworker import DBWorker
+
+class Groups(DBWorker):
+    """Groups are named collections of members, belonging to an owner."""
+    
+    def __init__(self, **params):
+        DBWorker.__init__(self, **params)
+        metadata = MetaData()
+        columns=[]
+        columns.append(Column('owner', String(255), primary_key=True))
+        columns.append(Column('name', String(255), primary_key=True))
+        columns.append(Column('member', String(255), primary_key=True))
+        self.groups = Table('groups', metadata, *columns)
+        
+        # place an index on member
+        Index('idx_groups_member', self.groups.c.member)
+        
+        metadata.create_all(self.engine)
+        metadata.bind = self.engine
+    
+    def group_names(self, owner):
+        """List all group names belonging to owner."""
+        
+        s = select([self.groups.c.name],
+            self.groups.c.owner==owner).distinct()
+        r = self.conn.execute(s)
+        l = [row[0] for row in r.fetchall()]
+        r.close()
+        return l
+    
+    def group_dict(self, owner):
+        """Return a dict mapping group names to member lists for owner."""
+        
+        s = select([self.groups.c.name, self.groups.c.member],
+            self.groups.c.owner==owner)
+        r = self.conn.execute(s)
+        d = defaultdict(list)
+        for group, member in r.fetchall():
+            d[group].append(member)
+        r.close()
+        return d
+    
+    def group_add(self, owner, group, member):
+        """Add a member to a group."""
+        
+        s = self.groups.insert()
+        r = self.conn.execute(s, owner=owner, name=group, member=member)
+        r.close()
+    
+    def group_addmany(self, owner, group, members):
+        """Add members to a group.
+
+        members may be any iterable of member names; one row is
+        inserted per member. Nothing is executed when members is empty,
+        because passing an empty parameter list to execute() does not
+        behave as a no-op.
+        """
+
+        values = [{'owner': owner, 'name': group, 'member': member}
+                  for member in members]
+        if not values:
+            return
+        s = self.groups.insert()
+        r = self.conn.execute(s, values)
+        r.close()
+    
+    def group_remove(self, owner, group, member):
+        """Remove a member from a group."""
+        
+        s = self.groups.delete().where(and_(self.groups.c.owner==owner,
+                                            self.groups.c.name==group,
+                                            self.groups.c.member==member))
+        r = self.conn.execute(s)
+        r.close()
+    
+    def group_delete(self, owner, group):
+        """Delete a group."""
+        
+        s = self.groups.delete().where(and_(self.groups.c.owner==owner,
+                                            self.groups.c.name==group))
+        r = self.conn.execute(s)
+        r.close()
+    
+    def group_destroy(self, owner):
+        """Delete all groups belonging to owner."""
+        
+        s = self.groups.delete().where(self.groups.c.owner==owner)
+        r = self.conn.execute(s)
+        r.close()
+    
+    def group_members(self, owner, group):
+        """Return the list of members of a group."""
+        
+        s = select([self.groups.c.member], and_(self.groups.c.owner==owner,
+                                                self.groups.c.name==group))
+        r = self.conn.execute(s)
+        l = [row[0] for row in r.fetchall()]
+        r.close()
+        return l
+    
+    def group_check(self, owner, group, member):
+        """Check if a member is in a group."""
+        
+        s = select([self.groups.c.member], and_(self.groups.c.owner==owner,
+                           self.groups.c.name==group,
+                           self.groups.c.member==member))
+        r = self.conn.execute(s)
+        l = r.fetchone()
+        r.close()
+        return bool(l)
+    
+    def group_parents(self, member):
+        """Return all (owner, group) tuples that contain member."""
+        
+        s = select([self.groups.c.owner, self.groups.c.name],
+            self.groups.c.member==member)
+        r = self.conn.execute(s)
+        l = r.fetchall()
+        r.close()
+        return l
diff --git a/pithos/backends/lib_alchemy/hashfiler/__init__.py b/pithos/backends/lib_alchemy/hashfiler/__init__.py
new file mode 100644 (file)
index 0000000..aa3b929
--- /dev/null
@@ -0,0 +1,36 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from blocker import Blocker
+from mapper import Mapper
+
diff --git a/pithos/backends/lib_alchemy/hashfiler/blocker.py b/pithos/backends/lib_alchemy/hashfiler/blocker.py
new file mode 100644 (file)
index 0000000..47293bc
--- /dev/null
@@ -0,0 +1,211 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os import makedirs
+from os.path import isdir, realpath, exists, join
+from hashlib import new as newhasher
+from binascii import hexlify
+
+from context_file import ContextFile, file_sync_read_chunks
+
+
+class Blocker(object):
+    """Blocker.
+       Required constructor parameters: blocksize, blockpath, hashtype.
+    """
+
+    blocksize = None
+    blockpath = None
+    hashtype = None
+
+    def __init__(self, **params):
+        blocksize = params['blocksize']
+        blockpath = params['blockpath']
+        blockpath = realpath(blockpath)
+        if not isdir(blockpath):
+            if not exists(blockpath):
+                makedirs(blockpath)
+            else:
+                raise ValueError("Variable blockpath '%s' is not a directory" % (blockpath,))
+
+        hashtype = params['hashtype']
+        try:
+            hasher = newhasher(hashtype)
+        except ValueError:
+            msg = "Variable hashtype '%s' is not available from hashlib"
+            raise ValueError(msg % (hashtype,))
+
+        hasher.update("")
+        emptyhash = hasher.digest()
+
+        self.blocksize = blocksize
+        self.blockpath = blockpath
+        self.hashtype = hashtype
+        self.hashlen = len(emptyhash)
+        self.emptyhash = emptyhash
+
+    def get_rear_block(self, blkhash, create=0):
+        filename = hexlify(blkhash)
+        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
+        if not exists(dir):
+            makedirs(dir)
+        name = join(dir, filename)
+        return ContextFile(name, create)
+
+    def check_rear_block(self, blkhash):
+        filename = hexlify(blkhash)
+        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
+        name = join(dir, filename)
+        return exists(name)
+
+    def block_hash(self, data):
+        """Hash a block of data"""
+        hasher = newhasher(self.hashtype)
+        hasher.update(data.rstrip('\x00'))
+        return hasher.digest()
+
+    def block_ping(self, hashes):
+        """Check hashes for existence and
+           return those missing from block storage.
+        """
+        missing = []
+        append = missing.append
+        for i, h in enumerate(hashes):
+            if not self.check_rear_block(h):
+                append(i)
+        return missing
+
+    def block_retr(self, hashes):
+        """Retrieve blocks from storage by their hashes.
+
+        Returns the list of block contents, in the order of hashes.
+        Retrieval stops at the first hash whose rear block yields no
+        data, so the result may be shorter than hashes. A hash with no
+        file in the store makes opening the rear block raise IOError.
+        """
+        blocksize = self.blocksize
+        blocks = []
+        append = blocks.append
+
+        for h in hashes:
+            # Reset for every hash: otherwise an empty rear block would
+            # silently reuse the data read for the previous hash.
+            block = None
+            with self.get_rear_block(h, 0) as rbl:
+                if not rbl:
+                    break
+                for block in rbl.sync_read_chunks(blocksize, 1, 0):
+                    break # there should be just one block there
+            if not block:
+                break
+            append(block)
+
+        return blocks
+
+    def block_stor(self, blocklist):
+        """Store a bunch of blocks and return (hashes, missing).
+           Hashes is a list of the hashes of the blocks,
+           missing is a list of indices in that list indicating
+           which blocks were missing from the store.
+           Only the missing blocks are written.
+        """
+        block_hash = self.block_hash
+        hashlist = [block_hash(b) for b in blocklist]
+        missing = self.block_ping(hashlist)
+        for i in missing:
+            with self.get_rear_block(hashlist[i], 1) as rbl:
+                rbl.sync_write(blocklist[i]) #XXX: verify?
+
+        return hashlist, missing
+
+    def block_delta(self, blkhash, offdata=()):
+        """Construct and store a new block from a given block
+           and a list of (offset, data) 'patches'. Return:
+           (the hash of the new block, 1 if the new block was missing
+           from the store and had to be written, else 0).
+           Return (None, None) when there are no patches or the
+           original block cannot be retrieved.
+        """
+        if not offdata:
+            return None, None
+
+        blocksize = self.blocksize
+        block = self.block_retr((blkhash,))
+        if not block:
+            return None, None
+
+        block = block[0]
+        newblock = ''
+        idx = 0
+        size = 0
+        trunc = 0
+        # NOTE(review): patches are presumably sorted by offset and
+        # non-overlapping -- confirm against callers.
+        for off, data in offdata:
+            if not data:
+                # An empty patch stops patching; the tail of the
+                # original block is then not appended below.
+                trunc = 1
+                break
+            newblock += block[idx:off] + data
+            size += off - idx + len(data)
+            if size >= blocksize:
+                break
+            # Continue copying the original from where this patch ended;
+            # without advancing, the next patch would copy from 0 again.
+            idx = size
+
+        if not trunc:
+            newblock += block[size:len(block)]
+
+        h, a = self.block_stor((newblock,))
+        return h[0], 1 if a else 0
+
+    def block_hash_file(self, openfile):
+        """Return the list of hashes (hashes map)
+           for the blocks in a buffered file.
+           Helper method, does not affect store.
+        """
+        hashes = []
+        append = hashes.append
+        block_hash = self.block_hash
+
+        # NOTE(review): nr=1 is passed and file_sync_read_chunks stops
+        # after yielding nr chunks, so only the first block appears to
+        # be hashed -- confirm whether all blocks were intended.
+        for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0):
+            append(block_hash(block))
+
+        return hashes
+
+    def block_stor_file(self, openfile):
+        """Read blocks from buffered file object and store them. Return:
+           (bytes read, list of hashes, concatenation of the 'missing'
+           lists returned by block_stor for each block)
+        """
+        blocksize = self.blocksize
+        block_stor = self.block_stor
+        hashlist = []
+        hextend = hashlist.extend
+        storedlist = []
+        sextend = storedlist.extend
+        lastsize = 0
+
+        # NOTE(review): nr=1 is passed and file_sync_read_chunks stops
+        # after yielding nr chunks, so only the first block appears to
+        # be read -- confirm whether all blocks were intended.
+        for block in file_sync_read_chunks(openfile, blocksize, 1, 0):
+            # block_stor gets one block at a time, so every entry it
+            # reports as missing is the index 0, not a hash.
+            hl, sl = block_stor((block,))
+            hextend(hl)
+            sextend(sl)
+            lastsize = len(block)
+
+        # All blocks but the last are taken to be exactly blocksize long.
+        size = (len(hashlist) -1) * blocksize + lastsize if hashlist else 0
+        return size, hashlist, storedlist
+
diff --git a/pithos/backends/lib_alchemy/hashfiler/context_file.py b/pithos/backends/lib_alchemy/hashfiler/context_file.py
new file mode 100644 (file)
index 0000000..0a16b2a
--- /dev/null
@@ -0,0 +1,191 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os import SEEK_CUR, SEEK_SET, fsync
+from errno import ENOENT
+
+
+_zeros = ''
+
+
+def zeros(nr):
+    """Return a string of nr NUL characters ('' when nr is not positive).
+
+    A module-level cache is kept and only ever grows, so alternating
+    between a large and a small size does not rebuild the string on
+    every call.
+    """
+    global _zeros
+    if nr <= 0:
+        return ''
+    size = len(_zeros)
+    if nr > size:
+        _zeros += '\0' * (nr - size)
+    return _zeros[:nr]
+
+
+def file_sync_write_chunks(openfile, chunksize, offset, chunks, size=None):
+    """Write given chunks to the given buffered file object.
+       Writes never span across chunk boundaries.
+       If size is given stop after or pad until size bytes have been written.
+
+       offset is counted in chunks, size in bytes from the start of the
+       file. A chunk shorter than chunksize is zero-padded up to the
+       chunk boundary before the next chunk is written.
+    """
+    fwrite = openfile.write
+    seek = openfile.seek
+    padding = 0
+
+    try:
+        seek(offset * chunksize)
+    except IOError:
+        # Not seekable: reach the offset by writing zeros instead.
+        seek = None
+        for x in xrange(offset):
+            fwrite(zeros(chunksize))
+
+    # cursize is the current file position in bytes; it has to follow
+    # every write, or the final padding below is computed from a stale
+    # position and too many zeros are appended.
+    cursize = offset * chunksize
+
+    for chunk in chunks:
+        if padding > 0:
+            # Fill the gap left by the previous, short chunk.
+            if seek:
+                seek(padding -1, SEEK_CUR)
+                fwrite("\x00")
+            else:
+                fwrite(buffer(zeros(chunksize), 0, padding))
+            cursize += padding
+        if size is not None and cursize + chunksize >= size:
+            # Last chunk: keep only what still fits below size.
+            chunk = chunk[:max(0, size - cursize)]
+            fwrite(chunk)
+            cursize += len(chunk)
+            break
+        fwrite(chunk)
+        cursize += len(chunk)
+        padding = chunksize - len(chunk)
+
+    # Pad with zeros when fewer than size bytes have been written.
+    padding = size - cursize if size is not None else 0
+    if padding <= 0:
+        return
+
+    q, r = divmod(padding, chunksize)
+    for x in xrange(q):
+        fwrite(zeros(chunksize))
+    fwrite(buffer(zeros(chunksize), 0, r))
+
+
+def file_sync_read_chunks(openfile, chunksize, nr, offset=0):
+    """Read and yield groups of chunks from a buffered file object at offset.
+       Reads never span across chunksize boundaries.
+
+       At most nr chunks are yielded, starting offset chunks into the
+       file. The last chunk is shorter than chunksize when the file
+       ends inside it.
+    """
+    fread = openfile.read
+    remains = offset * chunksize
+    seek = openfile.seek
+    try:
+        seek(remains)
+    except IOError:
+        # Not seekable: skip to the offset by reading and discarding.
+        seek = None
+        while remains > 0:
+            s = fread(remains)
+            if not s:
+                # The file ended before the offset: nothing to yield.
+                # Without this check the loop would never terminate.
+                return
+            remains -= len(s)
+
+    while nr:
+        remains = chunksize
+        pieces = []
+        # A single read may return less than asked for; keep reading
+        # until the chunk is complete or the file ends.
+        while 1:
+            s = fread(remains)
+            if not s:
+                if pieces:
+                    yield ''.join(pieces)
+                return
+            pieces.append(s)
+            remains -= len(s)
+            if remains <= 0:
+                break
+        yield ''.join(pieces)
+        nr -= 1
+
+
+class ContextFile(object):
+    """Context manager around a file that is opened on entry.
+
+    The file is opened for binary reading and writing when the context
+    is entered and closed when it is left. With a true create flag a
+    missing file is created; otherwise opening a missing file raises
+    IOError.
+    """
+    __slots__ = ("name", "fdesc", "create")
+
+    def __init__(self, name, create=0):
+        self.name = name
+        self.fdesc = None
+        self.create = create
+        #self.dirty = 0
+
+    def __enter__(self):
+        name = self.name
+        try:
+            fdesc = open(name, 'rb+')
+        except IOError as e:
+            # Only a missing file may be created, and only on request.
+            if not self.create or e.errno != ENOENT:
+                raise
+            # Create in binary mode too, matching the 'rb+' used above.
+            fdesc = open(name, 'wb+')
+
+        self.fdesc = fdesc
+        return self
+
+    def __exit__(self, exc, arg, trace):
+        fdesc = self.fdesc
+        if fdesc is not None:
+            #if self.dirty:
+            #    fsync(fdesc.fileno())
+            fdesc.close()
+        return False # propagate exceptions
+
+    def seek(self, offset, whence=SEEK_SET):
+        return self.fdesc.seek(offset, whence)
+
+    def tell(self):
+        return self.fdesc.tell()
+
+    def truncate(self, size):
+        self.fdesc.truncate(size)
+
+    def sync_write(self, data):
+        #self.dirty = 1
+        self.fdesc.write(data)
+
+    def sync_write_chunks(self, chunksize, offset, chunks, size=None):
+        #self.dirty = 1
+        return file_sync_write_chunks(self.fdesc, chunksize, offset, chunks, size)
+
+    def sync_read(self, size):
+        """Read up to the end of the file, size bytes per read call."""
+        read = self.fdesc.read
+        pieces = []
+        while 1:
+            s = read(size)
+            if not s:
+                break
+            pieces.append(s)
+        return b''.join(pieces)
+
+    def sync_read_chunks(self, chunksize, nr, offset=0):
+        return file_sync_read_chunks(self.fdesc, chunksize, nr, offset)
+
diff --git a/pithos/backends/lib_alchemy/hashfiler/hashfiler/__init__.py b/pithos/backends/lib_alchemy/hashfiler/hashfiler/__init__.py
new file mode 100644 (file)
index 0000000..d3cae88
--- /dev/null
@@ -0,0 +1,38 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from blocker import Blocker
+from mapper import Mapper
+
+__all__ = ["Blocker", "Mapper"]
+
diff --git a/pithos/backends/lib_alchemy/hashfiler/hashfiler/blocker.py b/pithos/backends/lib_alchemy/hashfiler/hashfiler/blocker.py
new file mode 100644 (file)
index 0000000..59fd31f
--- /dev/null
@@ -0,0 +1,211 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os import makedirs
+from os.path import isdir, realpath, exists, join
+from hashlib import new as newhasher
+from binascii import hexlify
+
+from context_file import ContextFile, file_sync_read_chunks
+
+
+class Blocker(object):
+    """Blocker.
+       Required constructor parameters: blocksize, blockpath, hashtype.
+    """
+
+    blocksize = None
+    blockpath = None
+    hashtype = None
+
+    def __init__(self, **params):
+        blocksize = params['blocksize']
+        blockpath = params['blockpath']
+        blockpath = realpath(blockpath)
+        if not isdir(blockpath):
+            if not exists(blockpath):
+                makedirs(blockpath)
+            else:
+                raise ValueError("Variable blockpath '%s' is not a directory" % (blockpath,))
+
+        hashtype = params['hashtype']
+        try:
+            hasher = newhasher(hashtype)
+        except ValueError:
+            msg = "Variable hashtype '%s' is not available from hashlib"
+            raise ValueError(msg % (hashtype,))
+
+        hasher.update("")
+        emptyhash = hasher.digest()
+
+        self.blocksize = blocksize
+        self.blockpath = blockpath
+        self.hashtype = hashtype
+        self.hashlen = len(emptyhash)
+        self.emptyhash = emptyhash
+
+    def _get_rear_block(self, blkhash, create=0):
+        filename = hexlify(blkhash)
+        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
+        if not exists(dir):
+            makedirs(dir)
+        name = join(dir, filename)
+        return ContextFile(name, create)
+
+    def _check_rear_block(self, blkhash):
+        filename = hexlify(blkhash)
+        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
+        name = join(dir, filename)
+        return exists(name)
+
+    def block_hash(self, data):
+        """Hash a block of data"""
+        hasher = newhasher(self.hashtype)
+        hasher.update(data.rstrip('\x00'))
+        return hasher.digest()
+
+    def block_ping(self, hashes):
+        """Check hashes for existence and
+           return those missing from block storage.
+        """
+        missing = []
+        append = missing.append
+        for i, h in enumerate(hashes):
+            if not self._check_rear_block(h):
+                append(i)
+        return missing
+
+    def block_retr(self, hashes):
+        """Retrieve blocks from storage by their hashes.
+
+        Returns the list of block contents, in the order of hashes.
+        Retrieval stops at the first hash whose rear block yields no
+        data, so the result may be shorter than hashes.
+        """
+        blocksize = self.blocksize
+        blocks = []
+        append = blocks.append
+
+        for h in hashes:
+            # Reset for every hash: otherwise an empty rear block would
+            # silently reuse the data read for the previous hash.
+            block = None
+            with self._get_rear_block(h, 0) as rbl:
+                if not rbl:
+                    break
+                for block in rbl.sync_read_chunks(blocksize, 1, 0):
+                    break # there should be just one block there
+            if not block:
+                break
+            append(block)
+
+        return blocks
+
+    def block_stor(self, blocklist):
+        """Store a bunch of blocks and return (hashes, missing).
+           Hashes is a list of the hashes of the blocks,
+           missing is a list of indices in that list indicating
+           which blocks were missing from the store.
+           Only the missing blocks are written.
+        """
+        block_hash = self.block_hash
+        hashlist = [block_hash(b) for b in blocklist]
+        missing = self.block_ping(hashlist)
+        for i in missing:
+            with self._get_rear_block(hashlist[i], 1) as rbl:
+                rbl.sync_write(blocklist[i]) #XXX: verify?
+
+        return hashlist, missing
+
+    def block_delta(self, blkhash, offdata=()):
+        """Construct and store a new block from a given block
+           and a list of (offset, data) 'patches'. Return:
+           (the hash of the new block, 1 if the new block was missing
+           from the store and had to be written, else 0).
+           Return (None, None) when there are no patches or the
+           original block cannot be retrieved.
+        """
+        if not offdata:
+            return None, None
+
+        blocksize = self.blocksize
+        block = self.block_retr((blkhash,))
+        if not block:
+            return None, None
+
+        block = block[0]
+        newblock = ''
+        idx = 0
+        size = 0
+        trunc = 0
+        # NOTE(review): patches are presumably sorted by offset and
+        # non-overlapping -- confirm against callers.
+        for off, data in offdata:
+            if not data:
+                # An empty patch stops patching; the tail of the
+                # original block is then not appended below.
+                trunc = 1
+                break
+            newblock += block[idx:off] + data
+            size += off - idx + len(data)
+            if size >= blocksize:
+                break
+            # Continue copying the original from where this patch ended;
+            # without advancing, the next patch would copy from 0 again.
+            idx = size
+
+        if not trunc:
+            newblock += block[size:len(block)]
+
+        h, a = self.block_stor((newblock,))
+        return h[0], 1 if a else 0
+
+    def block_hash_file(self, openfile):
+        """Return the list of hashes (hashes map)
+           for the blocks in a buffered file.
+           Helper method, does not affect store.
+        """
+        hashes = []
+        append = hashes.append
+        block_hash = self.block_hash
+
+        # NOTE(review): nr=1 is passed; if file_sync_read_chunks stops
+        # after yielding nr chunks, only the first block is hashed --
+        # confirm whether all blocks were intended.
+        for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0):
+            append(block_hash(block))
+
+        return hashes
+
+    def block_stor_file(self, openfile):
+        """Read blocks from buffered file object and store them. Return:
+           (bytes read, list of hashes, concatenation of the 'missing'
+           lists returned by block_stor for each block)
+        """
+        blocksize = self.blocksize
+        block_stor = self.block_stor
+        hashlist = []
+        hextend = hashlist.extend
+        storedlist = []
+        sextend = storedlist.extend
+        lastsize = 0
+
+        # NOTE(review): nr=1 is passed; if file_sync_read_chunks stops
+        # after yielding nr chunks, only the first block is read --
+        # confirm whether all blocks were intended.
+        for block in file_sync_read_chunks(openfile, blocksize, 1, 0):
+            # block_stor gets one block at a time, so every entry it
+            # reports as missing is the index 0, not a hash.
+            hl, sl = block_stor((block,))
+            hextend(hl)
+            sextend(sl)
+            lastsize = len(block)
+
+        # All blocks but the last are taken to be exactly blocksize long.
+        size = (len(hashlist) -1) * blocksize + lastsize if hashlist else 0
+        return size, hashlist, storedlist
+
diff --git a/pithos/backends/lib_alchemy/hashfiler/hashfiler/context_file.py b/pithos/backends/lib_alchemy/hashfiler/hashfiler/context_file.py
new file mode 100644 (file)
index 0000000..0a16b2a
--- /dev/null
@@ -0,0 +1,191 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os import SEEK_CUR, SEEK_SET, fsync
+from errno import ENOENT
+
+
+# Module-level cache holding the string of NUL characters returned last.
+_zeros = ''
+
+
+def zeros(nr):
+    """Return a string of nr NUL ('\\0') characters.
+
+       The result is kept in the module-level _zeros cache, which is
+       grown or cut to exactly nr characters on every call.
+    """
+    global _zeros
+    size = len(_zeros)
+    if nr == size:
+        return _zeros
+
+    if nr > size:
+        # Extend the cached string by the missing amount.
+        _zeros += '\0' * (nr - size)
+        return _zeros
+
+    if nr < size:
+        # The cache itself is shortened, not just the returned value.
+        _zeros = _zeros[:nr]
+        return _zeros
+
+
+def file_sync_write_chunks(openfile, chunksize, offset, chunks, size=None):
+    """Write given chunks to the given buffered file object.
+       Writes never span across chunk boundaries.
+       If size is given stop after or pad until size bytes have been written.
+    """
+    fwrite = openfile.write
+    seek = openfile.seek
+    padding = 0
+
+    try:
+        seek(offset * chunksize)
+    except IOError, e:
+        seek = None
+        for x in xrange(offset):
+            fwrite(zeros(chunksize))
+
+    cursize = offset * chunksize
+
+    for chunk in chunks:
+        if padding:
+            if seek:
+                seek(padding -1, SEEK_CUR)
+                fwrite("\x00")
+            else:
+                fwrite(buffer(zeros(chunksize), 0, padding))
+        if size is not None and cursize + chunksize >= size:
+            chunk = chunk[:chunksize - (cursize - size)]
+            fwrite(chunk)
+            cursize += len(chunk)
+            break
+        fwrite(chunk)
+        padding = chunksize - len(chunk)
+
+    padding = size - cursize if size is not None else 0
+    if padding <= 0:
+        return
+
+    q, r = divmod(padding, chunksize)
+    for x in xrange(q):
+        fwrite(zeros(chunksize))
+    fwrite(buffer(zeros(chunksize), 0, r))
+
+
+def file_sync_read_chunks(openfile, chunksize, nr, offset=0):
+    """Read and yield groups of chunks from a buffered file object at offset.
+       Reads never span accros chunksize boundaries.
+    """
+    fread = openfile.read
+    remains = offset * chunksize
+    seek = openfile.seek
+    try:
+        seek(remains)
+    except IOError, e:
+        seek = None
+        while 1:
+            s = fread(remains)
+            remains -= len(s)
+            if remains <= 0:
+                break
+
+    while nr:
+        remains = chunksize
+        chunk = ''
+        while 1:
+            s = fread(remains)
+            if not s:
+                if chunk:
+                    yield chunk
+                return
+            chunk += s
+            remains -= len(s)
+            if remains <= 0:
+                break
+        yield chunk
+        nr -= 1
+
+
+class ContextFile(object):
+    __slots__ = ("name", "fdesc", "create")
+
+    def __init__(self, name, create=0):
+        self.name = name
+        self.fdesc = None
+        self.create = create
+        #self.dirty = 0
+
+    def __enter__(self):
+        name = self.name
+        try:
+            fdesc = open(name, 'rb+')
+        except IOError, e:
+            if not self.create or e.errno != ENOENT:
+                raise
+            fdesc = open(name, 'w+')
+
+        self.fdesc = fdesc
+        return self
+
+    def __exit__(self, exc, arg, trace):
+        fdesc = self.fdesc
+        if fdesc is not None:
+            #if self.dirty:
+            #    fsync(fdesc.fileno())
+            fdesc.close()
+        return False # propagate exceptions
+
+    def seek(self, offset, whence=SEEK_SET):
+        return self.fdesc.seek(offset, whence)
+
+    def tell(self):
+        return self.fdesc.tell()
+
+    def truncate(self, size):
+        self.fdesc.truncate(size)
+
+    def sync_write(self, data):
+        #self.dirty = 1
+        self.fdesc.write(data)
+
+    def sync_write_chunks(self, chunksize, offset, chunks, size=None):
+        #self.dirty = 1
+        return file_sync_write_chunks(self.fdesc, chunksize, offset, chunks, size)
+
+    def sync_read(self, size):
+        read = self.fdesc.read
+        data = ''
+        while 1:
+            s = read(size)
+            if not s:
+                break
+            data += s
+        return data
+
+    def sync_read_chunks(self, chunksize, nr, offset=0):
+        return file_sync_read_chunks(self.fdesc, chunksize, nr, offset)
+
diff --git a/pithos/backends/lib_alchemy/hashfiler/hashfiler/mapper.py b/pithos/backends/lib_alchemy/hashfiler/hashfiler/mapper.py
new file mode 100644 (file)
index 0000000..d9caaee
--- /dev/null
@@ -0,0 +1,102 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os.path import realpath, join, exists, isdir
+from os import makedirs, unlink
+from errno import ENOENT
+
+from context_file import ContextFile
+
+
+class Mapper(object):
+    """Mapper.
+       Required constructor parameters: mappath, namelen.
+
+       Keeps one file per hashes map under the mappath directory.
+       A map file is a sequence of fixed-size records of namelen bytes.
+    """
+    
+    mappath = None
+    namelen = None
+
+    def __init__(self, **params):
+        """Store namelen and the resolved mappath, creating the mappath
+           directory if nothing exists at that path.
+           Raise ValueError if mappath exists but is not a directory.
+        """
+        self.params = params
+        self.namelen = params['namelen']
+        mappath = realpath(params['mappath'])
+        if not isdir(mappath):
+            if not exists(mappath):
+                makedirs(mappath)
+            else:
+                raise ValueError("Variable mappath '%s' is not a directory" % (mappath,))
+        self.mappath = mappath
+
+    def _get_rear_map(self, name, create=0):
+        """Return an unopened ContextFile for the map called name.
+           The file name is hex(int(name)) inside mappath.
+        """
+        name = join(self.mappath, hex(int(name)))
+        return ContextFile(name, create)
+
+    def _delete_rear_map(self, name):
+        """Unlink the map file for name.
+           Return 1 if it was removed, 0 if it did not exist;
+           any other OSError is re-raised.
+        """
+        name = join(self.mappath, hex(int(name)))
+        try:
+            unlink(name)
+            return 1
+        except OSError, e:
+            if e.errno != ENOENT:
+                raise
+        return 0
+
+    def map_retr(self, name, blkoff=0, nr=100000000000000):
+        """Return as a list, part of the hashes map of an object
+           at the given block offset.
+           By default, return the whole hashes map.
+
+           The map is opened with create=0.
+        """
+        namelen = self.namelen
+        hashes = ()
+
+        with self._get_rear_map(name, 0) as rmap:
+            # NOTE(review): rmap is the ContextFile itself; presumably it is
+            # always true here -- confirm against context_file.py.
+            if rmap:
+                hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff))
+        return hashes
+
+    def map_stor(self, name, hashes=(), blkoff=0, create=1):
+        """Store hashes in the given hashes map, replacing the old ones.
+
+           Hashes are written as namelen-sized records starting at record
+           index blkoff. The create argument is not consulted: the map file
+           is always opened with create=1.
+        """
+        namelen = self.namelen
+        with self._get_rear_map(name, 1) as rmap:
+            rmap.sync_write_chunks(namelen, blkoff, hashes, None)
+
+#     def map_copy(self, src, dst):
+#         """Copy a hashes map to another one, replacing it."""
+#         with self._get_rear_map(src, 0) as rmap:
+#             if rmap:
+#                 rmap.copy_to(dst)
+
+    def map_remv(self, name):
+        """Remove a hashes map. Returns true if the map was found and removed."""
+        return self._delete_rear_map(name)
+
diff --git a/pithos/backends/lib_alchemy/hashfiler/mapper.py b/pithos/backends/lib_alchemy/hashfiler/mapper.py
new file mode 100644 (file)
index 0000000..825bc9c
--- /dev/null
@@ -0,0 +1,102 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os.path import realpath, join, exists, isdir
+from os import makedirs, unlink
+from errno import ENOENT
+
+from context_file import ContextFile
+
+
+class Mapper(object):
+    """Mapper.
+       Required constructor parameters: mappath, namelen.
+
+       Keeps one file per hashes map under the mappath directory.
+       A map file is a sequence of fixed-size records of namelen bytes.
+    """
+    
+    mappath = None
+    namelen = None
+
+    def __init__(self, **params):
+        """Store namelen and the resolved mappath, creating the mappath
+           directory if nothing exists at that path.
+           Raise ValueError if mappath exists but is not a directory.
+        """
+        self.params = params
+        self.namelen = params['namelen']
+        mappath = realpath(params['mappath'])
+        if not isdir(mappath):
+            if not exists(mappath):
+                makedirs(mappath)
+            else:
+                raise ValueError("Variable mappath '%s' is not a directory" % (mappath,))
+        self.mappath = mappath
+
+    def get_rear_map(self, name, create=0):
+        """Return an unopened ContextFile for the map called name.
+           The file name is hex(int(name)) inside mappath.
+        """
+        name = join(self.mappath, hex(int(name)))
+        return ContextFile(name, create)
+
+    def delete_rear_map(self, name):
+        """Unlink the map file for name.
+           Return 1 if it was removed, 0 if it did not exist;
+           any other OSError is re-raised.
+        """
+        name = join(self.mappath, hex(int(name)))
+        try:
+            unlink(name)
+            return 1
+        except OSError, e:
+            if e.errno != ENOENT:
+                raise
+        return 0
+
+    def map_retr(self, name, blkoff=0, nr=100000000000000):
+        """Return as a list, part of the hashes map of an object
+           at the given block offset.
+           By default, return the whole hashes map.
+
+           The map is opened with create=0.
+        """
+        namelen = self.namelen
+        hashes = ()
+
+        with self.get_rear_map(name, 0) as rmap:
+            # NOTE(review): rmap is the ContextFile itself; presumably it is
+            # always true here -- confirm against context_file.py.
+            if rmap:
+                hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff))
+        return hashes
+
+    def map_stor(self, name, hashes=(), blkoff=0, create=1):
+        """Store hashes in the given hashes map, replacing the old ones.
+
+           Hashes are written as namelen-sized records starting at record
+           index blkoff. The create argument is not consulted: the map file
+           is always opened with create=1.
+        """
+        namelen = self.namelen
+        with self.get_rear_map(name, 1) as rmap:
+            rmap.sync_write_chunks(namelen, blkoff, hashes, None)
+
+#     def map_copy(self, src, dst):
+#         """Copy a hashes map to another one, replacing it."""
+#         with self.get_rear_map(src, 0) as rmap:
+#             if rmap:
+#                 rmap.copy_to(dst)
+
+    def map_remv(self, name):
+        """Remove a hashes map. Returns true if the map was found and removed."""
+        return self.delete_rear_map(name)
+
diff --git a/pithos/backends/lib_alchemy/node.py b/pithos/backends/lib_alchemy/node.py
new file mode 100644 (file)
index 0000000..3deac74
--- /dev/null
@@ -0,0 +1,759 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from time import time
+from sqlalchemy import Table, Integer, Column, String, MetaData, ForeignKey
+from sqlalchemy.schema import Index
+
+from dbworker import DBWorker
+
+
+# Identifier of the root node; it is inserted as its own parent.
+ROOTNODE  = 0
+
+# Positions of the columns in a version row selected as
+# (serial, node, size, source, mtime, muser, cluster).
+( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
+
+# Default upper bound for the 'before' time arguments below.
+inf = float('inf')
+
+
+def strnextling(prefix):
+    """Return the first unicode string
+       greater than but not starting with given prefix.
+       strnextling('hello') -> 'hellp'
+
+       For the empty prefix unichr(0xffff) is returned as an approximation.
+       Raise RuntimeError if the last character of prefix is already
+       0xffff or above, since it cannot be incremented.
+    """
+    if not prefix:
+        ## all strings start with the null string,
+        ## therefore we have to approximate strnextling('')
+        ## with the last unicode character supported by python
+        ## 0x10ffff for wide (32-bit unicode) python builds
+        ## 0x00ffff for narrow (16-bit unicode) python builds
+        ## We will not autodetect. 0xffff is safe enough.
+        return unichr(0xffff)
+    s = prefix[:-1]
+    c = ord(prefix[-1])
+    if c >= 0xffff:
+        raise RuntimeError
+    # Replace the last character with its successor.
+    s += unichr(c+1)
+    return s
+
+def strprevling(prefix):
+    """Return an approximation of the last unicode string
+       less than but not starting with given prefix.
+       strprevling(u'hello') -> u'helln\\uffff'
+
+       The empty prefix is returned unchanged. If the last character has
+       code 0, the prefix without that character is returned.
+    """
+    if not prefix:
+        ## There is no prevling for the null string
+        return prefix
+    s = prefix[:-1]
+    c = ord(prefix[-1])
+    if c > 0:
+        # Predecessor of the last character, followed by U+FFFF.
+        s += unichr(c-1) + unichr(0xffff)
+    return s
+
+
+# Maps a version property name to its position in a version row selected
+# as (serial, node, size, source, mtime, muser, cluster).
+_propnames = {
+    'serial'    : 0,
+    'node'      : 1,
+    'size'      : 2,
+    'source'    : 3,
+    'mtime'     : 4,
+    'muser'     : 5,
+    'cluster'   : 6,
+}
+
+
+class Node(DBWorker):
+    """Nodes store path organization and have multiple versions.
+       Versions store object history and have multiple attributes.
+       Attributes store metadata.
+    """
+    
+    # TODO: Provide an interface for included and excluded clusters.
+    
+    def __init__(self, **params):
+        DBWorker.__init__(self, **params)
+        metadata = MetaData()
+        
+        #create nodes table
+        columns=[]
+        columns.append(Column('node', Integer, primary_key=True))
+        columns.append(Column('parent', Integer,
+                              ForeignKey('nodes.node',
+                                         ondelete='CASCADE',
+                                         onupdate='CASCADE'),
+                              autoincrement=False, default=0))
+        #columns.append(Column('path', String(2048), default='', nullable=False))
+        columns.append(Column('path', String(255), default='', nullable=False))
+        self.nodes = Table('nodes', metadata, *columns)
+        # place an index on path
+        Index('idx_nodes_path', self.nodes.c.path, unique=True)
+        
+        #create statistics table
+        columns=[]
+        columns.append(Column('node', Integer,
+                              ForeignKey('nodes.node',
+                                         ondelete='CASCADE',
+                                         onupdate='CASCADE'),
+                              primary_key=True))
+        columns.append(Column('population', Integer, nullable=False,
+                              autoincrement=False, default=0))
+        columns.append(Column('size', Integer, nullable=False,
+                              autoincrement=False, default=0))
+        columns.append(Column('mtime', Integer, autoincrement=False))
+        columns.append(Column('cluster', Integer, nullable=False,
+                              autoincrement=False, default=0, primary_key=True))
+        self.statistics = Table('statistics', metadata, *columns)
+        
+        #create versions table
+        columns=[]
+        columns.append(Column('serial', Integer, autoincrement=False,
+                              primary_key=True))
+        columns.append(Column('node', Integer,
+                              ForeignKey('nodes.node',
+                                         ondelete='CASCADE',
+                                         onupdate='CASCADE'),
+                              autoincrement=False))
+        columns.append(Column('size', Integer, nullable=False,
+                              autoincrement=False, default=0))
+        columns.append(Column('source', Integer, autoincrement=False))
+        columns.append(Column('mtime', Integer, autoincrement=False))
+        columns.append(Column('muser', String(255), nullable=False, default=''))
+        columns.append(Column('cluster', Integer, nullable=False,
+                              autoincrement=False, default=0))
+        self.versions = Table('versions', metadata, *columns)
+        # place an index on node
+        Index('idx_versions_node', self.versions.c.mtime)
+        # TODO: Sort out if more indexes are needed.
+        #Index('idx_versions_node', self.versions.c.node)
+        
+        #create attributes table
+        columns = []
+        columns.append(Column('serial', Integer,
+                              ForeignKey('versions.serial',
+                                         ondelete='CASCADE',
+                                         onupdate='CASCADE'),
+                              autoincrement=False,
+                              primary_key=True))
+        columns.append(Column('key', String(255), primary_key=True))
+        columns.append(Column('value', String(255)))
+        self.attributes = Table('attributes', metadata, *columns)
+        
+        metadata.create_all(self.engine)
+        
+        s = self.nodes.insert(node=ROOTNODE, parent=ROOTNODE)
+        self.conn.execute(s)
+    
+    def node_create(self, parent, path):
+        """Create a new node from the given properties.
+           Return the node identifier of the new node.
+
+           The identifier is taken from the lastrowid of the insert.
+        """
+        
+        q = ("insert into nodes (parent, path) "
+             "values (?, ?)")
+        props = (parent, path)
+        return self.execute(q, props).lastrowid
+    
+    def node_lookup(self, path):
+        """Lookup the current node of the given path.
+           Return None if the path is not found.
+
+           The path is matched exactly; the node identifier is returned.
+        """
+        
+        q = "select node from nodes where path = ?"
+        self.execute(q, (path,))
+        r = self.fetchone()
+        if r is not None:
+            return r[0]
+        return None
+    
+    def node_get_properties(self, node):
+        """Return the node's (parent, path).
+           Return None if the node is not found.
+
+           The value is whatever self.fetchone() gives for the query.
+        """
+        
+        q = "select parent, path from nodes where node = ?"
+        self.execute(q, (node,))
+        return self.fetchone()
+    
+    def node_get_versions(self, node, keys=(), propnames=_propnames):
+        """Return the properties of all versions at node.
+           If keys is empty, return all properties in the order
+           (serial, node, size, source, mtime, muser, cluster).
+
+           Otherwise return, per version, a list with the values of the
+           requested keys in the order given; keys that are not in
+           propnames are silently left out.
+        """
+        
+        q = ("select serial, node, size, source, mtime, muser, cluster "
+             "from versions "
+             "where node = ?")
+        self.execute(q, (node,))
+        r = self.fetchall()
+        if r is None:
+            return r
+        
+        if not keys:
+            return r
+        # propnames maps each key to its column position in the row.
+        return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
+    
+    def node_count_children(self, node):
+        """Return node's child count.
+
+           Node 0 is never counted, so the root (inserted as its own
+           parent) does not show up as a child of itself.
+        """
+        
+        q = "select count(node) from nodes where parent = ? and node != 0"
+        self.execute(q, (node,))
+        r = self.fetchone()
+        if r is None:
+            return 0
+        return r[0]
+    
+    def node_purge_children(self, parent, before=inf, cluster=0):
+        """Delete all versions with the specified
+           parent and cluster, and return
+           the serials of versions deleted.
+           Clears out nodes with no remaining versions.
+
+           Only versions with mtime <= before are affected.
+           Return an empty tuple when no version matches.
+        """
+        
+        execute = self.execute
+        # Count and total size of the versions that are about to go.
+        q = ("select count(serial), sum(size) from versions "
+             "where node in (select node "
+                            "from nodes "
+                            "where parent = ?) "
+             "and cluster = ? "
+             "and mtime <= ?")
+        args = (parent, cluster, before)
+        execute(q, args)
+        nr, size = self.fetchone()
+        if not nr:
+            return ()
+        mtime = time()
+        # Subtract them from the parent's statistics and from its ancestors'.
+        self.statistics_update(parent, -nr, -size, mtime, cluster)
+        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
+        
+        # Collect the serials before the rows are deleted.
+        q = ("select serial from versions "
+             "where node in (select node "
+                            "from nodes "
+                            "where parent = ?) "
+             "and cluster = ? "
+             "and mtime <= ?")
+        execute(q, args)
+        serials = [r[SERIAL] for r in self.fetchall()]
+        q = ("delete from versions "
+             "where node in (select node "
+                            "from nodes "
+                            "where parent = ?) "
+             "and cluster = ? "
+             "and mtime <= ?")
+        execute(q, args)
+        # Drop the children that are left without any version.
+        q = ("delete from nodes "
+             "where node in (select node from nodes n "
+                            "where (select count(serial) "
+                                   "from versions "
+                                   "where node = n.node) = 0 "
+                            "and parent = ?)")
+        execute(q, (parent,))
+        return serials
+    
+    def node_purge(self, node, before=inf, cluster=0):
+        """Delete all versions with the specified
+           node and cluster, and return
+           the serials of versions deleted.
+           Clears out the node if it has no remaining versions.
+
+           Only versions with mtime <= before are affected.
+           Return an empty tuple when no version matches.
+        """
+        
+        execute = self.execute
+        # Count and total size of the versions that are about to go.
+        q = ("select count(serial), sum(size) from versions "
+             "where node = ? "
+             "and cluster = ? "
+             "and mtime <= ?")
+        args = (node, cluster, before)
+        execute(q, args)
+        nr, size = self.fetchone()
+        if not nr:
+            return ()
+        mtime = time()
+        # Subtract them from the statistics of the node's ancestors.
+        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
+        
+        # Collect the serials before the rows are deleted.
+        q = ("select serial from versions "
+             "where node = ? "
+             "and cluster = ? "
+             "and mtime <= ?")
+        execute(q, args)
+        serials = [r[SERIAL] for r in self.fetchall()]
+        q = ("delete from versions "
+             "where node = ? "
+             "and cluster = ? "
+             "and mtime <= ?")
+        execute(q, args)
+        # Drop the node itself if no version is left for it.
+        q = ("delete from nodes "
+             "where node in (select node from nodes n "
+                            "where (select count(serial) "
+                                   "from versions "
+                                   "where node = n.node) = 0 "
+                            "and node = ?)")
+        execute(q, (node,))
+        return serials
+    
+    def node_remove(self, node):
+        """Remove the node specified.
+           Return False if the node has children, True otherwise.
+
+           NOTE(review): a node that does not exist is not detected here;
+           the delete simply matches nothing and True is returned.
+        """
+        
+        if self.node_count_children(node):
+            return False
+        
+        mtime = time()
+        # Per cluster, take the node's versions out of the ancestors'
+        # statistics before the node row is deleted.
+        q = ("select count(serial), sum(size), cluster "
+             "from versions "
+             "where node = ? "
+             "group by cluster")
+        self.execute(q, (node,))
+        for population, size, cluster in self.fetchall():
+            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
+        
+        q = "delete from nodes where node = ?"
+        self.execute(q, (node,))
+        return True
+    
+    def statistics_get(self, node, cluster=0):
+        """Return population, total size and last mtime
+           for all versions under node that belong to the cluster.
+
+           The values are read from the statistics table; the result is
+           whatever self.fetchone() gives for the (node, cluster) row.
+        """
+        
+        q = ("select population, size, mtime from statistics "
+             "where node = ? and cluster = ?")
+        self.execute(q, (node, cluster))
+        return self.fetchone()
+    
+    def statistics_update(self, node, population, size, mtime, cluster=0):
+        """Update the statistics of the given node.
+           Statistics keep track of the population, total
+           size of objects and mtime in the node's namespace.
+           population and size are deltas added to the stored values;
+           they may be zero or positive or negative numbers.
+           mtime replaces the stored value.
+        """
+        
+        qs = ("select population, size from statistics "
+              "where node = ? and cluster = ?")
+        # NOTE(review): 'insert or replace' is SQLite syntax -- confirm the
+        # configured engine accepts it.
+        qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
+              "values (?, ?, ?, ?, ?)")
+        self.execute(qs, (node, cluster))
+        r = self.fetchone()
+        if r is None:
+            # No row yet for this (node, cluster): start from zero.
+            prepopulation, presize = (0, 0)
+        else:
+            prepopulation, presize = r
+        population += prepopulation
+        size += presize
+        self.execute(qu, (node, population, size, mtime, cluster))
+    
+    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
+        """Update the statistics of the given node's parent.
+           Then update every further parent up to the root.
+           Population is not recursive: it is applied to the direct
+           parent only, while size and mtime are applied at every level.
+        """
+        
+        while True:
+            # Node 0 is the root; there is nothing above it.
+            if node == 0:
+                break
+            props = self.node_get_properties(node)
+            if props is None:
+                break
+            parent, path = props
+            self.statistics_update(parent, population, size, mtime, cluster)
+            node = parent
+            population = 0 # Population isn't recursive
+    
+    def statistics_latest(self, node, before=inf, except_cluster=0):
+        """Return population, total size and last mtime
+           for all latest versions under node that
+           do not belong to the cluster.
+
+           Only versions with mtime < before are considered.
+           Return None if the node does not exist or if its own latest
+           version is missing or belongs to except_cluster.
+        """
+        
+        execute = self.execute
+        fetchone = self.fetchone
+        
+        # The node.
+        props = self.node_get_properties(node)
+        if props is None:
+            return None
+        parent, path = props
+        
+        # The latest version.
+        q = ("select serial, node, size, source, mtime, muser, cluster "
+             "from versions "
+             "where serial = (select max(serial) "
+                             "from versions "
+                             "where node = ? and mtime < ?) "
+             "and cluster != ?")
+        execute(q, (node, before, except_cluster))
+        props = fetchone()
+        if props is None:
+            return None
+        mtime = props[MTIME]
+        
+        # First level, just under node (get population).
+        q = ("select count(serial), sum(size), max(mtime) "
+             "from versions v "
+             "where serial = (select max(serial) "
+                             "from versions "
+                             "where node = v.node and mtime < ?) "
+             "and cluster != ? "
+             "and node in (select node "
+                          "from nodes "
+                          "where parent = ?)")
+        execute(q, (before, except_cluster, node))
+        r = fetchone()
+        if r is None:
+            return None
+        count = r[0]
+        mtime = max(mtime, r[2])
+        if count == 0:
+            return (0, 0, mtime)
+        
+        # All children (get size and mtime).
+        # XXX: This is why the full path is stored.
+        # NOTE(review): path is used as a LIKE prefix without escaping, so
+        # '%' or '_' inside it would act as wildcards -- confirm paths
+        # cannot contain them.
+        q = ("select count(serial), sum(size), max(mtime) "
+             "from versions v "
+             "where serial = (select max(serial) "
+                             "from versions "
+                             "where node = v.node and mtime < ?) "
+             "and cluster != ? "
+             "and node in (select node "
+                          "from nodes "
+                          "where path like ?)")
+        execute(q, (before, except_cluster, path + '%'))
+        r = fetchone()
+        if r is None:
+            return None
+        # The prefix match includes the node itself; take its size out.
+        size = r[1] - props[SIZE]
+        mtime = max(mtime, r[2])
+        return (count, size, mtime)
+    
+    def version_create(self, node, size, source, muser, cluster=0):
+        """Create a new version from the given properties.
+           Return the (serial, mtime) of the new version.
+
+           mtime is the current time(); serial is the lastrowid of the
+           insert. The ancestors' statistics are increased by one version
+           of the given size.
+        """
+        
+        q = ("insert into versions (node, size, source, mtime, muser, cluster) "
+             "values (?, ?, ?, ?, ?, ?)")
+        mtime = time()
+        props = (node, size, source, mtime, muser, cluster)
+        serial = self.execute(q, props).lastrowid
+        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
+        return serial, mtime
+    
+    def version_lookup(self, node, before=inf, cluster=0):
+        """Find the newest version of node created before the given time.
+           The newest version is the one with the highest serial among
+           those whose mtime is less than before. Its row
+           (serial, node, size, source, mtime, muser, cluster)
+           is returned only if it belongs to the given cluster;
+           otherwise the result is None.
+        """
+        
+        lookup = ("select serial, node, size, source, mtime, muser, cluster "
+                  "from versions "
+                  "where serial = (select max(serial) "
+                                  "from versions "
+                                  "where node = ? and mtime < ?) "
+                  "and cluster = ?")
+        self.execute(lookup, (node, before, cluster))
+        # Whatever fetchone() yields -- a row, or None -- is the answer.
+        return self.fetchone()
+    
+    def version_get_properties(self, serial, keys=(), propnames=_propnames):
+        """Fetch properties of the version identified by serial.
+           With no keys, the whole row is returned in the order
+           (serial, node, size, source, mtime, muser, cluster).
+           With keys, a list is returned holding the value of each key
+           in the order given; keys missing from propnames are skipped.
+           If no such version exists, the result of fetchone() (None)
+           is returned.
+        """
+        
+        query = ("select serial, node, size, source, mtime, muser, cluster "
+                 "from versions "
+                 "where serial = ?")
+        self.execute(query, (serial,))
+        row = self.fetchone()
+        
+        # Missing version, or nothing to pick: hand the row back untouched.
+        if row is None or not keys:
+            return row
+        
+        # propnames maps a property name to its position in the row.
+        selected = []
+        for key in keys:
+            if key in propnames:
+                selected.append(row[propnames[key]])
+        return selected
+    
+    def version_recluster(self, serial, cluster):
+        """Move the version into another cluster.
+           Nothing happens if the version does not exist or already
+           belongs to the cluster. Otherwise the ancestors' statistics
+           are moved from the old cluster to the new one and the
+           version row is updated.
+        """
+        
+        row = self.version_get_properties(serial)
+        if not row:
+            return
+        node, size, oldcluster = row[NODE], row[SIZE], row[CLUSTER]
+        if cluster == oldcluster:
+            return
+        
+        # Use one timestamp for both statistics updates.
+        now = time()
+        update_ancestors = self.statistics_update_ancestors
+        update_ancestors(node, -1, -size, now, oldcluster)
+        update_ancestors(node, 1, size, now, cluster)
+        
+        self.execute("update versions set cluster = ? where serial = ?",
+                     (cluster, serial))
+    
+    def version_remove(self, serial):
+        """Remove the serial specified.
+           The ancestors' statistics for the version's cluster are
+           decreased by one version of its size before the row is deleted.
+           Return True if the version was removed,
+           or None if no version with this serial exists.
+        """
+        
+        # The argument is a version serial, so the version accessor must be
+        # used here (as in version_recluster); the NODE, SIZE and CLUSTER
+        # indexes below refer to the version row layout.
+        props = self.version_get_properties(serial)
+        if not props:
+            return
+        node = props[NODE]
+        size = props[SIZE]
+        cluster = props[CLUSTER]
+        
+        mtime = time()
+        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
+        
+        q = "delete from versions where serial = ?"
+        self.execute(q, (serial,))
+        return True
+    
+    def attribute_get(self, serial, keys=()):
+        """Return a list of (key, value) pairs of the version specified by serial.
+           If keys is empty, return all attributes.
+           Otherwise, return only those specified.
+           keys may be any iterable of attribute names (tuple, list, ...).
+        """
+        
+        execute = self.execute
+        # Normalise to a tuple: a list could not be concatenated with the
+        # (serial,) tuple below. Falsy values (including None) keep meaning
+        # "all attributes".
+        keys = tuple(keys) if keys else ()
+        if keys:
+            # One placeholder per requested key.
+            marks = ','.join('?' for k in keys)
+            q = ("select key, value from attributes "
+                 "where key in (%s) and serial = ?" % (marks,))
+            execute(q, keys + (serial,))
+        else:
+            q = "select key, value from attributes where serial = ?"
+            execute(q, (serial,))
+        return self.fetchall()
+    
+    def attribute_set(self, serial, items):
+        """Store attributes for the version specified by serial.
+           items is an iterable of (key, value) pairs; each pair is
+           written with "insert or replace", so an existing attribute
+           with the same key is overwritten.
+        """
+        
+        statement = ("insert or replace into attributes (serial, key, value) "
+                     "values (?, ?, ?)")
+        # Rows are produced lazily, one per (key, value) pair.
+        rows = ((serial, key, value) for key, value in items)
+        self.executemany(statement, rows)
+    
+    def attribute_del(self, serial, keys=()):
+        """Remove attributes from the version specified by serial.
+           With no keys, every attribute of the version is deleted.
+           Otherwise only the attributes named in keys are deleted.
+        """
+        
+        if not keys:
+            self.execute("delete from attributes where serial = ?", (serial,))
+            return
+        
+        # One (serial, key) parameter pair per attribute to delete.
+        pairs = ((serial, name) for name in keys)
+        self.executemany("delete from attributes where serial = ? and key = ?",
+                         pairs)
+    
+    def attribute_copy(self, source, dest):
+        """Copy all attributes of the source serial onto the dest serial.
+           Rows are written with "insert or replace", so attributes of
+           dest with the same key are overwritten.
+        """
+        
+        statement = ("insert or replace into attributes "
+                     "select ?, key, value from attributes "
+                     "where serial = ?")
+        # Parameter order follows the statement: target first, then origin.
+        params = (dest, source)
+        self.execute(statement, params)
+    
+    def _construct_filters(self, filterq):
+        """Build the SQL fragment restricting attribute keys.
+           filterq is a comma-separated string of keys. Return a
+           (subquery, args) pair where subquery is an
+           " and a.key in (...)" clause with one placeholder per key
+           and args is the list of keys.
+           Return (None, None) if filterq is empty.
+        """
+        
+        if not filterq:
+            return None, None
+        
+        keys = filterq.split(',')
+        placeholders = ','.join(['?'] * len(keys))
+        clause = " and a.key in (%s)" % (placeholders,)
+        return clause, keys
+    
+    def _construct_paths(self, pathq):
+        """Build the SQL fragment restricting node paths by prefix.
+           pathq is a sequence of path prefixes. Return a
+           (subquery, args) pair where subquery is an
+           " and (n.path like ? or ...)" clause with one term per prefix
+           and args is a tuple of the prefixes with '%' appended.
+           Return (None, None) if pathq is empty.
+        """
+        
+        if not pathq:
+            return None, None
+        
+        conditions = ' or '.join(['n.path like ?' for entry in pathq])
+        patterns = tuple([entry + '%' for entry in pathq])
+        return " and (" + conditions + ")", patterns
+    
+    def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
+        """Return the list of distinct attribute keys defined on the
+           latest versions (highest serial with mtime < before) of the
+           nodes directly under parent, skipping versions that belong
+           to except_cluster. If pathq is given, only nodes whose path
+           starts with one of its entries are considered.
+        """
+        
+        # TODO: Use another table to store before=inf results.
+        query = ("select distinct a.key "
+                 "from attributes a, versions v, nodes n "
+                 "where v.serial = (select max(serial) "
+                                  "from versions "
+                                  "where node = v.node and mtime < ?) "
+                 "and v.cluster != ? "
+                 "and v.node in (select node "
+                               "from nodes "
+                               "where parent = ?) "
+                 "and a.serial = v.serial "
+                 "and n.node = v.node")
+        params = (before, except_cluster, parent)
+        
+        # Optional path restriction, appended to both query and parameters.
+        path_clause, path_params = self._construct_paths(pathq)
+        if path_clause is not None:
+            query = query + path_clause
+            params = params + path_params
+        
+        self.execute(query, params)
+        found = []
+        for row in self.fetchall():
+            found.append(row[0])
+        return found
+    
+    def latest_version_list(self, parent, prefix='', delimiter=None,
+                            start='', limit=10000, before=inf,
+                            except_cluster=0, pathq=[], filterq=None):
+        """Return a (list of (path, serial) tuples, list of common prefixes)
+           for the current versions of the paths with the given parent,
+           matching the following criteria.
+           
+           The property tuple for a version is returned if all
+           of these conditions are true:
+                
+                a. parent matches
+                
+                b. path > start
+                
+                c. path starts with prefix (and paths in pathq)
+                
+                d. version is the max up to before
+                
+                e. version is not in cluster
+                
+                f. the path does not have the delimiter occurring
+                   after the prefix, or ends with the delimiter
+                
+                g. serial matches the attribute filter query.
+                   
+                   A filter query is a comma-separated list of
+                   terms in one of these three forms:
+                   
+                   key
+                       an attribute with this key must exist
+                   
+                   !key
+                       an attribute with this key must not exist
+                   
+                   key ?op value
+                       the attribute with this key satisfies the value
+                       where ?op is one of ==, != <=, >=, <, >.
+                   
+                   NOTE(review): the clause built by _construct_filters
+                   only tests "a.key in (...)" on the comma-separated
+                   terms; the !key and "key ?op value" forms do not
+                   appear to be implemented there -- confirm.
+           
+           The list of common prefixes includes the prefixes
+           matching up to the first delimiter after prefix,
+           and are reported only once, as "virtual directories".
+           The delimiter is included in the prefixes.
+           
+           If arguments are None, then the corresponding matching rule
+           will always match.
+           
+           Limit applies to the first list of tuples returned.
+           
+           When no delimiter is given, the second element returned
+           is an empty tuple.
+        """
+        
+        execute = self.execute
+        
+        # Paths are selected in the open interval (start, nextling).
+        # Presumably strprevling/strnextling return the neighbouring
+        # strings of prefix in sort order, so the interval covers exactly
+        # the paths beginning with prefix -- confirm against the helpers.
+        if not start or start < prefix:
+            start = strprevling(prefix)
+        nextling = strnextling(prefix)
+        
+        q = ("select distinct n.path, v.serial "
+             "from attributes a, versions v, nodes n "
+             "where v.serial = (select max(serial) "
+                              "from versions "
+                              "where node = v.node and mtime < ?) "
+             "and v.cluster != ? "
+             "and v.node in (select node "
+                           "from nodes "
+                           "where parent = ?) "
+             "and a.serial = v.serial "
+             "and n.node = v.node "
+             "and n.path > ? and n.path < ?")
+        # A list, not a tuple: args[3] (the start bound) is rewritten
+        # in the delimiter loop below.
+        args = [before, except_cluster, parent, start, nextling]
+        
+        subq, subargs = self._construct_paths(pathq)
+        if subq is not None:
+            q += subq
+            args += subargs
+        subq, subargs = self._construct_filters(filterq)
+        if subq is not None:
+            q += subq
+            args += subargs
+        else:
+            # No attribute filter: strip the attributes table and its
+            # join condition from the query text.
+            q = q.replace("attributes a, ", "")
+            q = q.replace("and a.serial = v.serial ", "")
+        q += " order by n.path"
+        
+        # Without a delimiter the limit is applied by the database and
+        # there are no common prefixes to report.
+        if not delimiter:
+            q += " limit ?"
+            args.append(limit)
+            execute(q, args)
+            return self.fetchall(), ()
+        
+        pfz = len(prefix)
+        dz = len(delimiter)
+        count = 0
+        fetchone = self.fetchone
+        prefixes = []
+        pappend = prefixes.append
+        matches = []
+        mappend = matches.append
+        
+        # With a delimiter, rows are consumed one at a time; limit counts
+        # only the entries appended to matches, not the prefixes.
+        execute(q, args)
+        while True:
+            props = fetchone()
+            if props is None:
+                break
+            path, serial = props
+            # Look for the delimiter after the prefix part of the path.
+            idx = path.find(delimiter, pfz)
+            
+            if idx < 0:
+                # No delimiter after the prefix: a plain match.
+                mappend(props)
+                count += 1
+                if count >= limit:
+                    break
+                continue
+            
+            # The path up to and including the delimiter is a common prefix.
+            pf = path[:idx + dz]
+            pappend(pf)
+            # A path that ends with the delimiter is also a match itself.
+            if idx + dz == len(path):
+                mappend(props)
+                count += 1
+            if count >= limit: 
+                break
+            
+            # Re-run the query with the start bound moved past this prefix,
+            # so the remaining paths under it are skipped.
+            args[3] = strnextling(pf) # New start.
+            execute(q, args)
+        
+        return matches, prefixes
diff --git a/pithos/backends/lib_alchemy/permissions.py b/pithos/backends/lib_alchemy/permissions.py
new file mode 100644 (file)
index 0000000..1dea22f
--- /dev/null
@@ -0,0 +1,149 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from sqlalchemy.sql import select, literal
+from sqlalchemy.sql.expression import join
+
+from xfeatures import XFeatures
+from groups import Groups
+from public import Public
+
+
+# Access kinds accepted as the `access` argument of the Permissions
+# methods below; access_inherit reports them as 'read' and 'write'.
+READ = 0
+WRITE = 1
+
+
+class Permissions(XFeatures, Groups, Public):
+    """Path permissions, combining xfeatures (read/write member lists),
+       groups (members referenced as 'owner:group') and public flags.
+    """
+    
+    def __init__(self, **params):
+        # Each base is initialized explicitly with the same parameters.
+        for base in (XFeatures, Groups, Public):
+            base.__init__(self, **params)
+    
+    def access_grant(self, path, access, members=()):
+        """Give the members the specified access to path.
+           A member can also be '*' (all),
+           or a group written as 'owner:group'.
+           Nothing is done if members is empty or if no feature
+           can be created for the path."""
+        
+        if members:
+            feature = self.xfeature_create(path)
+            if feature is not None:
+                self.feature_setmany(feature, access, members)
+    
+    def access_set(self, path, permissions):
+        """Replace the permissions of path. The permissions dict
+           maps the keys 'read' and 'write' to lists of members."""
+        
+        self.xfeature_destroy(path)
+        for access, name in ((READ, 'read'), (WRITE, 'write')):
+            self.access_grant(path, access, permissions.get(name, []))
+    
+    def access_clear(self, path):
+        """Drop all access to path (both permissions and public)."""
+        
+        self.xfeature_destroy(path)
+        self.public_unset(path)
+    
+    def access_check(self, path, access, member):
+        """Tell whether the member has the given access to the path."""
+        
+        # Public paths are readable by everyone.
+        if access == READ and self.public_check(path):
+            return True
+        
+        inherited = self.xfeature_inherit(path)
+        if not inherited:
+            return False
+        fpath, feature = inherited
+        allowed = self.feature_get(feature, access)
+        if member in allowed or '*' in allowed:
+            return True
+        # Otherwise access may come through a group of the member.
+        return any((owner + ':' + group) in allowed
+                   for owner, group in self.group_parents(member))
+    
+    def access_inherit(self, path):
+        """Return the (path, permissions) pair that applies to path,
+           either assigned to it directly or inherited.
+           The pair is (path, {}) when nothing applies."""
+        
+        inherited = self.xfeature_inherit(path)
+        if not inherited:
+            return (path, {})
+        fpath, feature = inherited
+        permissions = self.feature_dict(feature)
+        # Rename the numeric access keys to their public names.
+        for access, name in ((READ, 'read'), (WRITE, 'write')):
+            if access in permissions:
+                permissions[name] = permissions.pop(access)
+        return (fpath, permissions)
+    
+    def access_list(self, path):
+        """List the permission paths reported by xfeature_list for path,
+           leaving out path itself."""
+        
+        others = []
+        for entry in self.xfeature_list(path):
+            if entry[0] != path:
+                others.append(entry[0])
+        return others
+    
+    def access_list_paths(self, member, prefix=None):
+        """Return the list of paths granted to member,
+           directly or through one of the member's groups.
+           If prefix is given, only paths starting with it are returned."""
+        
+        # Values that identify the member: each 'owner:name' group the
+        # member belongs to, plus the member itself.
+        group_value = (self.groups.c.owner + ':' + self.groups.c.name)
+        via_groups = select([group_value.label('value')],
+            self.groups.c.member == member)
+        direct = select([literal(member).label('value')])
+        grantees = via_groups.union(direct).alias()
+        
+        # Features whose stored value matches one of those values.
+        features_with_values = self.xfeatures.join(self.xfeaturevals)
+        matching = join(features_with_values,
+                        grantees,
+                        self.xfeaturevals.c.value == grantees.c.value)
+        query = select([self.xfeatures.c.path], from_obj=[matching]).distinct()
+        if prefix:
+            query = query.where(self.xfeatures.c.path.like(prefix + '%'))
+        
+        result = self.conn.execute(query)
+        paths = [row[0] for row in result.fetchall()]
+        result.close()
+        return paths
+    
+    def access_list_shared(self, prefix=''):
+        """Return the list of paths that have a feature (shared paths)
+           and start with prefix."""
+        
+        query = select([self.xfeatures.c.path],
+            self.xfeatures.c.path.like(prefix + '%'))
+        result = self.conn.execute(query)
+        paths = [row[0] for row in result.fetchall()]
+        result.close()
+        return paths
\ No newline at end of file
diff --git a/pithos/backends/lib_alchemy/policy.py b/pithos/backends/lib_alchemy/policy.py
new file mode 100644 (file)
index 0000000..e6dbd39
--- /dev/null
@@ -0,0 +1,71 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from sqlalchemy import Table, Column, String, MetaData
+from sqlalchemy.sql import select
+from dbworker import DBWorker
+
+
+class Policy(DBWorker):
+    """Paths can be assigned key, value pairs, representing policy.
+       The pairs are kept in the 'policy' table, one row per (path, key).
+    """
+    
+    def __init__(self, **params):
+        """Initialize the worker and create the 'policy' table if missing."""
+        
+        DBWorker.__init__(self, **params)
+        metadata = MetaData()
+        columns=[]
+        # NOTE: a String(2048) path column was tried here and disabled
+        # in favour of String(255).
+        #columns.append(Column('path', String(2048), primary_key=True))
+        columns.append(Column('path', String(255), primary_key=True))
+        columns.append(Column('key', String(255), primary_key=True))
+        columns.append(Column('value', String(255)))
+        self.policies = Table('policy', metadata, *columns)
+        metadata.create_all(self.engine)
+        metadata.bind = self.engine
+    
+    def policy_set(self, path, policy):
+        """Store every key, value pair of the policy dict for path.
+           An empty policy stores nothing.
+        """
+        
+        values = [{'path':path, 'key':k, 'value':v} for k,v in policy.iteritems()]
+        if not values:
+            # Nothing to store; do not issue an INSERT with an empty
+            # parameter list.
+            return
+        s = self.policies.insert()
+        r = self.conn.execute(s, values)
+        r.close()
+    
+    def policy_unset(self, path):
+        """Delete every policy pair stored for path."""
+        
+        s = self.policies.delete().where(self.policies.c.path==path)
+        r = self.conn.execute(s)
+        r.close()
+    
+    def policy_get(self, path):
+        """Return a dict with the key, value pairs stored for path."""
+        
+        # A single equality predicate. The chained form
+        # "c.path == c.path == path" means
+        # "(c.path == c.path) and (c.path == path)" in Python, which only
+        # yields the intended clause if the first SQL expression happens
+        # to evaluate as true.
+        s = select([self.policies.c.key, self.policies.c.value],
+            self.policies.c.path == path)
+        r = self.conn.execute(s)
+        d = dict(r.fetchall())
+        r.close()
+        return d
\ No newline at end of file
diff --git a/pithos/backends/lib_alchemy/public.py b/pithos/backends/lib_alchemy/public.py
new file mode 100644 (file)
index 0000000..63aeebe
--- /dev/null
@@ -0,0 +1,67 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from dbworker import DBWorker
+from sqlalchemy import Table, Column, String, MetaData
+from sqlalchemy.sql import select
+
+class Public(DBWorker):
+    """Paths can be marked as public.
+       Public paths are the rows of the 'public' table.
+    """
+    
+    def __init__(self, **params):
+        DBWorker.__init__(self, **params)
+        metadata = MetaData()
+        # NOTE: a String(2048) path column was tried here and disabled
+        # in favour of String(255).
+        path_column = Column('path', String(255), primary_key=True)
+        self.public = Table('public', metadata, path_column)
+        metadata.create_all(self.engine)
+        metadata.bind = self.engine
+    
+    def public_set(self, path):
+        """Mark path as public by inserting it into the table."""
+        
+        statement = self.public.insert()
+        result = self.conn.execute(statement, path = path)
+        result.close()
+    
+    def public_unset(self, path):
+        """Remove the public mark from path."""
+        
+        table = self.public
+        result = self.conn.execute(table.delete().where(table.c.path == path))
+        result.close()
+    
+    def public_check(self, path):
+        """Return True if path is marked as public, False otherwise."""
+        
+        table = self.public
+        result = self.conn.execute(select([table.c.path], table.c.path == path))
+        row = result.fetchone()
+        result.close()
+        return bool(row)
\ No newline at end of file
diff --git a/pithos/backends/lib_alchemy/xfeatures.py b/pithos/backends/lib_alchemy/xfeatures.py
new file mode 100644 (file)
index 0000000..8ff0d5c
--- /dev/null
@@ -0,0 +1,197 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from collections import defaultdict
+from sqlalchemy import Table, Column, String, Integer, MetaData, ForeignKey
+from sqlalchemy.sql import select, and_
+from sqlalchemy.schema import Index
+from sqlalchemy.sql.expression import desc
+
+from dbworker import DBWorker
+
+
+class XFeatures(DBWorker):
+    """XFeatures are path properties that allow non-nested
+       inheritance patterns. Currently used for storing permissions.
+       
+       Features live in the 'xfeatures' table (feature_id, path) and
+       their values in 'xfeaturevals' (feature_id, key, value).
+    """
+    
+    def __init__(self, **params):
+        DBWorker.__init__(self, **params)
+        metadata = MetaData()
+        
+        self.xfeatures = Table('xfeatures', metadata,
+                               Column('feature_id', Integer, primary_key=True),
+                               Column('path', String(2048)))
+        # place an index on path
+        Index('idx_features_path', self.xfeatures.c.path)
+        
+        # Values are removed together with their feature (on delete cascade).
+        self.xfeaturevals = Table('xfeaturevals', metadata,
+                                  Column('feature_id', Integer,
+                                         ForeignKey('xfeatures.feature_id',
+                                                    ondelete='CASCADE'),
+                                         primary_key=True),
+                                  Column('key', Integer, autoincrement=False,
+                                         primary_key=True),
+                                  Column('value', String(255), primary_key=True))
+        
+        metadata.create_all(self.engine)
+        metadata.bind = self.engine
+    
+    def xfeature_inherit(self, path):
+        """Return the (path, feature) pair inherited by the path, or None."""
+        
+        features = self.xfeatures.c
+        # The greatest stored path that is not greater than path ...
+        query = select([features.path, features.feature_id])
+        query = query.where(features.path <= path)
+        query = query.order_by(desc(features.path)).limit(1)
+        result = self.conn.execute(query)
+        candidate = result.fetchone()
+        result.close()
+        # ... is inherited only if it is a prefix of path.
+        if candidate and path.startswith(candidate[0]):
+            return candidate
+        return None
+    
+    def xfeature_list(self, path):
+        """Return the list of the (prefix, feature) pairs matching path.
+           A prefix matches path if either the prefix includes the path,
+           or the path includes the prefix.
+        """
+        
+        inherited = self.xfeature_inherit(path)
+        if inherited:
+            return [inherited]
+        
+        # Nothing inherited: list the features stored below path.
+        features = self.xfeatures.c
+        below = and_(features.path.like(path + '%'),
+                     features.path != path)
+        query = select([features.path, features.feature_id])
+        query = query.where(below).order_by(features.path)
+        result = self.conn.execute(query)
+        rows = result.fetchall()
+        result.close()
+        return rows
+    
+    def xfeature_create(self, path):
+        """Create and return a feature for path.
+           If the path already inherits a feature or
+           bestows to paths already inheriting a feature,
+           create no feature and return None.
+           If the path has a feature, return it.
+        """
+        
+        existing = self.xfeature_list(path)
+        if len(existing) > 1:
+            return None
+        if len(existing) == 1:
+            only = existing[0]
+            if only[0] != path:
+                return None
+            # The path itself already has a feature.
+            return only[1]
+        
+        result = self.conn.execute(self.xfeatures.insert(), path=path)
+        feature = result.inserted_primary_key[0]
+        result.close()
+        return feature
+    
+    def xfeature_destroy(self, path):
+        """Destroy a feature and all its key, value pairs."""
+        
+        features = self.xfeatures
+        result = self.conn.execute(features.delete().where(features.c.path == path))
+        result.close()
+    
+    def feature_dict(self, feature):
+        """Return a dict mapping keys to list of values for feature."""
+        
+        vals = self.xfeaturevals.c
+        query = select([vals.key, vals.value]).where(vals.feature_id == feature)
+        result = self.conn.execute(query)
+        rows = result.fetchall()
+        result.close()
+        grouped = defaultdict(list)
+        for key, value in rows:
+            grouped[key].append(value)
+        return grouped
+    
+    def feature_set(self, feature, key, value):
+        """Associate a key, value pair with a feature."""
+        
+        result = self.conn.execute(self.xfeaturevals.insert(),
+                                   feature_id=feature, key=key, value=value)
+        result.close()
+    
+    def feature_setmany(self, feature, key, values):
+        """Associate the given key, and values with a feature."""
+        
+        rows = []
+        for v in values:
+            rows.append({'feature_id':feature, 'key':key, 'value':v})
+        result = self.conn.execute(self.xfeaturevals.insert(), rows)
+        result.close()
+    
+    def feature_unset(self, feature, key, value):
+        """Disassociate a key, value pair from a feature."""
+        
+        vals = self.xfeaturevals.c
+        exact = and_(vals.feature_id == feature,
+                     vals.key == key,
+                     vals.value == value)
+        result = self.conn.execute(self.xfeaturevals.delete().where(exact))
+        result.close()
+    
+    def feature_unsetmany(self, feature, key, values):
+        """Disassociate the key for the values given, from a feature."""
+        
+        vals = self.xfeaturevals.c
+        # One delete statement per value.
+        for v in values:
+            exact = and_(vals.feature_id == feature,
+                         vals.key == key,
+                         vals.value == v)
+            result = self.conn.execute(self.xfeaturevals.delete().where(exact))
+            result.close()
+        
+    def feature_get(self, feature, key):
+        """Return the list of values for a key of a feature."""
+        
+        vals = self.xfeaturevals.c
+        query = select([vals.value])
+        query = query.where(and_(vals.feature_id == feature,
+                                 vals.key == key))
+        result = self.conn.execute(query)
+        found = [row[0] for row in result.fetchall()]
+        result.close()
+        return found
+    
+    def feature_clear(self, feature, key):
+        """Delete all key, value pairs for a key of a feature."""
+        
+        vals = self.xfeaturevals.c
+        matching = and_(vals.feature_id == feature,
+                        vals.key == key)
+        result = self.conn.execute(self.xfeaturevals.delete().where(matching))
+        result.close()
diff --git a/pithos/backends/modular_alchemy.py b/pithos/backends/modular_alchemy.py
new file mode 100644 (file)
index 0000000..0ca00cc
--- /dev/null
@@ -0,0 +1,844 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+import os
+import time
+import sqlite3
+import logging
+import hashlib
+import binascii
+
+from base import NotAllowedError, BaseBackend
+from lib_alchemy.node import Node, ROOTNODE, SERIAL, SIZE, MTIME, MUSER, CLUSTER
+from lib_alchemy.permissions import Permissions, READ, WRITE
+from lib_alchemy.policy import Policy
+from lib_alchemy.hashfiler import Mapper, Blocker
+from sqlalchemy import create_engine
+
+# Cluster identifiers passed to the node module when looking up,
+# creating, reclustering or purging versions.
+CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED = range(3)
+
+# Used as the 'before' timestamp bound when no 'until' is given.
+inf = float('inf')
+
+
+logger = logging.getLogger(__name__)
+
+def backend_method(func=None, autocommit=1):
+    """Decorate a backend method so that it runs inside a transaction.
+
+    Usable both bare (``@backend_method``) and with arguments
+    (``@backend_method(autocommit=0)``).
+
+    With a true 'autocommit' the returned wrapper executes
+    'begin deferred' on ``self.con``, calls the method, commits and
+    returns its result; on any exception it rolls back and re-raises.
+    With a false 'autocommit' the method is returned untouched.
+
+    The wrapper keeps the wrapped method's name and docstring.
+    """
+    if func is None:
+        # Called with arguments only: hand back the actual decorator.
+        def decorator(func):
+            return backend_method(func, autocommit)
+        return decorator
+
+    if not autocommit:
+        return func
+
+    # Local import so this block does not depend on the file's import list.
+    from functools import wraps
+
+    @wraps(func)
+    def fn(self, *args, **kw):
+        self.con.execute('begin deferred')
+        try:
+            ret = func(self, *args, **kw)
+            self.con.commit()
+            return ret
+        except:
+            # Deliberately bare: whatever went wrong, undo the
+            # transaction and let the original exception propagate.
+            self.con.rollback()
+            raise
+    return fn
+
+
+class ModularBackend(BaseBackend):
+    """A modular backend.
+    
+    Uses modules for SQL functions and storage.
+    """
+    
+    def __init__(self, db):
+        """Set up the block/map stores and the SQL-backed modules.
+
+        'db' is only used for its directory part, which becomes the
+        base path for the 'blocks' and 'maps' stores; the directory is
+        created when missing. RuntimeError is raised when that base
+        path is not a directory afterwards.
+        """
+        self.hash_algorithm = 'sha256'
+        self.block_size = 4 * 1024 * 1024 # 4MB
+        
+        self.default_policy = {'quota': 0, 'versioning': 'auto'}
+        
+        basepath = os.path.split(db)[0]
+        if basepath and not os.path.exists(basepath):
+            os.makedirs(basepath)
+        # NOTE(review): a 'db' without a directory part gives an empty
+        # basepath, for which isdir() is false, so it is rejected here.
+        if not os.path.isdir(basepath):
+            raise RuntimeError("Cannot open database at '%s'" % (db,))
+        
+        # NOTE(review): the database credentials, host and name are
+        # hard-coded; they should probably come from configuration.
+        dbuser = 'pithos'
+        dbpass = 'archipelagos'
+        dbhost = '62.217.112.56'
+        dbname = 'pithosdb'
+        connection_str = 'mysql://%s:%s@%s/%s' %(dbuser, dbpass, dbhost, dbname)
+        # NOTE(review): echo=True presumably makes the engine log every
+        # statement - confirm this is wanted outside debugging.
+        engine = create_engine(connection_str, echo=True)
+        
+        params = {'blocksize': self.block_size,
+                  'blockpath': basepath + '/blocks',
+                  'hashtype': self.hash_algorithm}
+        self.blocker = Blocker(**params)
+        
+        params = {'mappath': basepath + '/maps',
+                  'namelen': self.blocker.hashlen}
+        self.mapper = Mapper(**params)
+        
+        # The same connection and engine are shared by all SQL modules.
+        params = {'connection': engine.connect(),
+                  'engine': engine}
+        self.permissions = Permissions(**params)
+        self.policy = Policy(**params)
+        self.node = Node(**params)
+        
+        # NOTE(review): 'self.con' is never assigned in this method;
+        # unless BaseBackend provides it this raises AttributeError.
+        self.con.commit()
+    
+    @backend_method
+    def list_accounts(self, user, marker=None, limit=10000):
+        """Return a list of accounts the user can access.
+
+        The listing starts after 'marker' (when it is found in the
+        listing) and holds at most 'limit' entries, as computed by
+        _list_limits().
+        """
+
+        # Three arguments need three placeholders; with only two the
+        # logging module fails to format the record.
+        logger.debug("list_accounts: %s %s %s", user, marker, limit)
+        allowed = self._allowed_accounts(user)
+        start, limit = self._list_limits(allowed, marker, limit)
+        return allowed[start:start + limit]
+    
+    @backend_method
+    def get_account_meta(self, user, account, until=None):
+        """Build and return the metadata dictionary of an account.
+
+        The account is created on lookup only when the user owns it.
+        Other users must be allowed on an existing account and may not
+        ask for a past state ('until'), else NotAllowedError is raised;
+        they only get 'name' and 'modified'. The owner additionally
+        gets the stored attributes, 'count', 'bytes' and, for a past
+        state, 'until_timestamp'.
+        """
+
+        logger.debug("get_account_meta: %s %s", account, until)
+        is_owner = (user == account)
+        path, node = self._lookup_account(account, is_owner)
+        if not is_owner and (until or node is None or account not in self._allowed_accounts(user)):
+            raise NotAllowedError
+        try:
+            props = self._get_properties(node, until)
+            mtime = props[MTIME]
+        except NameError:
+            # No version found; fall back to the requested timestamp.
+            props = None
+            mtime = until
+        count, total, tstamp = self._get_statistics(node, until)
+        tstamp = max(tstamp, mtime)
+        if until is None:
+            modified = tstamp
+        else:
+            # Overall last modification, regardless of 'until'.
+            modified = max(self._get_statistics(node)[2], mtime)
+
+        if is_owner:
+            meta = {}
+            if props is not None:
+                meta.update(dict(self.node.attribute_get(props[SERIAL])))
+            if until is not None:
+                meta['until_timestamp'] = tstamp
+            meta.update({'name': account, 'count': count, 'bytes': total})
+        else:
+            meta = {'name': account}
+        meta['modified'] = modified
+        return meta
+    
+    @backend_method
+    def update_account_meta(self, user, account, meta, replace=False):
+        """Store new metadata for the account; only its owner may do so.
+
+        The account is created if it does not exist yet. Raises
+        NotAllowedError for any other user.
+        """
+
+        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
+        if user != account:
+            raise NotAllowedError
+        node = self._lookup_account(account, True)[1]
+        self._put_metadata(user, node, meta, replace, False)
+    
+    @backend_method
+    def get_account_groups(self, user, account):
+        """Return the groups defined for the account as a dictionary.
+
+        The owner gets the real groups (the account is created if
+        missing). An allowed non-owner gets an empty dictionary; anyone
+        else gets NotAllowedError.
+        """
+
+        logger.debug("get_account_groups: %s", account)
+        if user == account:
+            self._lookup_account(account, True)
+            return self.permissions.group_dict(account)
+        if account not in self._allowed_accounts(user):
+            raise NotAllowedError
+        return {}
+    
+    @backend_method
+    def update_account_groups(self, user, account, groups, replace=False):
+        """Set the members of the given groups of an account.
+
+        Only the owner may do this (NotAllowedError otherwise). With
+        'replace' all existing groups are destroyed first; without it
+        only the groups named in 'groups' are reset. A group mapped to
+        an empty value ends up deleted.
+        """
+
+        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
+        if user != account:
+            raise NotAllowedError
+        self._lookup_account(account, True)
+        self._check_groups(groups)
+        if replace:
+            self.permissions.group_destroy(account)
+        for group, members in groups.iteritems():
+            # After a full destroy there is nothing left to delete.
+            if not replace:
+                self.permissions.group_delete(account, group)
+            if members:
+                self.permissions.group_addmany(account, group, members)
+    
+    @backend_method
+    def put_account(self, user, account):
+        """Create the account; only the user of the same name may do so.
+
+        Raises NotAllowedError for another user and NameError when the
+        account already exists.
+        """
+
+        logger.debug("put_account: %s", account)
+        if user != account:
+            raise NotAllowedError
+        if self.node.node_lookup(account) is not None:
+            raise NameError('Account already exists')
+        self._put_path(user, ROOTNODE, account)
+    
+    @backend_method
+    def delete_account(self, user, account):
+        """Remove the account and its groups; a missing account is ignored.
+
+        Raises NotAllowedError for a non-owner and IndexError when the
+        node module refuses to remove the account node.
+        """
+
+        logger.debug("delete_account: %s", account)
+        if user != account:
+            raise NotAllowedError
+        node = self.node.node_lookup(account)
+        if node is not None:
+            if not self.node.node_remove(node):
+                raise IndexError('Account is not empty')
+            self.permissions.group_destroy(account)
+    
+    @backend_method
+    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
+        """Return a page of container names found under an account.
+
+        A non-owner gets the containers it is allowed on and may not
+        use 'until' (NotAllowedError). The owner gets either the
+        containers holding shared paths ('shared') or all containers as
+        of 'until'.
+        """
+
+        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
+        listing = None
+        if user != account:
+            if until or account not in self._allowed_accounts(user):
+                raise NotAllowedError
+            listing = self._allowed_containers(user, account)
+        elif shared:
+            # Shared paths look like account/container/...; keep the container.
+            listing = [p.split('/', 2)[1] for p in self.permissions.access_list_shared(account)]
+        if listing is not None:
+            start, limit = self._list_limits(listing, marker, limit)
+            return listing[start:start + limit]
+        node = self.node.node_lookup(account)
+        entries = self._list_objects(node, account, '', '/', marker, limit, False, [], until)
+        return [entry[0] for entry in entries]
+    
+    @backend_method
+    def get_container_meta(self, user, account, container, until=None):
+        """Build and return the metadata dictionary of a container.
+
+        A non-owner must be allowed on the container and may not ask
+        for a past state ('until'), else NotAllowedError is raised; it
+        only gets 'name' and 'modified'. The owner additionally gets
+        the stored attributes, 'count', 'bytes' and, for a past state,
+        'until_timestamp'.
+        """
+
+        logger.debug("get_container_meta: %s %s %s", account, container, until)
+        is_owner = (user == account)
+        if not is_owner and (until or container not in self._allowed_containers(user, account)):
+            raise NotAllowedError
+        path, node = self._lookup_container(account, container)
+        props = self._get_properties(node, until)
+        mtime = props[MTIME]
+        count, total, tstamp = self._get_statistics(node, until)
+        tstamp = max(tstamp, mtime)
+        if until is None:
+            modified = tstamp
+        else:
+            # Overall last modification, regardless of 'until'.
+            modified = max(self._get_statistics(node)[2], mtime)
+
+        if is_owner:
+            meta = dict(self.node.attribute_get(props[SERIAL]))
+            if until is not None:
+                meta['until_timestamp'] = tstamp
+            meta.update({'name': container, 'count': count, 'bytes': total})
+        else:
+            meta = {'name': container}
+        meta['modified'] = modified
+        return meta
+    
+    @backend_method
+    def update_container_meta(self, user, account, container, meta, replace=False):
+        """Store new metadata for the container; only the owner may do so.
+
+        Raises NotAllowedError for a non-owner; a missing container
+        makes the lookup raise NameError.
+        """
+
+        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
+        if user != account:
+            raise NotAllowedError
+        node = self._lookup_container(account, container)[1]
+        self._put_metadata(user, node, meta, replace, False)
+    
+    @backend_method
+    def get_container_policy(self, user, account, container):
+        """Return the policy of a container as a dictionary.
+
+        The owner gets the stored policy. An allowed non-owner gets an
+        empty dictionary; anyone else gets NotAllowedError.
+        """
+
+        logger.debug("get_container_policy: %s %s", account, container)
+        if user == account:
+            path, node = self._lookup_container(account, container)
+            return self.policy.policy_get(path)
+        if container not in self._allowed_containers(user, account):
+            raise NotAllowedError
+        return {}
+    
+    @backend_method
+    def update_container_policy(self, user, account, container, policy, replace=False):
+        """Update the policy associated with the container.
+
+        Only the owner may do this (NotAllowedError otherwise). The
+        policy is validated first; with 'replace' every key missing
+        from 'policy' is filled in from the default policy. Note that
+        'policy' is modified in place.
+        """
+
+        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
+        if user != account:
+            raise NotAllowedError
+        path, node = self._lookup_container(account, container)
+        self._check_policy(policy)
+        if replace:
+            for name, default in self.default_policy.iteritems():
+                policy.setdefault(name, default)
+        self.policy.policy_set(path, policy)
+    
+    @backend_method
+    def put_container(self, user, account, container, policy=None):
+        """Create a new container with the given name.
+
+        Only the owner may do this (NotAllowedError otherwise); the
+        account is created if missing. Raises NameError when the
+        container already exists and ValueError for an invalid policy.
+        Keys missing from 'policy' are filled in from the default
+        policy; a given 'policy' dictionary is modified in place.
+        """
+
+        logger.debug("put_container: %s %s %s", account, container, policy)
+        if user != account:
+            raise NotAllowedError
+        try:
+            path, node = self._lookup_container(account, container)
+        except NameError:
+            pass
+        else:
+            raise NameError('Container already exists')
+        if policy is None:
+            # The default argument used to reach the 'k not in policy'
+            # test below as None and fail with TypeError.
+            policy = {}
+        if policy:
+            self._check_policy(policy)
+        path = '/'.join((account, container))
+        self._put_path(user, self._lookup_account(account, True)[1], path)
+        for k, v in self.default_policy.iteritems():
+            if k not in policy:
+                policy[k] = v
+        self.policy.policy_set(path, policy)
+    
+    @backend_method
+    def delete_container(self, user, account, container, until=None):
+        """Delete the container, or purge its history up to 'until'.
+
+        Only the owner may do this (NotAllowedError otherwise). With
+        'until' only history and deleted versions up to that timestamp
+        are purged. Without it the container must hold no objects
+        (IndexError otherwise); everything is purged and the container
+        node and its policy are removed.
+        """
+
+        logger.debug("delete_container: %s %s %s", account, container, until)
+        if user != account:
+            raise NotAllowedError
+        path, node = self._lookup_container(account, container)
+
+        remove = until is None
+        if remove and self._get_statistics(node)[0] > 0:
+            raise IndexError('Container is not empty')
+        before = inf if remove else until
+        # Purged history versions also lose their stored hashmaps.
+        for serial in self.node.node_purge_children(node, before, CLUSTER_HISTORY):
+            self.mapper.map_remv(serial)
+        self.node.node_purge_children(node, before, CLUSTER_DELETED)
+        if remove:
+            self.node.node_remove(node)
+            self.policy.policy_unset(path)
+    
+    @backend_method
+    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
+        """Return a page of the objects found under a container.
+
+        A non-owner may not use 'until' and must have access to at
+        least one path under the container (NotAllowedError otherwise);
+        the listing is then limited to those paths. For the owner,
+        'shared' limits the listing to the shared paths.
+        """
+
+        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
+        allowed = []
+        if user != account:
+            if until:
+                raise NotAllowedError
+            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
+            if not allowed:
+                raise NotAllowedError
+        elif shared:
+            allowed = self.permissions.access_list_shared('/'.join((account, container)))
+        path, node = self._lookup_container(account, container)
+        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
+    
+    @backend_method
+    def list_object_meta(self, user, account, container, until=None):
+        """Return the meta keys used by the objects of a container.
+
+        A non-owner may not use 'until' and must have access to at
+        least one path under the container (NotAllowedError otherwise).
+        """
+
+        logger.debug("list_object_meta: %s %s %s", account, container, until)
+        allowed = []
+        if user != account:
+            if until:
+                raise NotAllowedError
+            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
+            if not allowed:
+                raise NotAllowedError
+        path, node = self._lookup_container(account, container)
+        # Without 'until' there is no upper bound on the timestamp.
+        before = inf if until is None else until
+        return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
+    
+    @backend_method
+    def get_object_meta(self, user, account, container, name, version=None):
+        """Build and return the metadata dictionary of an object version.
+
+        The stored attributes are complemented with 'name', 'bytes',
+        'version', 'version_timestamp', 'modified' and 'modified_by'.
+        'modified' always refers to the latest version of the object.
+        """
+
+        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
+        self._can_read(user, account, container, name)
+        path, node = self._lookup_object(account, container, name)
+        props = self._get_version(node, version)
+        # Overall last modification comes from the latest version.
+        latest = props if version is None else self._get_version(node)
+
+        meta = dict(self.node.attribute_get(props[SERIAL]))
+        meta.update({
+            'name': name,
+            'bytes': props[SIZE],
+            'version': props[SERIAL],
+            'version_timestamp': props[MTIME],
+            'modified': latest[MTIME],
+            'modified_by': props[MUSER],
+        })
+        return meta
+    
+    @backend_method
+    def update_object_meta(self, user, account, container, name, meta, replace=False):
+        """Store new metadata for the object after a write-access check."""
+
+        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
+        self._can_write(user, account, container, name)
+        node = self._lookup_object(account, container, name)[1]
+        self._put_metadata(user, node, meta, replace)
+    
+    @backend_method
+    def get_object_permissions(self, user, account, container, name):
+        """Return what the permissions module reports as inherited for
+        the object's path, after a read-access check."""
+
+        logger.debug("get_object_permissions: %s %s %s", account, container, name)
+        self._can_read(user, account, container, name)
+        path, node = self._lookup_object(account, container, name)
+        return self.permissions.access_inherit(path)
+    
+    @backend_method
+    def update_object_permissions(self, user, account, container, name, permissions):
+        """Validate and store the permissions of an object.
+
+        Only the owner may do this (NotAllowedError otherwise).
+        """
+
+        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
+        if user != account:
+            raise NotAllowedError
+        path, node = self._lookup_object(account, container, name)
+        self._check_permissions(path, permissions)
+        self.permissions.access_set(path, permissions)
+    
+    @backend_method
+    def get_object_public(self, user, account, container, name):
+        """Return '/public/<path>' for a public object, otherwise None."""
+
+        logger.debug("get_object_public: %s %s %s", account, container, name)
+        self._can_read(user, account, container, name)
+        path, node = self._lookup_object(account, container, name)
+        if not self.permissions.public_check(path):
+            return None
+        return '/public/' + path
+    
+    @backend_method
+    def update_object_public(self, user, account, container, name, public):
+        """Mark the object public or not, after a write-access check."""
+
+        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
+        self._can_write(user, account, container, name)
+        path, node = self._lookup_object(account, container, name)
+        if public:
+            self.permissions.public_set(path)
+        else:
+            self.permissions.public_unset(path)
+    
+    @backend_method
+    def get_object_hashmap(self, user, account, container, name, version=None):
+        """Return the size of an object version and its block hashes.
+
+        The hashes are returned hex-encoded, in the order the mapper
+        gives them.
+        """
+
+        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
+        self._can_read(user, account, container, name)
+        path, node = self._lookup_object(account, container, name)
+        props = self._get_version(node, version)
+        raw_hashes = self.mapper.map_retr(props[SERIAL])
+        return props[SIZE], [binascii.hexlify(h) for h in raw_hashes]
+    
+    @backend_method
+    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
+        """Create/update an object with the specified size and partial hashes.
+
+        'hashmap' holds the hex-encoded block hashes. All blocks must
+        already be stored: otherwise IndexError is raised, with the
+        missing hashes in its 'data' attribute. Unless 'replace_meta'
+        is set, the attributes of the previous version are copied to
+        the new one before 'meta' is applied. Only the owner may pass
+        'permissions' (NotAllowedError otherwise).
+        """
+
+        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
+        if permissions is not None and user != account:
+            raise NotAllowedError
+        self._can_write(user, account, container, name)
+        # Decode once; the result is needed for both ping and store.
+        hashes = [binascii.unhexlify(x) for x in hashmap]
+        missing = self.blocker.block_ping(hashes)
+        if missing:
+            ie = IndexError()
+            ie.data = missing
+            raise ie
+        if permissions is not None:
+            # Validate before the node is created. The path is built
+            # here because _put_object_node() has not run yet; it is
+            # the same account/container/name join that call returns.
+            self._check_permissions('/'.join((account, container, name)), permissions)
+        path, node = self._put_object_node(account, container, name)
+        src_version_id, dest_version_id = self._copy_version(user, node, None, node, size)
+        self.mapper.map_stor(dest_version_id, hashes)
+        if not replace_meta and src_version_id is not None:
+            self.node.attribute_copy(src_version_id, dest_version_id)
+        self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
+        if permissions is not None:
+            self.permissions.access_set(path, permissions)
+    
+    @backend_method
+    def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
+        """Copy an object's data and metadata.
+
+        The user needs read access on the source and write access on
+        the destination. Unless 'replace_meta' is set, the source
+        attributes are copied to the new version before 'dest_meta' is
+        applied. Only the owner may pass 'permissions'
+        (NotAllowedError otherwise).
+        """
+
+        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
+        if permissions is not None and user != account:
+            raise NotAllowedError
+        self._can_read(user, account, src_container, src_name)
+        self._can_write(user, account, dest_container, dest_name)
+        src_path, src_node = self._lookup_object(account, src_container, src_name)
+        if permissions is not None:
+            # Validate before the destination node is created. The
+            # path is built here because _put_object_node() has not
+            # run yet; it is the same join that call returns.
+            self._check_permissions('/'.join((account, dest_container, dest_name)), permissions)
+        dest_path, dest_node = self._put_object_node(account, dest_container, dest_name)
+        src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node)
+        if src_version_id is not None:
+            self._copy_data(src_version_id, dest_version_id)
+        if not replace_meta and src_version_id is not None:
+            self.node.attribute_copy(src_version_id, dest_version_id)
+        self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
+        if permissions is not None:
+            self.permissions.access_set(dest_path, permissions)
+    
+    @backend_method
+    def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
+        """Move an object: copy its latest version, then delete the source."""
+
+        logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
+        # NOTE(review): both calls below are themselves wrapped by
+        # backend_method, so each begins its own transaction inside
+        # this one - confirm the connection tolerates that.
+        src_version = None  # Always move the latest version.
+        self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
+        self.delete_object(user, account, src_container, src_name)
+    
+    @backend_method
+    def delete_object(self, user, account, container, name, until=None):
+        """Delete an object, or purge its versions up to 'until'.
+
+        Only the owner may do this (NotAllowedError otherwise).
+
+        With 'until', versions up to that timestamp are purged together
+        with their stored hashmaps; a missing object is silently
+        ignored. Without 'until', a new version is created in
+        CLUSTER_DELETED and the permissions of the path are cleared; a
+        missing object makes the lookup raise NameError.
+        """
+        
+        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
+        if user != account:
+            raise NotAllowedError
+        
+        if until is not None:
+            path = '/'.join((account, container, name))
+            node = self.node.node_lookup(path)
+            if node is None:
+                return
+            versions = self.node.node_purge(node, until, CLUSTER_NORMAL)
+            versions += self.node.node_purge(node, until, CLUSTER_HISTORY)
+            # Purged versions also lose their stored hashmaps.
+            for v in versions:
+                self.mapper.map_remv(v)
+            # NOTE(review): the two purges above use node_purge(); this
+            # one uses node_purge_children() on the object node itself.
+            # Confirm that is intended.
+            self.node.node_purge_children(node, until, CLUSTER_DELETED)
+            # NOTE(review): permissions are cleared when a current
+            # version still exists and kept when none is left. That
+            # looks inverted - confirm. 'props' itself is unused.
+            try:
+                props = self._get_version(node)
+            except NameError:
+                pass
+            else:
+                self.permissions.access_clear(path)
+            return
+        
+        path, node = self._lookup_object(account, container, name)
+        # Size 0 in CLUSTER_DELETED marks the object as deleted.
+        self._copy_version(user, node, None, node, 0, CLUSTER_DELETED)
+        self.permissions.access_clear(path)
+    
+    @backend_method
+    def list_versions(self, user, account, container, name):
+        """Return a list of all (version, version_timestamp) tuples for an object.
+
+        The user needs read access; a missing object makes the lookup
+        raise NameError.
+        """
+
+        logger.debug("list_versions: %s %s %s", account, container, name)
+        self._can_read(user, account, container, name)
+        # 'node' used to be referenced here without ever being looked up.
+        path, node = self._lookup_object(account, container, name)
+        return self.node.node_get_versions(node, ['serial', 'mtime'])
+    
+    @backend_method(autocommit=0)
+    def get_block(self, hash):
+        """Return the data of the block with the given hex-encoded hash.
+
+        Raises NameError when the blocker returns nothing for it.
+        """
+
+        logger.debug("get_block: %s", hash)
+        wanted = (binascii.unhexlify(hash),)
+        found = self.blocker.block_retr(wanted)
+        if found:
+            return found[0]
+        raise NameError('Block does not exist')
+    
+    @backend_method(autocommit=0)
+    def put_block(self, data):
+        """Store the data as a block and return its hex-encoded hash."""
+
+        logger.debug("put_block: %s", len(data))
+        # The second element returned by the blocker is not needed here.
+        stored, _ = self.blocker.block_stor((data,))
+        return binascii.hexlify(stored[0])
+    
+    @backend_method(autocommit=0)
+    def update_block(self, hash, data, offset=0):
+        """Apply data at an offset of a known block; return the new hash.
+
+        Data covering a whole block from offset 0 is simply stored as
+        a new block.
+        """
+
+        logger.debug("update_block: %s %s %s", hash, len(data), offset)
+        whole_block = (offset == 0 and len(data) == self.block_size)
+        if whole_block:
+            return self.put_block(data)
+        delta = ((offset, data),)
+        new_hash, _ = self.blocker.block_delta(binascii.unhexlify(hash), delta)
+        return binascii.hexlify(new_hash)
+    
+    def _check_policy(self, policy):
+        """Validate a policy dictionary, modifying it in place.
+
+        Empty-string values are replaced by the default for their key.
+        Raises ValueError for an unknown key, a 'quota' that is not a
+        non-negative integer or a 'versioning' other than 'auto',
+        'manual' or 'none'.
+        """
+        blank = [name for name in policy.keys() if policy[name] == '']
+        for name in blank:
+            policy[name] = self.default_policy.get(name)
+        for name, value in policy.iteritems():
+            if name == 'quota':
+                # int() itself raises ValueError for a bad number.
+                if int(value) < 0:
+                    raise ValueError
+            elif name == 'versioning':
+                if value not in ['auto', 'manual', 'none']:
+                    raise ValueError
+            else:
+                raise ValueError
+    
+    def _sql_until(self, parent, until=None):
+        """Build the SQL selecting the latest versions under a parent.
+
+        For each node whose parent is 'parent', the version with the
+        highest serial before 'until' (now, when not given) is selected
+        unless it belongs to CLUSTER_DELETED. The values are formatted
+        directly into the returned statement.
+        """
+
+        if until is None:
+            until = time.time()
+        clauses = [
+            "select v.serial, n.path, v.mtime, v.size",
+            "from versions v, nodes n",
+            "where v.serial = (select max(serial)",
+            "from versions",
+            "where node = v.node and mtime < %s)",
+            "and v.cluster != %s",
+            "and v.node = n.node",
+            "and v.node in (select node",
+            "from nodes",
+            "where parent = %s)",
+        ]
+        template = ' '.join(clauses)
+        return template % (until, CLUSTER_DELETED, parent)
+    
+    def _list_limits(self, listing, marker, limit):
+        """Return the (start, limit) pair used to page through a listing.
+
+        'start' is the index right after 'marker', or 0 when there is
+        no marker or it is not in the listing. A missing or zero
+        'limit', or one above 10000, becomes 10000.
+        """
+        start = 0
+        if marker and marker in listing:
+            start = listing.index(marker) + 1
+        limit = min(limit, 10000) if limit else 10000
+        return start, limit
+    
+    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
+        """Return a page of (name, serial) tuples for the nodes under a parent.
+
+        Names are relative to 'path' and must start with 'prefix'; when
+        'allowed' is given the full path must also start with one of
+        its entries. With a 'delimiter' and 'virtual' set, names are
+        cut right after the first delimiter following the prefix and
+        reported once, with serial None when the name was cut. With a
+        'delimiter' and 'virtual' unset, only names without the
+        delimiter, or ending with its first occurrence, are kept.
+        Paging follows _list_limits().
+        """
+        cont_prefix = path + '/'
+        if keys and len(keys) > 0:
+            # TODO: port the filter on metadata keys (match the listed
+            # versions against the metadata holding any of 'keys');
+            # until then such a listing is always empty.
+            return []
+        else:
+            sql = 'select path, serial from (%s) where path like ?'
+            sql = sql % self._sql_until(parent, until)
+            param = (cont_prefix + prefix + '%',)
+            if allowed:
+                # The subquery exposes 'path'; there is no 'name' column.
+                sql += ' and (' + ' or '.join(('path like ?',) * len(allowed)) + ')'
+                param += tuple([x + '%' for x in allowed])
+            sql += ' order by path'
+        c = self.con.execute(sql, param)
+        objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
+        if delimiter:
+            pseudo_objects = []
+            # Names already reported, so each is listed only once.
+            seen = set()
+            for x in objects:
+                pseudo_name = x[0]
+                i = pseudo_name.find(delimiter, len(prefix))
+                if not virtual:
+                    # If the delimiter is not found, or the name ends
+                    # with the delimiter's first occurrence.
+                    if i == -1 or len(pseudo_name) == i + len(delimiter):
+                        pseudo_objects.append(x)
+                else:
+                    # If the delimiter is found, keep up to (and including) the delimiter.
+                    if i != -1:
+                        pseudo_name = pseudo_name[:i + len(delimiter)]
+                    if pseudo_name not in seen:
+                        seen.add(pseudo_name)
+                        if pseudo_name == x[0]:
+                            pseudo_objects.append(x)
+                        else:
+                            pseudo_objects.append((pseudo_name, None))
+            objects = pseudo_objects
+
+        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
+        return objects[start:start + limit]
+    
+    # Path functions.
+    
+    def _put_object_node(self, account, container, name):
+        """Return (path, node) for an object, creating the node if missing.
+
+        The container must exist, else the lookup raises NameError.
+        """
+        container_path, parent = self._lookup_container(account, container)
+        path = '/'.join((container_path, name))
+        node = self.node.node_lookup(path)
+        if node is not None:
+            return path, node
+        return path, self.node.node_create(parent, path)
+    
+    def _put_path(self, user, parent, path):
+        """Create a node for the path, with a first version by 'user'.
+
+        Returns the new node.
+        """
+        created = self.node.node_create(parent, path)
+        self.node.version_create(created, 0, None, user, CLUSTER_NORMAL)
+        return created
+    
+    def _lookup_account(self, account, create=True):
+        """Return (account, node); the node is None for a missing account
+        unless 'create' is set, in which case the account is created."""
+        node = self.node.node_lookup(account)
+        if create and node is None:
+            # The account itself is recorded as the creating user.
+            node = self._put_path(account, ROOTNODE, account)
+        return account, node
+    
+    def _lookup_container(self, account, container):
+        """Return (path, node) for a container; NameError if it is missing."""
+        path = '/'.join((account, container))
+        node = self.node.node_lookup(path)
+        if node is not None:
+            return path, node
+        raise NameError('Container does not exist')
+    
+    def _lookup_object(self, account, container, name):
+        """Return (path, node) for an object; NameError if it is missing."""
+        path = '/'.join((account, container, name))
+        node = self.node.node_lookup(path)
+        if node is not None:
+            return path, node
+        raise NameError('Object does not exist')
+    
+    def _get_properties(self, node, until=None):
+        """Return the version properties of a node as of a timestamp.
+
+        The normal cluster is tried first; for a past timestamp the
+        history cluster is tried next. Raises NameError when nothing
+        is found.
+        """
+
+        before = inf if until is None else until
+        clusters = [CLUSTER_NORMAL]
+        if until is not None:
+            clusters.append(CLUSTER_HISTORY)
+        for cluster in clusters:
+            props = self.node.version_lookup(node, before, cluster)
+            if props is not None:
+                return props
+        raise NameError('Path does not exist')
+    
+    def _get_statistics(self, node, until=None):
+        """Return (count, total size, latest timestamp) of what lies
+        under a node, optionally as of a timestamp; zeros when the
+        node module has no statistics."""
+
+        if until is not None:
+            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
+        else:
+            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
+        return (0, 0, 0) if stats is None else stats
+    
+    def _get_version(self, node, version=None):
+        """Return the properties of a version of node.
+
+        With an explicit version, its properties are fetched directly and
+        IndexError is raised when it is unknown or in CLUSTER_DELETED.
+        Otherwise the version found in CLUSTER_NORMAL is returned, and
+        NameError is raised when there is none.
+        """
+        if version is not None:
+            props = self.node.version_get_properties(version)
+            if props is None or props[CLUSTER] == CLUSTER_DELETED:
+                raise IndexError('Version does not exist')
+            return props
+        props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
+        if props is None:
+            raise NameError('Object does not exist')
+        return props
+    
+    def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL):
+        """Create a new version at dest_node derived from a source version.
+
+        src_version selects the source version of src_node; None means the
+        version _get_version returns by default, or no source at all when
+        that lookup raises NameError. dest_size, when given, overrides the
+        size taken from the source. The version currently found in
+        CLUSTER_NORMAL at the destination is moved to CLUSTER_HISTORY
+        before the new version is created in dest_cluster.
+
+        Returns (src_version_id, dest_version_id); src_version_id is None
+        when there was no source version. IndexError from _get_version
+        propagates when an explicit src_version does not exist.
+        """
+        
+        # Get source serial and size.
+        if src_version is not None:
+            src_props = self._get_version(src_node, src_version)
+            src_version_id = src_props[SERIAL]
+            size = src_props[SIZE]
+        else:
+            # Latest or create from scratch.
+            try:
+                src_props = self._get_version(src_node)
+                src_version_id = src_props[SERIAL]
+                size = src_props[SIZE]
+            except NameError:
+                # No current version: the new version has no source and starts empty.
+                src_version_id = None
+                size = 0
+        if dest_size is not None:
+            size = dest_size
+        
+        # Move the latest version at destination to CLUSTER_HISTORY and create new.
+        if src_node == dest_node and src_version is None and src_version_id is not None:
+            # Source and destination coincide, so the version just looked up
+            # is the one to retire; no second lookup is needed.
+            self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
+        else:
+            dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
+            if dest_props is not None:
+                self.node.version_recluster(dest_props[SERIAL], CLUSTER_HISTORY)
+        # version_create returns a pair; the second element (mtime) is not used here.
+        dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster)
+        
+        return src_version_id, dest_version_id
+    
+    def _copy_data(self, src_version, dest_version):
+        """Store the map retrieved for src_version under dest_version."""
+        self.mapper.map_stor(dest_version, self.mapper.map_retr(src_version))
+    
+    def _get_metadata(self, version):
+        """Return the attributes of version as a dict; {} when version is None."""
+        if version is not None:
+            return dict(self.node.attribute_get(version))
+        return {}
+    
+    def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
+        """Create a new version of node and store metadata on it.
+
+        Unless replace is set, the attributes of the previous version are
+        copied over first, keys in meta whose value is '' are deleted and
+        the remaining pairs are set. With replace, every pair in meta is
+        set as given. When copy_data is set and a previous version exists,
+        its data is copied to the new version through _copy_data.
+        """
+        old_version, new_version = self._copy_version(user, node, None, node)
+        if replace:
+            self.node.attribute_set(new_version, ((key, value) for key, value in meta.iteritems()))
+        else:
+            if old_version is not None:
+                self.node.attribute_copy(old_version, new_version)
+            self.node.attribute_del(new_version, (key for key, value in meta.iteritems() if value == ''))
+            self.node.attribute_set(new_version, ((key, value) for key, value in meta.iteritems() if value != ''))
+        if old_version is not None and copy_data:
+            self._copy_data(old_version, new_version)
+    
+    # Access control functions.
+    
+    def _check_groups(self, groups):
+        """Validate groups; currently a placeholder that accepts any input."""
+        # TODO: raise ValueError('Bad characters in groups') for invalid input.
+        return None
+    
+    def _check_permissions(self, path, permissions):
+        """Raise AttributeError when access_list reports entries for path.
+
+        The reported entries are attached to the exception as its `data`
+        attribute. The permissions argument itself is not validated yet.
+        """
+        # TODO: raise ValueError('Bad characters in permissions') for invalid input.
+        conflicting = self.permissions.access_list(path)
+        if not conflicting:
+            return
+        error = AttributeError()
+        error.data = conflicting
+        raise error
+    
+    def _can_read(self, user, account, container, name):
+        """Return True when user may read the object; raise otherwise.
+
+        The account owner is always allowed. Any other user needs either
+        READ or WRITE access on the object path, as reported by
+        permissions.access_check. Raises NotAllowedError when neither holds.
+        """
+        if user == account:
+            return True
+        path = '/'.join((account, container, name))
+        if not self.permissions.access_check(path, READ, user) and not self.permissions.access_check(path, WRITE, user):
+            raise NotAllowedError
+        # Return True on every success path, not only for the owner.
+        return True
+    
+    def _can_write(self, user, account, container, name):
+        """Return True when user may write the object; raise otherwise.
+
+        The account owner is always allowed. Any other user needs WRITE
+        access on the object path, as reported by permissions.access_check.
+        Raises NotAllowedError otherwise.
+        """
+        if user == account:
+            return True
+        path = '/'.join((account, container, name))
+        if not self.permissions.access_check(path, WRITE, user):
+            raise NotAllowedError
+        # Return True on every success path, not only for the owner.
+        return True
+    
+    def _allowed_accounts(self, user):
+        """Return the sorted, distinct first components of the paths listed for user."""
+        shared_paths = self.permissions.access_list_paths(user)
+        return sorted(set(shared.split('/', 1)[0] for shared in shared_paths))
+    
+    def _allowed_containers(self, user, account):
+        """Return the sorted, distinct second components of the paths listed for user under account."""
+        shared_paths = self.permissions.access_list_paths(user, account)
+        return sorted(set(shared.split('/', 2)[1] for shared in shared_paths))