Update settings.py and the database layout (the backend path is now a folder).
import time
import sqlite3
import logging
-import types
import hashlib
-import shutil
-import pickle
+import binascii
from base import NotAllowedError, BaseBackend
+from pithos.lib.hashfiler import Mapper, Blocker
logger = logging.getLogger(__name__)
- Uses SQLite for storage.
+ Uses SQLite for metadata; blocks and hashmaps are kept on the filesystem.
"""
- # TODO: Automatic/manual clean-up after a time interval.
-
def __init__(self, db):
- self.hash_algorithm = 'sha1'
- self.block_size = 128 * 1024 # 128KB
+ self.hash_algorithm = 'sha256'
+ self.block_size = 4 * 1024 * 1024 # 4MB
self.default_policy = {'quota': 0, 'versioning': 'auto'}
basepath = os.path.split(db)[0]
if basepath and not os.path.exists(basepath):
os.makedirs(basepath)
+ if not basepath:
+ basepath = '.'
+ if not os.path.isdir(basepath):
+ raise RuntimeError("Cannot open database at '%s'" % (db,))
- self.con = sqlite3.connect(db, check_same_thread=False)
+ self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)
sql = '''pragma foreign_keys = on'''
self.con.execute(sql)
foreign key (version_id) references versions(version_id)
on delete cascade)'''
self.con.execute(sql)
- sql = '''create table if not exists hashmaps (
- version_id integer,
- pos integer,
- block_id text,
- primary key (version_id, pos)
- foreign key (version_id) references versions(version_id)
- on delete cascade)'''
- self.con.execute(sql)
- sql = '''create table if not exists blocks (
- block_id text, data blob, primary key (block_id))'''
- self.con.execute(sql)
sql = '''create table if not exists policy (
name text, key text, value text, primary key (name, key))'''
name text, primary key (name))'''
self.con.execute(sql)
self.con.commit()
+
+ params = {'blocksize': self.block_size,
+ 'blockpath': basepath + '/blocks',
+ 'hashtype': self.hash_algorithm}
+ self.blocker = Blocker(**params)
+
+ params = {'mappath': basepath + '/maps',
+ 'namelen': self.blocker.hashlen}
+ self.mapper = Mapper(**params)
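+
+ # Resulting on-disk layout under the backend folder (a sketch):
+ # db - the SQLite database with the metadata tables
+ # blocks/ - one file per block, named by the hex digest of its hash
+ # maps/ - one file per version, named hex(version_id), holding the
+ # concatenated binary block hashes of the object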
def get_account_meta(self, user, account, until=None):
"""Return a dictionary with the account metadata."""
logger.debug("update_account_meta: %s %s %s", account, meta, replace)
if user != account:
raise NotAllowedError
- self._put_metadata(user, account, meta, replace)
+ self._put_metadata(user, account, meta, replace, False)
def get_account_groups(self, user, account):
"""Return a dictionary with the user groups defined for this account."""
if user != account:
raise NotAllowedError
path, version_id, mtime = self._get_containerinfo(account, container)
- self._put_metadata(user, path, meta, replace)
+ self._put_metadata(user, path, meta, replace, False)
def get_container_policy(self, user, account, container):
"""Return a dictionary with the container policy."""
self.con.execute(sql, (path, path + '/%',))
sql = 'delete from policy where name = ?'
self.con.execute(sql, (path,))
- self._copy_version(user, account, account, True, True) # New account version (for timestamp update).
+ self._copy_version(user, account, account, True, False) # New account version (for timestamp update).
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None):
"""Return a list of objects existing under a container."""
logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
self._can_read(user, account, container, name)
path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
- sql = 'select block_id from hashmaps where version_id = ? order by pos asc'
- c = self.con.execute(sql, (version_id,))
- hashmap = [x[0] for x in c.fetchall()]
- return size, hashmap
+ hashmap = self.mapper.map_retr(version_id)
+ return size, [binascii.hexlify(x) for x in hashmap]
def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
"""Create/update an object with the specified size and partial hashes."""
if permissions is not None and user != account:
raise NotAllowedError
self._can_write(user, account, container, name)
- missing = []
- for i in range(len(hashmap)):
- sql = 'select count(*) from blocks where block_id = ?'
- c = self.con.execute(sql, (hashmap[i],))
- if c.fetchone()[0] == 0:
- missing.append(hashmap[i])
+ binhashes = [binascii.unhexlify(x) for x in hashmap]
+ missing = [hashmap[i] for i in self.blocker.block_ping(binhashes)]
if missing:
ie = IndexError()
ie.data = missing
src_version_id, dest_version_id = self._copy_version(user, path, path, not replace_meta, False)
sql = 'update versions set size = ? where version_id = ?'
self.con.execute(sql, (size, dest_version_id))
- # TODO: Check for block_id existence.
- for i in range(len(hashmap)):
- sql = 'insert or replace into hashmaps (version_id, pos, block_id) values (?, ?, ?)'
- self.con.execute(sql, (dest_version_id, i, hashmap[i]))
+ self.mapper.map_stor(dest_version_id, binhashes)
for k, v in meta.iteritems():
sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
self.con.execute(sql, (dest_version_id, k, v))
"""Return a block's data."""
logger.debug("get_block: %s", hash)
- c = self.con.execute('select data from blocks where block_id = ?', (hash,))
- row = c.fetchone()
- if row:
- return str(row[0])
- else:
+ blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
+ if not blocks:
raise NameError('Block does not exist')
+ return blocks[0]
def put_block(self, data):
"""Create a block and return the hash."""
logger.debug("put_block: %s", len(data))
- h = hashlib.new(self.hash_algorithm)
- h.update(data.rstrip('\x00'))
- hash = h.hexdigest()
- sql = 'insert or ignore into blocks (block_id, data) values (?, ?)'
- self.con.execute(sql, (hash, buffer(data)))
- self.con.commit()
- return hash
+ hashes, absent = self.blocker.block_stor((data,))
+ return binascii.hexlify(hashes[0])
def update_block(self, hash, data, offset=0):
"""Update a known block and return the hash."""
logger.debug("update_block: %s %s %s", hash, len(data), offset)
if offset == 0 and len(data) == self.block_size:
return self.put_block(data)
- src_data = self.get_block(hash)
- bs = self.block_size
- if offset < 0 or offset > bs or offset + len(data) > bs:
- raise IndexError('Offset or data outside block limits')
- dest_data = src_data[:offset] + data + src_data[offset + len(data):]
- return self.put_block(dest_data)
+ h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
+ if h is None:
+ raise NameError('Block does not exist')
+ return binascii.hexlify(h)
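+ # E.g., patching bytes 5..7 of a stored block (a sketch; 'backend' and
+ # the hex hash H are hypothetical names):
+ # new_hash = backend.update_block(H, 'xyz', offset=5)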
def _sql_until(self, until=None):
"""Return the sql to get the latest versions until the timestamp given."""
sql = sql % dest_version_id
self.con.execute(sql, (src_version_id,))
if copy_data and src_version_id is not None:
- sql = 'insert into hashmaps select %s, pos, block_id from hashmaps where version_id = ?'
- sql = sql % dest_version_id
- self.con.execute(sql, (src_version_id,))
+ # TODO: Copy properly.
+ hashmap = self.mapper.map_retr(src_version_id)
+ self.mapper.map_stor(dest_version_id, hashmap)
self.con.commit()
return src_version_id, dest_version_id
c = self.con.execute(sql, (version,))
return dict(c.fetchall())
- def _put_metadata(self, user, path, meta, replace=False):
+ def _put_metadata(self, user, path, meta, replace=False, copy_data=True):
"""Create a new version and store metadata."""
- src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, True)
+ src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, copy_data)
for k, v in meta.iteritems():
if not replace and v == '':
sql = 'delete from metadata where version_id = ? and key = ?'
return objects[start:start + limit]
def _del_version(self, version):
- sql = 'delete from hashmaps where version_id = ?'
- self.con.execute(sql, (version,))
+ self.mapper.map_remv(version)
sql = 'delete from versions where version_id = ?'
self.con.execute(sql, (version,))
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from blocker import Blocker
+from mapper import Mapper
+
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os import makedirs
+from os.path import isdir, realpath, exists, join
+from hashlib import new as newhasher
+from binascii import hexlify
+
+from pithos.lib.hashfiler.context_file import ContextFile, file_sync_read_chunks
+
+
+class Blocker(object):
+ """Blocker.
+ Required constructor parameters: blocksize, blockpath, hashtype.
+ A usage sketch appears at the end of this file.
+ """
+
+ blocksize = None
+ blockpath = None
+ hashtype = None
+
+ def __init__(self, **params):
+ blocksize = params['blocksize']
+ blockpath = params['blockpath']
+ blockpath = realpath(blockpath)
+ if not isdir(blockpath):
+ if not exists(blockpath):
+ makedirs(blockpath)
+ else:
+ raise ValueError("Variable blockpath '%s' is not a directory" % (blockpath,))
+
+ hashtype = params['hashtype']
+ try:
+ hasher = newhasher(hashtype)
+ except ValueError:
+ msg = "Variable hashtype '%s' is not available from hashlib"
+ raise ValueError(msg % (hashtype,))
+
+ hasher.update("")
+ emptyhash = hasher.digest()
+
+ self.blocksize = blocksize
+ self.blockpath = blockpath
+ self.hashtype = hashtype
+ self.hashlen = len(emptyhash)
+ self.emptyhash = emptyhash
+
+ def get_rear_block(self, blkhash, create=0):
+ name = join(self.blockpath, hexlify(blkhash))
+ return ContextFile(name, create)
+
+ def check_rear_block(self, blkhash):
+ name = join(self.blockpath, hexlify(blkhash))
+ return exists(name)
+
+ def block_hash(self, data):
+ """Hash a block of data"""
+ hasher = newhasher(self.hashtype)
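+ # Trailing NULs are stripped first, so a zero-padded partial block
+ # hashes the same as its unpadded content.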
+ hasher.update(data.rstrip('\x00'))
+ return hasher.digest()
+
+ def block_ping(self, hashes):
+ """Check hashes for existence and
+ return the indices of those missing from block storage.
+ """
+ missing = []
+ append = missing.append
+ for i, h in enumerate(hashes):
+ if not self.check_rear_block(h):
+ append(i)
+ return missing
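+ # E.g., if only the first of two hashes is stored:
+ # block_ping([h_present, h_absent]) -> [1]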
+
+ def block_retr(self, hashes):
+ """Retrieve blocks from storage by their hashes."""
+ blocksize = self.blocksize
+ blocks = []
+ append = blocks.append
+
+ for h in hashes:
+ block = None # reset per hash so an empty block file is detected
+ with self.get_rear_block(h, 0) as rbl:
+ if not rbl:
+ break
+ for block in rbl.sync_read_chunks(blocksize, 1, 0):
+ break # there should be just one block there
+ if not block:
+ break
+ append(block)
+
+ return blocks
+
+ def block_stor(self, blocklist):
+ """Store a bunch of blocks and return (hashes, missing).
+ Hashes is a list of the hashes of the blocks,
+ missing is a list of indices in that list indicating
+ which blocks were missing from the store.
+ """
+ block_hash = self.block_hash
+ hashlist = [block_hash(b) for b in blocklist]
+ mf = None
+ missing = self.block_ping(hashlist)
+ for i in missing:
+ with self.get_rear_block(hashlist[i], 1) as rbl:
+ rbl.sync_write(blocklist[i]) #XXX: verify?
+
+ return hashlist, missing
+
+ def block_delta(self, blkhash, offdata=()):
+ """Construct and store a new block from a given block
+ and a list of (offset, data) 'patches'. Return:
+ (the hash of the new block, whether it was newly stored)
+ """
+ if not offdata:
+ return None, None
+
+ blocksize = self.blocksize
+ block = self.block_retr((blkhash,))
+ if not block:
+ return None, None
+
+ block = block[0]
+ newblock = ''
+ idx = 0
+ size = 0
+ trunc = 0
+ for off, data in offdata:
+ if not data:
+ trunc = 1
+ break
+ newblock += block[idx:off] + data
+ size += off - idx + len(data)
+ if size >= blocksize:
+ break
+ idx = size # advance past the bytes consumed so far
+
+ if not trunc:
+ newblock += block[size:len(block)]
+
+ h, a = self.block_stor((newblock,))
+ return h[0], 1 if a else 0
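+ # E.g., if 'hello' is stored under hash h, block_delta(h, ((0, 'HE'),))
+ # stores 'HEllo' and returns (hash of 'HEllo', 1) when that block is new.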
+
+ def block_hash_file(self, openfile):
+ """Return the list of hashes (hashes map)
+ for the blocks in a buffered file.
+ Helper method, does not affect store.
+ """
+ hashes = []
+ append = hashes.append
+ block_hash = self.block_hash
+
+ for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0):
+ append(block_hash(block))
+
+ return hashes
+
+ def block_stor_file(self, openfile):
+ """Read blocks from buffered file object and store them. Return:
+ (bytes read, list of hashes, list of hashes that were missing)
+ """
+ blocksize = self.blocksize
+ block_stor = self.block_stor
+ hashlist = []
+ hextend = hashlist.extend
+ storedlist = []
+ sextend = storedlist.extend
+ lastsize = 0
+
+ for block in file_sync_read_chunks(openfile, blocksize, 1, 0):
+ hl, sl = block_stor((block,))
+ hextend(hl)
+ sextend(sl)
+ lastsize = len(block)
+
+ size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
+ return size, hashlist, storedlist
+
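+# A minimal usage sketch, not part of the module proper (the temporary
+# directory and sample data are made up for illustration):
+if __name__ == '__main__':
+ from tempfile import mkdtemp
+ blocker = Blocker(blocksize=16, blockpath=mkdtemp(), hashtype='sha256')
+ hashes, missing = blocker.block_stor(('hello blocks',))
+ assert missing == [0] # the block was new...
+ assert blocker.block_ping(hashes) == [] # ...and is now in the store
+ assert blocker.block_retr(hashes) == ['hello blocks']
+ # Patch the first five bytes; the result is stored as a new block.
+ h, stored = blocker.block_delta(hashes[0], ((0, 'HELLO'),))
+ assert blocker.block_retr((h,)) == ['HELLO blocks']
+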
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os import SEEK_CUR, SEEK_SET, fsync
+from errno import ENOENT
+
+
+_zeros = ''
+
+
+def zeros(nr):
+ global _zeros
+ size = len(_zeros)
+ if nr == size:
+ return _zeros
+
+ if nr > size:
+ _zeros += '\0' * (nr - size)
+ return _zeros
+
+ if nr < size:
+ _zeros = _zeros[:nr]
+ return _zeros
+
+
+def file_sync_write_chunks(openfile, chunksize, offset, chunks, size=None):
+ """Write given chunks to the given buffered file object.
+ Writes never span across chunk boundaries.
+ If size is given, stop after (or pad until) size bytes have been written.
+ """
+ fwrite = openfile.write
+ seek = openfile.seek
+ padding = 0
+
+ try:
+ seek(offset * chunksize)
+ except IOError, e:
+ seek = None
+ for x in xrange(offset):
+ fwrite(zeros(chunksize))
+
+ cursize = offset * chunksize
+
+ for chunk in chunks:
+ if padding:
+ if seek:
+ seek(padding - 1, SEEK_CUR)
+ fwrite("\x00")
+ else:
+ fwrite(buffer(zeros(chunksize), 0, padding))
+ if size is not None and cursize + chunksize >= size:
+ chunk = chunk[:size - cursize] # keep only the bytes up to 'size'
+ fwrite(chunk)
+ cursize += len(chunk)
+ break
+ fwrite(chunk)
+ cursize += len(chunk)
+ padding = chunksize - len(chunk)
+
+ padding = size - cursize if size is not None else 0
+ if padding <= 0:
+ return
+
+ q, r = divmod(padding, chunksize)
+ for x in xrange(q):
+ fwrite(zeros(chunksize))
+ fwrite(buffer(zeros(chunksize), 0, r))
+
+
+def file_sync_read_chunks(openfile, chunksize, nr, offset=0):
+ """Read and yield groups of chunks from a buffered file object at offset.
+ Reads never span across chunk boundaries.
+ """
+ fread = openfile.read
+ remains = offset * chunksize
+ seek = openfile.seek
+ try:
+ seek(remains)
+ except IOError, e:
+ seek = None
+ while 1:
+ s = fread(remains)
+ if not s: # EOF before the requested offset; nothing to yield
+ return
+ remains -= len(s)
+ if remains <= 0:
+ break
+
+ while nr:
+ remains = chunksize
+ chunk = ''
+ while 1:
+ s = fread(remains)
+ if not s:
+ if chunk:
+ yield chunk
+ return
+ chunk += s
+ remains -= len(s)
+ if remains <= 0:
+ break
+ yield chunk
+ nr -= 1
+
+
+class ContextFile(object):
+ __slots__ = ("name", "fdesc", "create")
+
+ def __init__(self, name, create=0):
+ self.name = name
+ self.fdesc = None
+ self.create = create
+ #self.dirty = 0
+
+ def __enter__(self):
+ name = self.name
+ try:
+ fdesc = open(name, 'rb+')
+ except IOError, e:
+ if e.errno != ENOENT:
+ raise
+ if not self.create:
+ return None # callers test truthiness: `with ... as f: if not f: ...`
+ fdesc = open(name, 'wb+')
+
+ self.fdesc = fdesc
+ return self
+
+ def __exit__(self, exc, arg, trace):
+ fdesc = self.fdesc
+ if fdesc is not None:
+ #if self.dirty:
+ # fsync(fdesc.fileno())
+ fdesc.close()
+ return False # propagate exceptions
+
+ def seek(self, offset, whence=SEEK_SET):
+ return self.fdesc.seek(offset, whence)
+
+ def tell(self):
+ return self.fdesc.tell()
+
+ def truncate(self, size):
+ self.fdesc.truncate(size)
+
+ def sync_write(self, data):
+ #self.dirty = 1
+ self.fdesc.write(data)
+
+ def sync_write_chunks(self, chunksize, offset, chunks, size=None):
+ #self.dirty = 1
+ return file_sync_write_chunks(self.fdesc, chunksize, offset, chunks, size)
+
+ def sync_read(self, size):
+ read = self.fdesc.read
+ data = ''
+ while 1:
+ s = read(size)
+ if not s:
+ break
+ data += s
+ return data
+
+ def sync_read_chunks(self, chunksize, nr, offset=0):
+ return file_sync_read_chunks(self.fdesc, chunksize, nr, offset)
+
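+# A minimal usage sketch, not part of the module proper (the temporary
+# file path is made up for illustration):
+if __name__ == '__main__':
+ from tempfile import mktemp
+ path = mktemp()
+ with ContextFile(path, create=1) as cf:
+ # Two chunks of chunksize 4, starting at chunk offset 1,
+ # padded/stopped at 12 bytes total.
+ cf.sync_write_chunks(4, 1, ['abcd', 'ef'], size=12)
+ with ContextFile(path) as cf:
+ chunks = list(cf.sync_read_chunks(4, 3, 0))
+ # Chunk 0 is the zero-filled hole before offset 1; chunk 2 carries
+ # the trailing padding up to size.
+ assert chunks == ['\x00' * 4, 'abcd', 'ef\x00\x00']
+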
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os.path import realpath, join, exists, isdir
+from os import makedirs, unlink
+from errno import ENOENT
+
+from pithos.lib.hashfiler.context_file import ContextFile
+
+
+class Mapper(object):
+ """Mapper.
+ Required constructor parameters: mappath, namelen.
+ A usage sketch appears at the end of this file.
+ """
+
+ mappath = None
+ namelen = None
+
+ def __init__(self, **params):
+ self.params = params
+ self.namelen = params['namelen']
+ mappath = realpath(params['mappath'])
+ if not isdir(mappath):
+ if not exists(mappath):
+ makedirs(mappath)
+ else:
+ raise ValueError("Variable mappath '%s' is not a directory" % (mappath,))
+ self.mappath = mappath
+
+ def get_rear_map(self, name, create=0):
+ name = join(self.mappath, hex(int(name)))
+ return ContextFile(name, create)
+
+ def delete_rear_map(self, name):
+ name = join(self.mappath, hex(int(name)))
+ try:
+ unlink(name)
+ return 1
+ except OSError, e:
+ if e.errno != ENOENT:
+ raise
+ return 0
+
+ def map_retr(self, name, blkoff=0, nr=100000000000000):
+ """Return as a list, part of the hashes map of an object
+ at the given block offset.
+ By default, return the whole hashes map.
+ """
+ namelen = self.namelen
+ hashes = ()
+
+ with self.get_rear_map(name, 0) as rmap:
+ if rmap:
+ hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff))
+ return hashes
+
+ def map_stor(self, name, hashes=(), blkoff=0, create=1):
+ """Store hashes in the given hashes map, replacing the old ones."""
+ namelen = self.namelen
+ with self.get_rear_map(name, 1) as rmap:
+ rmap.sync_write_chunks(namelen, blkoff, hashes, None)
+
+# def map_copy(self, src, dst):
+# """Copy a hashes map to another one, replacing it."""
+# with self.get_rear_map(src, 0) as rmap:
+# if rmap:
+# rmap.copy_to(dst)
+
+ def map_remv(self, name):
+ """Remove a hashes map. Returns true if the map was found and removed."""
+ return self.delete_rear_map(name)
+
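+# A minimal usage sketch, not part of the module proper (the temporary
+# directory and the fake 32-byte digests are made up for illustration):
+if __name__ == '__main__':
+ from tempfile import mkdtemp
+ mapper = Mapper(mappath=mkdtemp(), namelen=32)
+ hashes = ['\x01' * 32, '\x02' * 32]
+ mapper.map_stor(7, hashes) # store the hashes map for version 7
+ assert mapper.map_retr(7) == hashes
+ assert mapper.map_remv(7) == 1 # removed
+ assert mapper.map_remv(7) == 0 # already gone
+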
# The backend to use and its initialization options.
if TEST:
- BACKEND = ('SimpleBackend', (os.path.join(PROJECT_PATH, 'data/testpithos.db'),))
+ BACKEND = ('SimpleBackend', (os.path.join(PROJECT_PATH, 'data/test/'),))
else:
- BACKEND = ('SimpleBackend', (os.path.join(PROJECT_PATH, 'data/pithos.db'),))
+ BACKEND = ('SimpleBackend', (os.path.join(PROJECT_PATH, 'data/pithos/'),))
# Local time zone for this installation. Choices can be found here:
# http://en.wikipedia.org/wiki/List_of_tz_zones_by_name