Use Mapper and Blocker from the hashfiler lib to store hashmaps and hashes.
authorAntony Chazapis <chazapis@gmail.com>
Sat, 16 Jul 2011 18:03:38 +0000 (21:03 +0300)
committerAntony Chazapis <chazapis@gmail.com>
Sat, 16 Jul 2011 18:03:38 +0000 (21:03 +0300)
Update settings.py and database (now a folder).

pithos/backends/simple.py
pithos/lib/hashfiler/__init__.py [new file with mode: 0644]
pithos/lib/hashfiler/blocker.py [new file with mode: 0644]
pithos/lib/hashfiler/context_file.py [new file with mode: 0644]
pithos/lib/hashfiler/mapper.py [new file with mode: 0644]
pithos/settings.py.dist

index 44a944b..a0f898d 100644 (file)
@@ -35,12 +35,11 @@ import os
 import time
 import sqlite3
 import logging
-import types
 import hashlib
-import shutil
-import pickle
+import binascii
 
 from base import NotAllowedError, BaseBackend
+from pithos.lib.hashfiler import Mapper, Blocker
 
 
 logger = logging.getLogger(__name__)
@@ -52,19 +51,19 @@ class SimpleBackend(BaseBackend):
     Uses SQLite for storage.
     """
     
-    # TODO: Automatic/manual clean-up after a time interval.
-    
     def __init__(self, db):
-        self.hash_algorithm = 'sha1'
-        self.block_size = 128 * 1024 # 128KB
+        self.hash_algorithm = 'sha256'
+        self.block_size = 4 * 1024 * 1024 # 4MB
         
         self.default_policy = {'quota': 0, 'versioning': 'auto'}
         
         basepath = os.path.split(db)[0]
         if basepath and not os.path.exists(basepath):
             os.makedirs(basepath)
+        if not os.path.isdir(basepath):
+            raise RuntimeError("Cannot open database at '%s'" % (db,))
         
-        self.con = sqlite3.connect(db, check_same_thread=False)
+        self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)
         
         sql = '''pragma foreign_keys = on'''
         self.con.execute(sql)
@@ -85,17 +84,6 @@ class SimpleBackend(BaseBackend):
                     foreign key (version_id) references versions(version_id)
                     on delete cascade)'''
         self.con.execute(sql)
-        sql = '''create table if not exists hashmaps (
-                    version_id integer,
-                    pos integer,
-                    block_id text,
-                    primary key (version_id, pos)
-                    foreign key (version_id) references versions(version_id)
-                    on delete cascade)'''
-        self.con.execute(sql)
-        sql = '''create table if not exists blocks (
-                    block_id text, data blob, primary key (block_id))'''
-        self.con.execute(sql)
         
         sql = '''create table if not exists policy (
                     name text, key text, value text, primary key (name, key))'''
@@ -111,6 +99,15 @@ class SimpleBackend(BaseBackend):
                     name text, primary key (name))'''
         self.con.execute(sql)
         self.con.commit()
+        
+        params = {'blocksize': self.block_size,
+                  'blockpath': basepath + '/blocks',
+                  'hashtype': self.hash_algorithm}
+        self.blocker = Blocker(**params)
+        
+        params = {'mappath': basepath + '/maps',
+                  'namelen': self.blocker.hashlen}
+        self.mapper = Mapper(**params)
     
     def get_account_meta(self, user, account, until=None):
         """Return a dictionary with the account metadata."""
@@ -154,7 +151,7 @@ class SimpleBackend(BaseBackend):
         logger.debug("update_account_meta: %s %s %s", account, meta, replace)
         if user != account:
             raise NotAllowedError
-        self._put_metadata(user, account, meta, replace)
+        self._put_metadata(user, account, meta, replace, False)
     
     def get_account_groups(self, user, account):
         """Return a dictionary with the user groups defined for this account."""
@@ -254,7 +251,7 @@ class SimpleBackend(BaseBackend):
         if user != account:
             raise NotAllowedError
         path, version_id, mtime = self._get_containerinfo(account, container)
-        self._put_metadata(user, path, meta, replace)
+        self._put_metadata(user, path, meta, replace, False)
     
     def get_container_policy(self, user, account, container):
         """Return a dictionary with the container policy."""
@@ -329,7 +326,7 @@ class SimpleBackend(BaseBackend):
         self.con.execute(sql, (path, path + '/%',))
         sql = 'delete from policy where name = ?'
         self.con.execute(sql, (path,))
-        self._copy_version(user, account, account, True, True) # New account version (for timestamp update).
+        self._copy_version(user, account, account, True, False) # New account version (for timestamp update).
     
     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None):
         """Return a list of objects existing under a container."""
@@ -421,10 +418,8 @@ class SimpleBackend(BaseBackend):
         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
         self._can_read(user, account, container, name)
         path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
-        sql = 'select block_id from hashmaps where version_id = ? order by pos asc'
-        c = self.con.execute(sql, (version_id,))
-        hashmap = [x[0] for x in c.fetchall()]
-        return size, hashmap
+        hashmap = self.mapper.map_retr(version_id)
+        return size, [binascii.hexlify(x) for x in hashmap]
     
     def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
         """Create/update an object with the specified size and partial hashes."""
@@ -433,12 +428,7 @@ class SimpleBackend(BaseBackend):
         if permissions is not None and user != account:
             raise NotAllowedError
         self._can_write(user, account, container, name)
-        missing = []
-        for i in range(len(hashmap)):
-            sql = 'select count(*) from blocks where block_id = ?'
-            c = self.con.execute(sql, (hashmap[i],))
-            if c.fetchone()[0] == 0:
-                missing.append(hashmap[i])
+        missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
         if missing:
             ie = IndexError()
             ie.data = missing
@@ -450,10 +440,7 @@ class SimpleBackend(BaseBackend):
         src_version_id, dest_version_id = self._copy_version(user, path, path, not replace_meta, False)
         sql = 'update versions set size = ? where version_id = ?'
         self.con.execute(sql, (size, dest_version_id))
-        # TODO: Check for block_id existence.
-        for i in range(len(hashmap)):
-            sql = 'insert or replace into hashmaps (version_id, pos, block_id) values (?, ?, ?)'
-            self.con.execute(sql, (dest_version_id, i, hashmap[i]))
+        self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
         for k, v in meta.iteritems():
             sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
             self.con.execute(sql, (dest_version_id, k, v))
@@ -536,24 +523,17 @@ class SimpleBackend(BaseBackend):
         """Return a block's data."""
         
         logger.debug("get_block: %s", hash)
-        c = self.con.execute('select data from blocks where block_id = ?', (hash,))
-        row = c.fetchone()
-        if row:
-            return str(row[0])
-        else:
+        blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
+        if not blocks:
             raise NameError('Block does not exist')
+        return blocks[0]
     
     def put_block(self, data):
         """Create a block and return the hash."""
         
         logger.debug("put_block: %s", len(data))
-        h = hashlib.new(self.hash_algorithm)
-        h.update(data.rstrip('\x00'))
-        hash = h.hexdigest()
-        sql = 'insert or ignore into blocks (block_id, data) values (?, ?)'
-        self.con.execute(sql, (hash, buffer(data)))
-        self.con.commit()
-        return hash
+        hashes, absent = self.blocker.block_stor((data,))
+        return binascii.hexlify(hashes[0])
     
     def update_block(self, hash, data, offset=0):
         """Update a known block and return the hash."""
@@ -561,12 +541,8 @@ class SimpleBackend(BaseBackend):
         logger.debug("update_block: %s %s %s", hash, len(data), offset)
         if offset == 0 and len(data) == self.block_size:
             return self.put_block(data)
-        src_data = self.get_block(hash)
-        bs = self.block_size
-        if offset < 0 or offset > bs or offset + len(data) > bs:
-            raise IndexError('Offset or data outside block limits')
-        dest_data = src_data[:offset] + data + src_data[offset + len(data):]
-        return self.put_block(dest_data)
+        h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
+        return binascii.hexlify(h)
     
     def _sql_until(self, until=None):
         """Return the sql to get the latest versions until the timestamp given."""
@@ -631,9 +607,9 @@ class SimpleBackend(BaseBackend):
             sql = sql % dest_version_id
             self.con.execute(sql, (src_version_id,))
         if copy_data and src_version_id is not None:
-            sql = 'insert into hashmaps select %s, pos, block_id from hashmaps where version_id = ?'
-            sql = sql % dest_version_id
-            self.con.execute(sql, (src_version_id,))
+            # TODO: Copy properly.
+            hashmap = self.mapper.map_retr(src_version_id)
+            self.mapper.map_stor(dest_version_id, hashmap)
         self.con.commit()
         return src_version_id, dest_version_id
     
@@ -678,10 +654,10 @@ class SimpleBackend(BaseBackend):
         c = self.con.execute(sql, (version,))
         return dict(c.fetchall())
     
-    def _put_metadata(self, user, path, meta, replace=False):
+    def _put_metadata(self, user, path, meta, replace=False, copy_data=True):
         """Create a new version and store metadata."""
         
-        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, True)
+        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, copy_data)
         for k, v in meta.iteritems():
             if not replace and v == '':
                 sql = 'delete from metadata where version_id = ? and key = ?'
@@ -865,7 +841,6 @@ class SimpleBackend(BaseBackend):
         return objects[start:start + limit]
     
     def _del_version(self, version):
-        sql = 'delete from hashmaps where version_id = ?'
-        self.con.execute(sql, (version,))
+        self.mapper.map_remv(version)
         sql = 'delete from versions where version_id = ?'
         self.con.execute(sql, (version,))
diff --git a/pithos/lib/hashfiler/__init__.py b/pithos/lib/hashfiler/__init__.py
new file mode 100644 (file)
index 0000000..aa3b929
--- /dev/null
@@ -0,0 +1,36 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from blocker import Blocker
+from mapper import Mapper
+
diff --git a/pithos/lib/hashfiler/blocker.py b/pithos/lib/hashfiler/blocker.py
new file mode 100644 (file)
index 0000000..6010009
--- /dev/null
@@ -0,0 +1,205 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os import makedirs
+from os.path import isdir, realpath, exists, join
+from hashlib import new as newhasher
+from binascii import hexlify
+
+from pithos.lib.hashfiler.context_file import ContextFile, file_sync_read_chunks
+
+
+class Blocker(object):
+    """Blocker.
+       Required contstructor parameters: blocksize, blockpath, hashtype.
+    """
+
+    blocksize = None
+    blockpath = None
+    hashtype = None
+
+    def __init__(self, **params):
+        blocksize = params['blocksize']
+        blockpath = params['blockpath']
+        blockpath = realpath(blockpath)
+        if not isdir(blockpath):
+            if not exists(blockpath):
+                makedirs(blockpath)
+            else:
+                raise ValueError("Variable blockpath '%s' is not a directory" % (blockpath,))
+
+        hashtype = params['hashtype']
+        try:
+            hasher = newhasher(hashtype)
+        except ValueError:
+            msg = "Variable hashtype '%s' is not available from hashlib"
+            raise ValueError(msg % (hashtype,))
+
+        hasher.update("")
+        emptyhash = hasher.digest()
+
+        self.blocksize = blocksize
+        self.blockpath = blockpath
+        self.hashtype = hashtype
+        self.hashlen = len(emptyhash)
+        self.emptyhash = emptyhash
+
+    def get_rear_block(self, blkhash, create=0):
+        name = join(self.blockpath, hexlify(blkhash))
+        return ContextFile(name, create)
+
+    def check_rear_block(self, blkhash):
+        name = join(self.blockpath, hexlify(blkhash))
+        return exists(name)
+
+    def block_hash(self, data):
+        """Hash a block of data"""
+        hasher = newhasher(self.hashtype)
+        hasher.update(data.rstrip('\x00'))
+        return hasher.digest()
+
+    def block_ping(self, hashes):
+        """Check hashes for existence and
+           return those missing from block storage.
+        """
+        missing = []
+        append = missing.append
+        for i, h in enumerate(hashes):
+            if not self.check_rear_block(h):
+                append(i)
+        return missing
+
+    def block_retr(self, hashes):
+        """Retrieve blocks from storage by their hashes."""
+        blocksize = self.blocksize
+        blocks = []
+        append = blocks.append
+        block = None
+
+        for h in hashes:
+            with self.get_rear_block(h, 0) as rbl:
+                if not rbl:
+                    break
+                for block in rbl.sync_read_chunks(blocksize, 1, 0):
+                    break # there should be just one block there
+            if not block:
+                break
+            append(block)
+
+        return blocks
+
+    def block_stor(self, blocklist):
+        """Store a bunch of blocks and return (hashes, missing).
+           Hashes is a list of the hashes of the blocks,
+           missing is a list of indices in that list indicating
+           which blocks were missing from the store.
+        """
+        block_hash = self.block_hash
+        hashlist = [block_hash(b) for b in blocklist]
+        mf = None
+        missing = self.block_ping(hashlist)
+        for i in missing:
+            with self.get_rear_block(hashlist[i], 1) as rbl:
+                 rbl.sync_write(blocklist[i]) #XXX: verify?
+
+        return hashlist, missing
+
+    def block_delta(self, blkhash, offdata=()):
+        """Construct and store a new block from a given block
+           and a list of (offset, data) 'patches'. Return:
+           (the hash of the new block, if the block already existed)
+        """
+        if not offdata:
+            return None, None
+
+        blocksize = self.blocksize
+        block = self.block_retr((blkhash,))
+        if not block:
+            return None, None
+
+        block = block[0]
+        newblock = ''
+        idx = 0
+        size = 0
+        trunc = 0
+        for off, data in offdata:
+            if not data:
+                trunc = 1
+                break
+            newblock += block[idx:off] + data
+            size += off - idx + len(data)
+            if size >= blocksize:
+                break
+            off = size
+
+        if not trunc:
+            newblock += block[size:len(block)]
+
+        h, a = self.block_stor((newblock,))
+        return h[0], 1 if a else 0
+
+    def block_hash_file(self, openfile):
+        """Return the list of hashes (hashes map)
+           for the blocks in a buffered file.
+           Helper method, does not affect store.
+        """
+        hashes = []
+        append = hashes.append
+        block_hash = self.block_hash
+
+        for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0):
+            append(block_hash(block))
+
+        return hashes
+
+    def block_stor_file(self, openfile):
+        """Read blocks from buffered file object and store them. Return:
+           (bytes read, list of hashes, list of hashes that were missing)
+        """
+        blocksize = self.blocksize
+        block_stor = self.block_stor
+        hashlist = []
+        hextend = hashlist.extend
+        storedlist = []
+        sextend = storedlist.extend
+        lastsize = 0
+
+        for block in file_sync_read_chunks(openfile, blocksize, 1, 0):
+            hl, sl = block_stor((block,))
+            hextend(hl)
+            sextend(sl)
+            lastsize = len(block)
+
+        size = (len(hashlist) -1) * blocksize + lastsize if hashlist else 0
+        return size, hashlist, storedlist
+
diff --git a/pithos/lib/hashfiler/context_file.py b/pithos/lib/hashfiler/context_file.py
new file mode 100644 (file)
index 0000000..0a16b2a
--- /dev/null
@@ -0,0 +1,191 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os import SEEK_CUR, SEEK_SET, fsync
+from errno import ENOENT
+
+
+_zeros = ''
+
+
+def zeros(nr):
+    global _zeros
+    size = len(_zeros)
+    if nr == size:
+        return _zeros
+
+    if nr > size:
+        _zeros += '\0' * (nr - size)
+        return _zeros
+
+    if nr < size:
+        _zeros = _zeros[:nr]
+        return _zeros
+
+
+def file_sync_write_chunks(openfile, chunksize, offset, chunks, size=None):
+    """Write given chunks to the given buffered file object.
+       Writes never span across chunk boundaries.
+       If size is given stop after or pad until size bytes have been written.
+    """
+    fwrite = openfile.write
+    seek = openfile.seek
+    padding = 0
+
+    try:
+        seek(offset * chunksize)
+    except IOError, e:
+        seek = None
+        for x in xrange(offset):
+            fwrite(zeros(chunksize))
+
+    cursize = offset * chunksize
+
+    for chunk in chunks:
+        if padding:
+            if seek:
+                seek(padding -1, SEEK_CUR)
+                fwrite("\x00")
+            else:
+                fwrite(buffer(zeros(chunksize), 0, padding))
+        if size is not None and cursize + chunksize >= size:
+            chunk = chunk[:chunksize - (cursize - size)]
+            fwrite(chunk)
+            cursize += len(chunk)
+            break
+        fwrite(chunk)
+        padding = chunksize - len(chunk)
+
+    padding = size - cursize if size is not None else 0
+    if padding <= 0:
+        return
+
+    q, r = divmod(padding, chunksize)
+    for x in xrange(q):
+        fwrite(zeros(chunksize))
+    fwrite(buffer(zeros(chunksize), 0, r))
+
+
+def file_sync_read_chunks(openfile, chunksize, nr, offset=0):
+    """Read and yield groups of chunks from a buffered file object at offset.
+       Reads never span accros chunksize boundaries.
+    """
+    fread = openfile.read
+    remains = offset * chunksize
+    seek = openfile.seek
+    try:
+        seek(remains)
+    except IOError, e:
+        seek = None
+        while 1:
+            s = fread(remains)
+            remains -= len(s)
+            if remains <= 0:
+                break
+
+    while nr:
+        remains = chunksize
+        chunk = ''
+        while 1:
+            s = fread(remains)
+            if not s:
+                if chunk:
+                    yield chunk
+                return
+            chunk += s
+            remains -= len(s)
+            if remains <= 0:
+                break
+        yield chunk
+        nr -= 1
+
+
+class ContextFile(object):
+    __slots__ = ("name", "fdesc", "create")
+
+    def __init__(self, name, create=0):
+        self.name = name
+        self.fdesc = None
+        self.create = create
+        #self.dirty = 0
+
+    def __enter__(self):
+        name = self.name
+        try:
+            fdesc = open(name, 'rb+')
+        except IOError, e:
+            if not self.create or e.errno != ENOENT:
+                raise
+            fdesc = open(name, 'w+')
+
+        self.fdesc = fdesc
+        return self
+
+    def __exit__(self, exc, arg, trace):
+        fdesc = self.fdesc
+        if fdesc is not None:
+            #if self.dirty:
+            #    fsync(fdesc.fileno())
+            fdesc.close()
+        return False # propagate exceptions
+
+    def seek(self, offset, whence=SEEK_SET):
+        return self.fdesc.seek(offset, whence)
+
+    def tell(self):
+        return self.fdesc.tell()
+
+    def truncate(self, size):
+        self.fdesc.truncate(size)
+
+    def sync_write(self, data):
+        #self.dirty = 1
+        self.fdesc.write(data)
+
+    def sync_write_chunks(self, chunksize, offset, chunks, size=None):
+        #self.dirty = 1
+        return file_sync_write_chunks(self.fdesc, chunksize, offset, chunks, size)
+
+    def sync_read(self, size):
+        read = self.fdesc.read
+        data = ''
+        while 1:
+            s = read(size)
+            if not s:
+                break
+            data += s
+        return data
+
+    def sync_read_chunks(self, chunksize, nr, offset=0):
+        return file_sync_read_chunks(self.fdesc, chunksize, nr, offset)
+
diff --git a/pithos/lib/hashfiler/mapper.py b/pithos/lib/hashfiler/mapper.py
new file mode 100644 (file)
index 0000000..eebfd69
--- /dev/null
@@ -0,0 +1,102 @@
+# Copyright 2011 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os.path import realpath, join, exists, isdir
+from os import makedirs, unlink
+from errno import ENOENT
+
+from pithos.lib.hashfiler.context_file import ContextFile
+
+
+class Mapper(object):
+    """Mapper.
+       Required contstructor parameters: mappath, namelen.
+    """
+    
+    mappath = None
+    namelen = None
+
+    def __init__(self, **params):
+        self.params = params
+        self.namelen = params['namelen']
+        mappath = realpath(params['mappath'])
+        if not isdir(mappath):
+            if not exists(mappath):
+                makedirs(mappath)
+            else:
+                raise ValueError("Variable mappath '%s' is not a directory" % (mappath,))
+        self.mappath = mappath
+
+    def get_rear_map(self, name, create=0):
+        name = join(self.mappath, hex(int(name)))
+        return ContextFile(name, create)
+
+    def delete_rear_map(self, name):
+        name = join(self.mappath, hex(int(name)))
+        try:
+            unlink(name)
+            return 1
+        except OSError, e:
+            if e.errno != ENOENT:
+                raise
+        return 0
+
+    def map_retr(self, name, blkoff=0, nr=100000000000000):
+        """Return as a list, part of the hashes map of an object
+           at the given block offset.
+           By default, return the whole hashes map.
+        """
+        namelen = self.namelen
+        hashes = ()
+
+        with self.get_rear_map(name, 0) as rmap:
+            if rmap:
+                hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff))
+        return hashes
+
+    def map_stor(self, name, hashes=(), blkoff=0, create=1):
+        """Store hashes in the given hashes map, replacing the old ones."""
+        namelen = self.namelen
+        with self.get_rear_map(name, 1) as rmap:
+            rmap.sync_write_chunks(namelen, blkoff, hashes, None)
+
+#     def map_copy(self, src, dst):
+#         """Copy a hashes map to another one, replacing it."""
+#         with self.get_rear_map(src, 0) as rmap:
+#             if rmap:
+#                 rmap.copy_to(dst)
+
+    def map_remv(self, name):
+        """Remove a hashes map. Returns true if the map was found and removed."""
+        return self.delete_rear_map(name)
+
index 4c56988..4fb92bc 100644 (file)
@@ -66,9 +66,9 @@ DATABASES = {
 
 # The backend to use and its initilization options.
 if TEST:
-    BACKEND = ('SimpleBackend', (os.path.join(PROJECT_PATH, 'data/testpithos.db'),))
+    BACKEND = ('SimpleBackend', (os.path.join(PROJECT_PATH, 'data/test/'),))
 else:
-    BACKEND = ('SimpleBackend', (os.path.join(PROJECT_PATH, 'data/pithos.db'),))
+    BACKEND = ('SimpleBackend', (os.path.join(PROJECT_PATH, 'data/pithos/'),))
 
 # Local time zone for this installation. Choices can be found here:
 # http://en.wikipedia.org/wiki/List_of_tz_zones_by_name