backend components in SQLAlchemy: Progress IV
authorSofia Papagiannaki <papagian@gmail.com>
Thu, 1 Sep 2011 15:36:37 +0000 (18:36 +0300)
committerSofia Papagiannaki <papagian@gmail.com>
Thu, 1 Sep 2011 15:36:37 +0000 (18:36 +0300)
- switch to postgresql
- merge latest backend fixes

14 files changed:
pithos/backends/lib_alchemy/hashfiler/__init__.py [deleted file]
pithos/backends/lib_alchemy/hashfiler/blocker.py [deleted file]
pithos/backends/lib_alchemy/hashfiler/context_file.py [deleted file]
pithos/backends/lib_alchemy/hashfiler/hashfiler/__init__.py [deleted file]
pithos/backends/lib_alchemy/hashfiler/hashfiler/blocker.py [deleted file]
pithos/backends/lib_alchemy/hashfiler/hashfiler/context_file.py [deleted file]
pithos/backends/lib_alchemy/hashfiler/hashfiler/mapper.py [deleted file]
pithos/backends/lib_alchemy/hashfiler/mapper.py [deleted file]
pithos/backends/lib_alchemy/node.py
pithos/backends/lib_alchemy/policy.py
pithos/backends/lib_alchemy/public.py
pithos/backends/lib_alchemy/xfeatures.py
pithos/backends/modular_alchemy.py
pithos/settings.py.dist

diff --git a/pithos/backends/lib_alchemy/hashfiler/__init__.py b/pithos/backends/lib_alchemy/hashfiler/__init__.py
deleted file mode 100644 (file)
index aa3b929..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-# Copyright 2011 GRNET S.A. All rights reserved.
-# 
-# Redistribution and use in source and binary forms, with or
-# without modification, are permitted provided that the following
-# conditions are met:
-# 
-#   1. Redistributions of source code must retain the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer.
-# 
-#   2. Redistributions in binary form must reproduce the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer in the documentation and/or other materials
-#      provided with the distribution.
-# 
-# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
-# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
-# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
-# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-# 
-# The views and conclusions contained in the software and
-# documentation are those of the authors and should not be
-# interpreted as representing official policies, either expressed
-# or implied, of GRNET S.A.
-
-from blocker import Blocker
-from mapper import Mapper
-
diff --git a/pithos/backends/lib_alchemy/hashfiler/blocker.py b/pithos/backends/lib_alchemy/hashfiler/blocker.py
deleted file mode 100644 (file)
index 47293bc..0000000
+++ /dev/null
@@ -1,211 +0,0 @@
-# Copyright 2011 GRNET S.A. All rights reserved.
-# 
-# Redistribution and use in source and binary forms, with or
-# without modification, are permitted provided that the following
-# conditions are met:
-# 
-#   1. Redistributions of source code must retain the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer.
-# 
-#   2. Redistributions in binary form must reproduce the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer in the documentation and/or other materials
-#      provided with the distribution.
-# 
-# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
-# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
-# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
-# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-# 
-# The views and conclusions contained in the software and
-# documentation are those of the authors and should not be
-# interpreted as representing official policies, either expressed
-# or implied, of GRNET S.A.
-
-from os import makedirs
-from os.path import isdir, realpath, exists, join
-from hashlib import new as newhasher
-from binascii import hexlify
-
-from context_file import ContextFile, file_sync_read_chunks
-
-
-class Blocker(object):
-    """Blocker.
-       Required contstructor parameters: blocksize, blockpath, hashtype.
-    """
-
-    blocksize = None
-    blockpath = None
-    hashtype = None
-
-    def __init__(self, **params):
-        blocksize = params['blocksize']
-        blockpath = params['blockpath']
-        blockpath = realpath(blockpath)
-        if not isdir(blockpath):
-            if not exists(blockpath):
-                makedirs(blockpath)
-            else:
-                raise ValueError("Variable blockpath '%s' is not a directory" % (blockpath,))
-
-        hashtype = params['hashtype']
-        try:
-            hasher = newhasher(hashtype)
-        except ValueError:
-            msg = "Variable hashtype '%s' is not available from hashlib"
-            raise ValueError(msg % (hashtype,))
-
-        hasher.update("")
-        emptyhash = hasher.digest()
-
-        self.blocksize = blocksize
-        self.blockpath = blockpath
-        self.hashtype = hashtype
-        self.hashlen = len(emptyhash)
-        self.emptyhash = emptyhash
-
-    def get_rear_block(self, blkhash, create=0):
-        filename = hexlify(blkhash)
-        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
-        if not exists(dir):
-            makedirs(dir)
-        name = join(dir, filename)
-        return ContextFile(name, create)
-
-    def check_rear_block(self, blkhash):
-        filename = hexlify(blkhash)
-        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
-        name = join(dir, filename)
-        return exists(name)
-
-    def block_hash(self, data):
-        """Hash a block of data"""
-        hasher = newhasher(self.hashtype)
-        hasher.update(data.rstrip('\x00'))
-        return hasher.digest()
-
-    def block_ping(self, hashes):
-        """Check hashes for existence and
-           return those missing from block storage.
-        """
-        missing = []
-        append = missing.append
-        for i, h in enumerate(hashes):
-            if not self.check_rear_block(h):
-                append(i)
-        return missing
-
-    def block_retr(self, hashes):
-        """Retrieve blocks from storage by their hashes."""
-        blocksize = self.blocksize
-        blocks = []
-        append = blocks.append
-        block = None
-
-        for h in hashes:
-            with self.get_rear_block(h, 0) as rbl:
-                if not rbl:
-                    break
-                for block in rbl.sync_read_chunks(blocksize, 1, 0):
-                    break # there should be just one block there
-            if not block:
-                break
-            append(block)
-
-        return blocks
-
-    def block_stor(self, blocklist):
-        """Store a bunch of blocks and return (hashes, missing).
-           Hashes is a list of the hashes of the blocks,
-           missing is a list of indices in that list indicating
-           which blocks were missing from the store.
-        """
-        block_hash = self.block_hash
-        hashlist = [block_hash(b) for b in blocklist]
-        mf = None
-        missing = self.block_ping(hashlist)
-        for i in missing:
-            with self.get_rear_block(hashlist[i], 1) as rbl:
-                 rbl.sync_write(blocklist[i]) #XXX: verify?
-
-        return hashlist, missing
-
-    def block_delta(self, blkhash, offdata=()):
-        """Construct and store a new block from a given block
-           and a list of (offset, data) 'patches'. Return:
-           (the hash of the new block, if the block already existed)
-        """
-        if not offdata:
-            return None, None
-
-        blocksize = self.blocksize
-        block = self.block_retr((blkhash,))
-        if not block:
-            return None, None
-
-        block = block[0]
-        newblock = ''
-        idx = 0
-        size = 0
-        trunc = 0
-        for off, data in offdata:
-            if not data:
-                trunc = 1
-                break
-            newblock += block[idx:off] + data
-            size += off - idx + len(data)
-            if size >= blocksize:
-                break
-            off = size
-
-        if not trunc:
-            newblock += block[size:len(block)]
-
-        h, a = self.block_stor((newblock,))
-        return h[0], 1 if a else 0
-
-    def block_hash_file(self, openfile):
-        """Return the list of hashes (hashes map)
-           for the blocks in a buffered file.
-           Helper method, does not affect store.
-        """
-        hashes = []
-        append = hashes.append
-        block_hash = self.block_hash
-
-        for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0):
-            append(block_hash(block))
-
-        return hashes
-
-    def block_stor_file(self, openfile):
-        """Read blocks from buffered file object and store them. Return:
-           (bytes read, list of hashes, list of hashes that were missing)
-        """
-        blocksize = self.blocksize
-        block_stor = self.block_stor
-        hashlist = []
-        hextend = hashlist.extend
-        storedlist = []
-        sextend = storedlist.extend
-        lastsize = 0
-
-        for block in file_sync_read_chunks(openfile, blocksize, 1, 0):
-            hl, sl = block_stor((block,))
-            hextend(hl)
-            sextend(sl)
-            lastsize = len(block)
-
-        size = (len(hashlist) -1) * blocksize + lastsize if hashlist else 0
-        return size, hashlist, storedlist
-
diff --git a/pithos/backends/lib_alchemy/hashfiler/context_file.py b/pithos/backends/lib_alchemy/hashfiler/context_file.py
deleted file mode 100644 (file)
index 0a16b2a..0000000
+++ /dev/null
@@ -1,191 +0,0 @@
-# Copyright 2011 GRNET S.A. All rights reserved.
-# 
-# Redistribution and use in source and binary forms, with or
-# without modification, are permitted provided that the following
-# conditions are met:
-# 
-#   1. Redistributions of source code must retain the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer.
-# 
-#   2. Redistributions in binary form must reproduce the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer in the documentation and/or other materials
-#      provided with the distribution.
-# 
-# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
-# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
-# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
-# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-# 
-# The views and conclusions contained in the software and
-# documentation are those of the authors and should not be
-# interpreted as representing official policies, either expressed
-# or implied, of GRNET S.A.
-
-from os import SEEK_CUR, SEEK_SET, fsync
-from errno import ENOENT
-
-
-_zeros = ''
-
-
-def zeros(nr):
-    global _zeros
-    size = len(_zeros)
-    if nr == size:
-        return _zeros
-
-    if nr > size:
-        _zeros += '\0' * (nr - size)
-        return _zeros
-
-    if nr < size:
-        _zeros = _zeros[:nr]
-        return _zeros
-
-
-def file_sync_write_chunks(openfile, chunksize, offset, chunks, size=None):
-    """Write given chunks to the given buffered file object.
-       Writes never span across chunk boundaries.
-       If size is given stop after or pad until size bytes have been written.
-    """
-    fwrite = openfile.write
-    seek = openfile.seek
-    padding = 0
-
-    try:
-        seek(offset * chunksize)
-    except IOError, e:
-        seek = None
-        for x in xrange(offset):
-            fwrite(zeros(chunksize))
-
-    cursize = offset * chunksize
-
-    for chunk in chunks:
-        if padding:
-            if seek:
-                seek(padding -1, SEEK_CUR)
-                fwrite("\x00")
-            else:
-                fwrite(buffer(zeros(chunksize), 0, padding))
-        if size is not None and cursize + chunksize >= size:
-            chunk = chunk[:chunksize - (cursize - size)]
-            fwrite(chunk)
-            cursize += len(chunk)
-            break
-        fwrite(chunk)
-        padding = chunksize - len(chunk)
-
-    padding = size - cursize if size is not None else 0
-    if padding <= 0:
-        return
-
-    q, r = divmod(padding, chunksize)
-    for x in xrange(q):
-        fwrite(zeros(chunksize))
-    fwrite(buffer(zeros(chunksize), 0, r))
-
-
-def file_sync_read_chunks(openfile, chunksize, nr, offset=0):
-    """Read and yield groups of chunks from a buffered file object at offset.
-       Reads never span accros chunksize boundaries.
-    """
-    fread = openfile.read
-    remains = offset * chunksize
-    seek = openfile.seek
-    try:
-        seek(remains)
-    except IOError, e:
-        seek = None
-        while 1:
-            s = fread(remains)
-            remains -= len(s)
-            if remains <= 0:
-                break
-
-    while nr:
-        remains = chunksize
-        chunk = ''
-        while 1:
-            s = fread(remains)
-            if not s:
-                if chunk:
-                    yield chunk
-                return
-            chunk += s
-            remains -= len(s)
-            if remains <= 0:
-                break
-        yield chunk
-        nr -= 1
-
-
-class ContextFile(object):
-    __slots__ = ("name", "fdesc", "create")
-
-    def __init__(self, name, create=0):
-        self.name = name
-        self.fdesc = None
-        self.create = create
-        #self.dirty = 0
-
-    def __enter__(self):
-        name = self.name
-        try:
-            fdesc = open(name, 'rb+')
-        except IOError, e:
-            if not self.create or e.errno != ENOENT:
-                raise
-            fdesc = open(name, 'w+')
-
-        self.fdesc = fdesc
-        return self
-
-    def __exit__(self, exc, arg, trace):
-        fdesc = self.fdesc
-        if fdesc is not None:
-            #if self.dirty:
-            #    fsync(fdesc.fileno())
-            fdesc.close()
-        return False # propagate exceptions
-
-    def seek(self, offset, whence=SEEK_SET):
-        return self.fdesc.seek(offset, whence)
-
-    def tell(self):
-        return self.fdesc.tell()
-
-    def truncate(self, size):
-        self.fdesc.truncate(size)
-
-    def sync_write(self, data):
-        #self.dirty = 1
-        self.fdesc.write(data)
-
-    def sync_write_chunks(self, chunksize, offset, chunks, size=None):
-        #self.dirty = 1
-        return file_sync_write_chunks(self.fdesc, chunksize, offset, chunks, size)
-
-    def sync_read(self, size):
-        read = self.fdesc.read
-        data = ''
-        while 1:
-            s = read(size)
-            if not s:
-                break
-            data += s
-        return data
-
-    def sync_read_chunks(self, chunksize, nr, offset=0):
-        return file_sync_read_chunks(self.fdesc, chunksize, nr, offset)
-
diff --git a/pithos/backends/lib_alchemy/hashfiler/hashfiler/__init__.py b/pithos/backends/lib_alchemy/hashfiler/hashfiler/__init__.py
deleted file mode 100644 (file)
index d3cae88..0000000
+++ /dev/null
@@ -1,38 +0,0 @@
-# Copyright 2011 GRNET S.A. All rights reserved.
-# 
-# Redistribution and use in source and binary forms, with or
-# without modification, are permitted provided that the following
-# conditions are met:
-# 
-#   1. Redistributions of source code must retain the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer.
-# 
-#   2. Redistributions in binary form must reproduce the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer in the documentation and/or other materials
-#      provided with the distribution.
-# 
-# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
-# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
-# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
-# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-# 
-# The views and conclusions contained in the software and
-# documentation are those of the authors and should not be
-# interpreted as representing official policies, either expressed
-# or implied, of GRNET S.A.
-
-from blocker import Blocker
-from mapper import Mapper
-
-__all__ = ["Blocker", "Mapper"]
-
diff --git a/pithos/backends/lib_alchemy/hashfiler/hashfiler/blocker.py b/pithos/backends/lib_alchemy/hashfiler/hashfiler/blocker.py
deleted file mode 100644 (file)
index 59fd31f..0000000
+++ /dev/null
@@ -1,211 +0,0 @@
-# Copyright 2011 GRNET S.A. All rights reserved.
-# 
-# Redistribution and use in source and binary forms, with or
-# without modification, are permitted provided that the following
-# conditions are met:
-# 
-#   1. Redistributions of source code must retain the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer.
-# 
-#   2. Redistributions in binary form must reproduce the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer in the documentation and/or other materials
-#      provided with the distribution.
-# 
-# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
-# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
-# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
-# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-# 
-# The views and conclusions contained in the software and
-# documentation are those of the authors and should not be
-# interpreted as representing official policies, either expressed
-# or implied, of GRNET S.A.
-
-from os import makedirs
-from os.path import isdir, realpath, exists, join
-from hashlib import new as newhasher
-from binascii import hexlify
-
-from context_file import ContextFile, file_sync_read_chunks
-
-
-class Blocker(object):
-    """Blocker.
-       Required contstructor parameters: blocksize, blockpath, hashtype.
-    """
-
-    blocksize = None
-    blockpath = None
-    hashtype = None
-
-    def __init__(self, **params):
-        blocksize = params['blocksize']
-        blockpath = params['blockpath']
-        blockpath = realpath(blockpath)
-        if not isdir(blockpath):
-            if not exists(blockpath):
-                makedirs(blockpath)
-            else:
-                raise ValueError("Variable blockpath '%s' is not a directory" % (blockpath,))
-
-        hashtype = params['hashtype']
-        try:
-            hasher = newhasher(hashtype)
-        except ValueError:
-            msg = "Variable hashtype '%s' is not available from hashlib"
-            raise ValueError(msg % (hashtype,))
-
-        hasher.update("")
-        emptyhash = hasher.digest()
-
-        self.blocksize = blocksize
-        self.blockpath = blockpath
-        self.hashtype = hashtype
-        self.hashlen = len(emptyhash)
-        self.emptyhash = emptyhash
-
-    def _get_rear_block(self, blkhash, create=0):
-        filename = hexlify(blkhash)
-        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
-        if not exists(dir):
-            makedirs(dir)
-        name = join(dir, filename)
-        return ContextFile(name, create)
-
-    def _check_rear_block(self, blkhash):
-        filename = hexlify(blkhash)
-        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
-        name = join(dir, filename)
-        return exists(name)
-
-    def block_hash(self, data):
-        """Hash a block of data"""
-        hasher = newhasher(self.hashtype)
-        hasher.update(data.rstrip('\x00'))
-        return hasher.digest()
-
-    def block_ping(self, hashes):
-        """Check hashes for existence and
-           return those missing from block storage.
-        """
-        missing = []
-        append = missing.append
-        for i, h in enumerate(hashes):
-            if not self._check_rear_block(h):
-                append(i)
-        return missing
-
-    def block_retr(self, hashes):
-        """Retrieve blocks from storage by their hashes."""
-        blocksize = self.blocksize
-        blocks = []
-        append = blocks.append
-        block = None
-
-        for h in hashes:
-            with self._get_rear_block(h, 0) as rbl:
-                if not rbl:
-                    break
-                for block in rbl.sync_read_chunks(blocksize, 1, 0):
-                    break # there should be just one block there
-            if not block:
-                break
-            append(block)
-
-        return blocks
-
-    def block_stor(self, blocklist):
-        """Store a bunch of blocks and return (hashes, missing).
-           Hashes is a list of the hashes of the blocks,
-           missing is a list of indices in that list indicating
-           which blocks were missing from the store.
-        """
-        block_hash = self.block_hash
-        hashlist = [block_hash(b) for b in blocklist]
-        mf = None
-        missing = self.block_ping(hashlist)
-        for i in missing:
-            with self._get_rear_block(hashlist[i], 1) as rbl:
-                 rbl.sync_write(blocklist[i]) #XXX: verify?
-
-        return hashlist, missing
-
-    def block_delta(self, blkhash, offdata=()):
-        """Construct and store a new block from a given block
-           and a list of (offset, data) 'patches'. Return:
-           (the hash of the new block, if the block already existed)
-        """
-        if not offdata:
-            return None, None
-
-        blocksize = self.blocksize
-        block = self.block_retr((blkhash,))
-        if not block:
-            return None, None
-
-        block = block[0]
-        newblock = ''
-        idx = 0
-        size = 0
-        trunc = 0
-        for off, data in offdata:
-            if not data:
-                trunc = 1
-                break
-            newblock += block[idx:off] + data
-            size += off - idx + len(data)
-            if size >= blocksize:
-                break
-            off = size
-
-        if not trunc:
-            newblock += block[size:len(block)]
-
-        h, a = self.block_stor((newblock,))
-        return h[0], 1 if a else 0
-
-    def block_hash_file(self, openfile):
-        """Return the list of hashes (hashes map)
-           for the blocks in a buffered file.
-           Helper method, does not affect store.
-        """
-        hashes = []
-        append = hashes.append
-        block_hash = self.block_hash
-
-        for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0):
-            append(block_hash(block))
-
-        return hashes
-
-    def block_stor_file(self, openfile):
-        """Read blocks from buffered file object and store them. Return:
-           (bytes read, list of hashes, list of hashes that were missing)
-        """
-        blocksize = self.blocksize
-        block_stor = self.block_stor
-        hashlist = []
-        hextend = hashlist.extend
-        storedlist = []
-        sextend = storedlist.extend
-        lastsize = 0
-
-        for block in file_sync_read_chunks(openfile, blocksize, 1, 0):
-            hl, sl = block_stor((block,))
-            hextend(hl)
-            sextend(sl)
-            lastsize = len(block)
-
-        size = (len(hashlist) -1) * blocksize + lastsize if hashlist else 0
-        return size, hashlist, storedlist
-
diff --git a/pithos/backends/lib_alchemy/hashfiler/hashfiler/context_file.py b/pithos/backends/lib_alchemy/hashfiler/hashfiler/context_file.py
deleted file mode 100644 (file)
index 0a16b2a..0000000
+++ /dev/null
@@ -1,191 +0,0 @@
-# Copyright 2011 GRNET S.A. All rights reserved.
-# 
-# Redistribution and use in source and binary forms, with or
-# without modification, are permitted provided that the following
-# conditions are met:
-# 
-#   1. Redistributions of source code must retain the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer.
-# 
-#   2. Redistributions in binary form must reproduce the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer in the documentation and/or other materials
-#      provided with the distribution.
-# 
-# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
-# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
-# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
-# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-# 
-# The views and conclusions contained in the software and
-# documentation are those of the authors and should not be
-# interpreted as representing official policies, either expressed
-# or implied, of GRNET S.A.
-
-from os import SEEK_CUR, SEEK_SET, fsync
-from errno import ENOENT
-
-
-_zeros = ''
-
-
-def zeros(nr):
-    global _zeros
-    size = len(_zeros)
-    if nr == size:
-        return _zeros
-
-    if nr > size:
-        _zeros += '\0' * (nr - size)
-        return _zeros
-
-    if nr < size:
-        _zeros = _zeros[:nr]
-        return _zeros
-
-
-def file_sync_write_chunks(openfile, chunksize, offset, chunks, size=None):
-    """Write given chunks to the given buffered file object.
-       Writes never span across chunk boundaries.
-       If size is given stop after or pad until size bytes have been written.
-    """
-    fwrite = openfile.write
-    seek = openfile.seek
-    padding = 0
-
-    try:
-        seek(offset * chunksize)
-    except IOError, e:
-        seek = None
-        for x in xrange(offset):
-            fwrite(zeros(chunksize))
-
-    cursize = offset * chunksize
-
-    for chunk in chunks:
-        if padding:
-            if seek:
-                seek(padding -1, SEEK_CUR)
-                fwrite("\x00")
-            else:
-                fwrite(buffer(zeros(chunksize), 0, padding))
-        if size is not None and cursize + chunksize >= size:
-            chunk = chunk[:chunksize - (cursize - size)]
-            fwrite(chunk)
-            cursize += len(chunk)
-            break
-        fwrite(chunk)
-        padding = chunksize - len(chunk)
-
-    padding = size - cursize if size is not None else 0
-    if padding <= 0:
-        return
-
-    q, r = divmod(padding, chunksize)
-    for x in xrange(q):
-        fwrite(zeros(chunksize))
-    fwrite(buffer(zeros(chunksize), 0, r))
-
-
-def file_sync_read_chunks(openfile, chunksize, nr, offset=0):
-    """Read and yield groups of chunks from a buffered file object at offset.
-       Reads never span accros chunksize boundaries.
-    """
-    fread = openfile.read
-    remains = offset * chunksize
-    seek = openfile.seek
-    try:
-        seek(remains)
-    except IOError, e:
-        seek = None
-        while 1:
-            s = fread(remains)
-            remains -= len(s)
-            if remains <= 0:
-                break
-
-    while nr:
-        remains = chunksize
-        chunk = ''
-        while 1:
-            s = fread(remains)
-            if not s:
-                if chunk:
-                    yield chunk
-                return
-            chunk += s
-            remains -= len(s)
-            if remains <= 0:
-                break
-        yield chunk
-        nr -= 1
-
-
-class ContextFile(object):
-    __slots__ = ("name", "fdesc", "create")
-
-    def __init__(self, name, create=0):
-        self.name = name
-        self.fdesc = None
-        self.create = create
-        #self.dirty = 0
-
-    def __enter__(self):
-        name = self.name
-        try:
-            fdesc = open(name, 'rb+')
-        except IOError, e:
-            if not self.create or e.errno != ENOENT:
-                raise
-            fdesc = open(name, 'w+')
-
-        self.fdesc = fdesc
-        return self
-
-    def __exit__(self, exc, arg, trace):
-        fdesc = self.fdesc
-        if fdesc is not None:
-            #if self.dirty:
-            #    fsync(fdesc.fileno())
-            fdesc.close()
-        return False # propagate exceptions
-
-    def seek(self, offset, whence=SEEK_SET):
-        return self.fdesc.seek(offset, whence)
-
-    def tell(self):
-        return self.fdesc.tell()
-
-    def truncate(self, size):
-        self.fdesc.truncate(size)
-
-    def sync_write(self, data):
-        #self.dirty = 1
-        self.fdesc.write(data)
-
-    def sync_write_chunks(self, chunksize, offset, chunks, size=None):
-        #self.dirty = 1
-        return file_sync_write_chunks(self.fdesc, chunksize, offset, chunks, size)
-
-    def sync_read(self, size):
-        read = self.fdesc.read
-        data = ''
-        while 1:
-            s = read(size)
-            if not s:
-                break
-            data += s
-        return data
-
-    def sync_read_chunks(self, chunksize, nr, offset=0):
-        return file_sync_read_chunks(self.fdesc, chunksize, nr, offset)
-
diff --git a/pithos/backends/lib_alchemy/hashfiler/hashfiler/mapper.py b/pithos/backends/lib_alchemy/hashfiler/hashfiler/mapper.py
deleted file mode 100644 (file)
index d9caaee..0000000
+++ /dev/null
@@ -1,102 +0,0 @@
-# Copyright 2011 GRNET S.A. All rights reserved.
-# 
-# Redistribution and use in source and binary forms, with or
-# without modification, are permitted provided that the following
-# conditions are met:
-# 
-#   1. Redistributions of source code must retain the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer.
-# 
-#   2. Redistributions in binary form must reproduce the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer in the documentation and/or other materials
-#      provided with the distribution.
-# 
-# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
-# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
-# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
-# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-# 
-# The views and conclusions contained in the software and
-# documentation are those of the authors and should not be
-# interpreted as representing official policies, either expressed
-# or implied, of GRNET S.A.
-
-from os.path import realpath, join, exists, isdir
-from os import makedirs, unlink
-from errno import ENOENT
-
-from context_file import ContextFile
-
-
-class Mapper(object):
-    """Mapper.
-       Required contstructor parameters: mappath, namelen.
-    """
-    
-    mappath = None
-    namelen = None
-
-    def __init__(self, **params):
-        self.params = params
-        self.namelen = params['namelen']
-        mappath = realpath(params['mappath'])
-        if not isdir(mappath):
-            if not exists(mappath):
-                makedirs(mappath)
-            else:
-                raise ValueError("Variable mappath '%s' is not a directory" % (mappath,))
-        self.mappath = mappath
-
-    def _get_rear_map(self, name, create=0):
-        name = join(self.mappath, hex(int(name)))
-        return ContextFile(name, create)
-
-    def _delete_rear_map(self, name):
-        name = join(self.mappath, hex(int(name)))
-        try:
-            unlink(name)
-            return 1
-        except OSError, e:
-            if e.errno != ENOENT:
-                raise
-        return 0
-
-    def map_retr(self, name, blkoff=0, nr=100000000000000):
-        """Return as a list, part of the hashes map of an object
-           at the given block offset.
-           By default, return the whole hashes map.
-        """
-        namelen = self.namelen
-        hashes = ()
-
-        with self._get_rear_map(name, 0) as rmap:
-            if rmap:
-                hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff))
-        return hashes
-
-    def map_stor(self, name, hashes=(), blkoff=0, create=1):
-        """Store hashes in the given hashes map, replacing the old ones."""
-        namelen = self.namelen
-        with self._get_rear_map(name, 1) as rmap:
-            rmap.sync_write_chunks(namelen, blkoff, hashes, None)
-
-#     def map_copy(self, src, dst):
-#         """Copy a hashes map to another one, replacing it."""
-#         with self._get_rear_map(src, 0) as rmap:
-#             if rmap:
-#                 rmap.copy_to(dst)
-
-    def map_remv(self, name):
-        """Remove a hashes map. Returns true if the map was found and removed."""
-        return self._delete_rear_map(name)
-
diff --git a/pithos/backends/lib_alchemy/hashfiler/mapper.py b/pithos/backends/lib_alchemy/hashfiler/mapper.py
deleted file mode 100644 (file)
index 825bc9c..0000000
+++ /dev/null
@@ -1,102 +0,0 @@
-# Copyright 2011 GRNET S.A. All rights reserved.
-# 
-# Redistribution and use in source and binary forms, with or
-# without modification, are permitted provided that the following
-# conditions are met:
-# 
-#   1. Redistributions of source code must retain the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer.
-# 
-#   2. Redistributions in binary form must reproduce the above
-#      copyright notice, this list of conditions and the following
-#      disclaimer in the documentation and/or other materials
-#      provided with the distribution.
-# 
-# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
-# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
-# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
-# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-# 
-# The views and conclusions contained in the software and
-# documentation are those of the authors and should not be
-# interpreted as representing official policies, either expressed
-# or implied, of GRNET S.A.
-
-from os.path import realpath, join, exists, isdir
-from os import makedirs, unlink
-from errno import ENOENT
-
-from context_file import ContextFile
-
-
-class Mapper(object):
-    """Mapper.
-       Required contstructor parameters: mappath, namelen.
-    """
-    
-    mappath = None
-    namelen = None
-
-    def __init__(self, **params):
-        self.params = params
-        self.namelen = params['namelen']
-        mappath = realpath(params['mappath'])
-        if not isdir(mappath):
-            if not exists(mappath):
-                makedirs(mappath)
-            else:
-                raise ValueError("Variable mappath '%s' is not a directory" % (mappath,))
-        self.mappath = mappath
-
-    def get_rear_map(self, name, create=0):
-        name = join(self.mappath, hex(int(name)))
-        return ContextFile(name, create)
-
-    def delete_rear_map(self, name):
-        name = join(self.mappath, hex(int(name)))
-        try:
-            unlink(name)
-            return 1
-        except OSError, e:
-            if e.errno != ENOENT:
-                raise
-        return 0
-
-    def map_retr(self, name, blkoff=0, nr=100000000000000):
-        """Return as a list, part of the hashes map of an object
-           at the given block offset.
-           By default, return the whole hashes map.
-        """
-        namelen = self.namelen
-        hashes = ()
-
-        with self.get_rear_map(name, 0) as rmap:
-            if rmap:
-                hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff))
-        return hashes
-
-    def map_stor(self, name, hashes=(), blkoff=0, create=1):
-        """Store hashes in the given hashes map, replacing the old ones."""
-        namelen = self.namelen
-        with self.get_rear_map(name, 1) as rmap:
-            rmap.sync_write_chunks(namelen, blkoff, hashes, None)
-
-#     def map_copy(self, src, dst):
-#         """Copy a hashes map to another one, replacing it."""
-#         with self.get_rear_map(src, 0) as rmap:
-#             if rmap:
-#                 rmap.copy_to(dst)
-
-    def map_remv(self, name):
-        """Remove a hashes map. Returns true if the map was found and removed."""
-        return self.delete_rear_map(name)
-
index 6a3d949..6d9a36d 100644 (file)
 # or implied, of GRNET S.A.
 
 from time import time
-from sqlalchemy import Table, Integer, Column, String, MetaData, ForeignKey
+from sqlalchemy import Table, Integer, Float, Column, String, MetaData, ForeignKey
 from sqlalchemy.schema import Index, Sequence
 from sqlalchemy.sql import func, and_, or_, null, select, bindparam
-
+from duplicate import insertOnDuplicate
 from dbworker import DBWorker
 
-ROOTNODE  = 1
+ROOTNODE  = 0
 
 ( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
 
@@ -76,7 +76,8 @@ def strprevling(prefix):
     s = prefix[:-1]
     c = ord(prefix[-1])
     if c > 0:
-        s += unichr(c-1) + unichr(0xffff)
+        #s += unichr(c-1) + unichr(0xffff)
+        s += unichr(c-1)
     return s
 
 
@@ -111,8 +112,7 @@ class Node(DBWorker):
                                          ondelete='CASCADE',
                                          onupdate='CASCADE'),
                               autoincrement=False))
-        #columns.append(Column('path', String(2048), default='', nullable=False))
-        columns.append(Column('path', String(255), default='', nullable=False))
+        columns.append(Column('path', String(2048), default='', nullable=False))
         self.nodes = Table('nodes', metadata, *columns)
         # place an index on path
         Index('idx_nodes_path', self.nodes.c.path, unique=True)
@@ -126,7 +126,7 @@ class Node(DBWorker):
                               primary_key=True))
         columns.append(Column('population', Integer, nullable=False, default=0))
         columns.append(Column('size', Integer, nullable=False, default=0))
-        columns.append(Column('mtime', Integer))
+        columns.append(Column('mtime', Float))
         columns.append(Column('cluster', Integer, nullable=False, default=0,
                               primary_key=True))
         self.statistics = Table('statistics', metadata, *columns)
@@ -140,7 +140,7 @@ class Node(DBWorker):
                                          onupdate='CASCADE')))
         columns.append(Column('size', Integer, nullable=False, default=0))
         columns.append(Column('source', Integer))
-        columns.append(Column('mtime', Integer))
+        columns.append(Column('mtime', Float))
         columns.append(Column('muser', String(255), nullable=False, default=''))
         columns.append(Column('cluster', Integer, nullable=False, default=0))
         self.versions = Table('versions', metadata, *columns)
@@ -162,8 +162,8 @@ class Node(DBWorker):
         
         metadata.create_all(self.engine)
         
-        s = self.nodes.select().where(and_(self.nodes.c.node == 1,
-                                           self.nodes.c.parent == 1))
+        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
+                                           self.nodes.c.parent == ROOTNODE))
         r = self.conn.execute(s).fetchone()
         if not r:
             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
@@ -387,8 +387,17 @@ class Node(DBWorker):
         population += prepopulation
         size += presize
         
-        ins = self.statistics.insert().values(node, population, size, mtime, cluster)
-        self.conn.execute(ins).close()
+        #insert or replace
+        u = self.statistics.update().where(and_(self.statistics.c.node==node,
+                                           self.statistics.c.cluster==cluster))
+        u = u.values(population=population, size=size, mtime=mtime)
+        rp = self.conn.execute(u)
+        rp.close()
+        if rp.rowcount == 0:
+            ins = self.statistics.insert()
+            ins = ins.values(node=node, population=population, size=size,
+                             mtime=mtime, cluster=cluster)
+            self.conn.execute(ins).close()
     
     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
         """Update the statistics of the given node's parent.
@@ -413,9 +422,6 @@ class Node(DBWorker):
            do not belong to the cluster.
         """
         
-        execute = self.execute
-        fetchone = self.fetchone
-        
         # The node.
         props = self.node_get_properties(node)
         if props is None:
@@ -603,7 +609,6 @@ class Node(DBWorker):
         """Set the attributes of the version specified by serial.
            Receive attributes as an iterable of (key, value) pairs.
         """
-        
         values = [{'serial':serial, 'key':k, 'value':v} for k, v in items]
         self.conn.execute(self.attributes.insert(), values).close()
     
@@ -724,8 +729,7 @@ class Node(DBWorker):
            Limit applies to the first list of tuples returned.
         """
         
-        execute = self.execute
-        
+        print '#', locals()
         if not start or start < prefix:
             start = strprevling(prefix)
         nextling = strnextling(prefix)
@@ -747,7 +751,6 @@ class Node(DBWorker):
         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
         conj = []
         for x in pathq:
-            print '#', x
             conj.append(n.c.path.like(x + '%'))
         
         if conj:
@@ -768,7 +771,6 @@ class Node(DBWorker):
         pfz = len(prefix)
         dz = len(delimiter)
         count = 0
-        fetchone = self.fetchone
         prefixes = []
         pappend = prefixes.append
         matches = []
@@ -789,11 +791,12 @@ class Node(DBWorker):
                     break
                 continue
             
-            pf = path[:idx + dz]
-            pappend(pf)
             if idx + dz == len(path):
                 mappend(props)
                 count += 1
+                continue # Get one more, in case there is a path.
+            pf = path[:idx + dz]
+            pappend(pf)
             if count >= limit: 
                 break
             
index e6dbd39..6fcb063 100644 (file)
@@ -43,8 +43,7 @@ class Policy(DBWorker):
         DBWorker.__init__(self, **params)
         metadata = MetaData()
         columns=[]
-        #columns.append(Column('path', String(2048), primary_key=True))
-        columns.append(Column('path', String(255), primary_key=True))
+        columns.append(Column('path', String(2048), primary_key=True))
         columns.append(Column('key', String(255), primary_key=True))
         columns.append(Column('value', String(255)))
         self.policies = Table('policy', metadata, *columns)
index 63aeebe..14415a1 100644 (file)
@@ -42,8 +42,7 @@ class Public(DBWorker):
         DBWorker.__init__(self, **params)
         metadata = MetaData()
         columns=[]
-        #columns.append(Column('path', String(2048), primary_key=True))
-        columns.append(Column('path', String(255), primary_key=True))
+        columns.append(Column('path', String(2048), primary_key=True))
         self.public = Table('public', metadata, *columns)
         metadata.create_all(self.engine)
         metadata.bind = self.engine
index 70901cb..dca069b 100644 (file)
@@ -60,7 +60,8 @@ class XFeatures(DBWorker):
                               ForeignKey('xfeatures.feature_id',
                                          ondelete='CASCADE'),
                               primary_key=True))
-        columns.append(Column('key', Integer, primary_key=True))
+        columns.append(Column('key', Integer, primary_key=True,
+                              autoincrement=False))
         columns.append(Column('value', String(255), primary_key=True))
         self.xfeaturevals = Table('xfeaturevals', metadata, *columns)
         
index 0ca00cc..1079d56 100644 (file)
@@ -42,8 +42,8 @@ from base import NotAllowedError, BaseBackend
 from lib_alchemy.node import Node, ROOTNODE, SERIAL, SIZE, MTIME, MUSER, CLUSTER
 from lib_alchemy.permissions import Permissions, READ, WRITE
 from lib_alchemy.policy import Policy
-from lib_alchemy.hashfiler import Mapper, Blocker
 from sqlalchemy import create_engine
+from lib.hashfiler import Mapper, Blocker
 
 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
 
@@ -61,13 +61,13 @@ def backend_method(func=None, autocommit=1):
     if not autocommit:
         return func
     def fn(self, *args, **kw):
-        self.con.execute('begin deferred')
+        trans = self.con.begin()
         try:
             ret = func(self, *args, **kw)
-            self.con.commit()
+            trans.commit()
             return ret
         except:
-            self.con.rollback()
+            trans.rollback()
             raise
     return fn
 
@@ -78,7 +78,7 @@ class ModularBackend(BaseBackend):
     Uses modules for SQL functions and storage.
     """
     
-    def __init__(self, db):
+    def __init__(self, db, db_options):
         self.hash_algorithm = 'sha256'
         self.block_size = 4 * 1024 * 1024 # 4MB
         
@@ -90,12 +90,9 @@ class ModularBackend(BaseBackend):
         if not os.path.isdir(basepath):
             raise RuntimeError("Cannot open database at '%s'" % (db,))
         
-        dbuser = 'pithos'
-        dbpass = 'archipelagos'
-        dbhost = '62.217.112.56'
-        dbname = 'pithosdb'
-        connection_str = 'mysql://%s:%s@%s/%s' %(dbuser, dbpass, dbhost, dbname)
+        connection_str = 'postgresql://%s:%s@%s/%s' % db_options
         engine = create_engine(connection_str, echo=True)
+        self.con = engine.connect()
         
         params = {'blocksize': self.block_size,
                   'blockpath': basepath + '/blocks',
@@ -106,19 +103,17 @@ class ModularBackend(BaseBackend):
                   'namelen': self.blocker.hashlen}
         self.mapper = Mapper(**params)
         
-        params = {'connection': engine.connect(),
+        params = {'connection': self.con,
                   'engine': engine}
         self.permissions = Permissions(**params)
         self.policy = Policy(**params)
         self.node = Node(**params)
-        
-        self.con.commit()
     
     @backend_method
     def list_accounts(self, user, marker=None, limit=10000):
         """Return a list of accounts the user can access."""
         
-        logger.debug("list_accounts: %s %s", user, marker, limit)
+        logger.debug("list_accounts: %s %s %s", user, marker, limit)
         allowed = self._allowed_accounts(user)
         start, limit = self._list_limits(allowed, marker, limit)
         return allowed[start:start + limit]
@@ -369,6 +364,8 @@ class ModularBackend(BaseBackend):
         else:
             if shared:
                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
+                if not allowed:
+                    return []
         path, node = self._lookup_container(account, container)
         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
     
@@ -561,6 +558,7 @@ class ModularBackend(BaseBackend):
         
         logger.debug("list_versions: %s %s %s", account, container, name)
         self._can_read(user, account, container, name)
+        path, node = self._lookup_object(account, container, name)
         return self.node.node_get_versions(node, ['serial', 'mtime'])
     
     @backend_method(autocommit=0)
@@ -575,7 +573,7 @@ class ModularBackend(BaseBackend):
     
     @backend_method(autocommit=0)
     def put_block(self, data):
-        """Create a block and return the hash."""
+        """Store a block and return the hash."""
         
         logger.debug("put_block: %s", len(data))
         hashes, absent = self.blocker.block_stor((data,))
@@ -591,95 +589,6 @@ class ModularBackend(BaseBackend):
         h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
         return binascii.hexlify(h)
     
-    def _check_policy(self, policy):
-        for k in policy.keys():
-            if policy[k] == '':
-                policy[k] = self.default_policy.get(k)
-        for k, v in policy.iteritems():
-            if k == 'quota':
-                q = int(v) # May raise ValueError.
-                if q < 0:
-                    raise ValueError
-            elif k == 'versioning':
-                if v not in ['auto', 'manual', 'none']:
-                    raise ValueError
-            else:
-                raise ValueError
-    
-    def _sql_until(self, parent, until=None):
-        """Return the sql to get the latest versions until the timestamp given."""
-        
-        if until is None:
-            until = time.time()
-        sql = ("select v.serial, n.path, v.mtime, v.size "
-               "from versions v, nodes n "
-               "where v.serial = (select max(serial) "
-                                 "from versions "
-                                 "where node = v.node and mtime < %s) "
-               "and v.cluster != %s "
-               "and v.node = n.node "
-               "and v.node in (select node "
-                              "from nodes "
-                              "where parent = %s)")
-        return sql % (until, CLUSTER_DELETED, parent)
-    
-    def _list_limits(self, listing, marker, limit):
-        start = 0
-        if marker:
-            try:
-                start = listing.index(marker) + 1
-            except ValueError:
-                pass
-        if not limit or limit > 10000:
-            limit = 10000
-        return start, limit
-    
-    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
-        cont_prefix = path + '/'
-        if keys and len(keys) > 0:
-#             sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
-#                         m.version_id = o.version_id and m.key in (%s)'''
-#             sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
-#             param = (cont_prefix + prefix + '%',) + tuple(keys)
-#             if allowed:
-#                 sql += ' and (' + ' or '.join(('o.name like ?',) * len(allowed)) + ')'
-#                 param += tuple([x + '%' for x in allowed])
-#             sql += ' order by o.name'
-            return []
-        else:
-            sql = 'select path, serial from (%s) where path like ?'
-            sql = sql % self._sql_until(parent, until)
-            param = (cont_prefix + prefix + '%',)
-            if allowed:
-                sql += ' and (' + ' or '.join(('name like ?',) * len(allowed)) + ')'
-                param += tuple([x + '%' for x in allowed])
-            sql += ' order by path'
-        c = self.con.execute(sql, param)
-        objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
-        if delimiter:
-            pseudo_objects = []
-            for x in objects:
-                pseudo_name = x[0]
-                i = pseudo_name.find(delimiter, len(prefix))
-                if not virtual:
-                    # If the delimiter is not found, or the name ends
-                    # with the delimiter's first occurence.
-                    if i == -1 or len(pseudo_name) == i + len(delimiter):
-                        pseudo_objects.append(x)
-                else:
-                    # If the delimiter is found, keep up to (and including) the delimiter.
-                    if i != -1:
-                        pseudo_name = pseudo_name[:i + len(delimiter)]
-                    if pseudo_name not in [y[0] for y in pseudo_objects]:
-                        if pseudo_name == x[0]:
-                            pseudo_objects.append(x)
-                        else:
-                            pseudo_objects.append((pseudo_name, None))
-            objects = pseudo_objects
-        
-        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
-        return objects[start:start + limit]
-    
     # Path functions.
     
     def _put_object_node(self, account, container, name):
@@ -801,6 +710,49 @@ class ModularBackend(BaseBackend):
         if copy_data and src_version_id is not None:
             self._copy_data(src_version_id, dest_version_id)
     
+    def _list_limits(self, listing, marker, limit):
+        start = 0
+        if marker:
+            try:
+                start = listing.index(marker) + 1
+            except ValueError:
+                pass
+        if not limit or limit > 10000:
+            limit = 10000
+        return start, limit
+    
+    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
+        cont_prefix = path + '/'
+        prefix = cont_prefix + prefix
+        start = cont_prefix + marker if marker else None
+        before = until if until is not None else inf
+        filterq = ','.join(keys) if keys else None
+        
+        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
+        objects.extend([(p, None) for p in prefixes] if virtual else [])
+        objects.sort()
+        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
+        
+        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
+        return objects[start:start + limit]
+    
+    # Policy functions.
+    
+    def _check_policy(self, policy):
+        for k in policy.keys():
+            if policy[k] == '':
+                policy[k] = self.default_policy.get(k)
+        for k, v in policy.iteritems():
+            if k == 'quota':
+                q = int(v) # May raise ValueError.
+                if q < 0:
+                    raise ValueError
+            elif k == 'versioning':
+                if v not in ['auto', 'manual', 'none']:
+                    raise ValueError
+            else:
+                raise ValueError
+    
     # Access control functions.
     
     def _check_groups(self, groups):
index d2f37d5..e1139f1 100644 (file)
@@ -65,10 +65,15 @@ DATABASES = {
 }
 
 # The backend to use and its initilization options.
+dbuser = ''
+dbpass = ''
+dbhost = ''
+dbname = ''
+db_options = (dbuser, dbpass, dbhost, dbname,)
 if TEST:
-    BACKEND = ('SimpleBackend', (os.path.join(PROJECT_PATH, 'data/test/'),))
+    BACKEND = ('SimpleBackend', (os.path.join(PROJECT_PATH, 'data/test/'), db_options,))
 else:
-    BACKEND = ('SimpleBackend', (os.path.join(PROJECT_PATH, 'data/pithos/'),))
+    BACKEND = ('SimpleBackend', (os.path.join(PROJECT_PATH, 'data/pithos/'), db_options,))
 
 # Local time zone for this installation. Choices can be found here:
 # http://en.wikipedia.org/wiki/List_of_tz_zones_by_name