on update cascade
on delete cascade ) """)
execute(""" create index if not exists idx_versions_node
- on nodes(node) """)
+ on versions(node) """)
# TODO: Sort out if more indexes are needed.
# execute(""" create index if not exists idx_versions_mtime
- # on nodes(mtime) """)
+ # on versions(mtime) """)
execute(""" create table if not exists attributes
( serial integer,
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+
+class DBWorker(object):
+ """Database connection handler."""
+
+ def __init__(self, **params):
+ self.params = params
+ self.conn = params['connection']
+ self.engine = params['engine']
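+
+
+# Illustrative usage sketch (not part of the original patch): subclasses such
+# as Groups or Node expect a live SQLAlchemy engine and connection in
+# **params. The sqlite URL below is an assumption for demonstration only.
+#
+#   from sqlalchemy import create_engine
+#   engine = create_engine('sqlite://')
+#   worker = DBWorker(connection=engine.connect(), engine=engine)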
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from collections import defaultdict
+from sqlalchemy import Table, Column, String, MetaData
+from sqlalchemy.sql import select, and_
+from sqlalchemy.schema import Index
+from dbworker import DBWorker
+
+class Groups(DBWorker):
+ """Groups are named collections of members, belonging to an owner."""
+
+ def __init__(self, **params):
+ DBWorker.__init__(self, **params)
+ metadata = MetaData()
+ columns=[]
+ columns.append(Column('owner', String(255), primary_key=True))
+ columns.append(Column('name', String(255), primary_key=True))
+ columns.append(Column('member', String(255), primary_key=True))
+ self.groups = Table('groups', metadata, *columns)
+
+ # place an index on member
+ Index('idx_groups_member', self.groups.c.member)
+
+ metadata.create_all(self.engine)
+ metadata.bind = self.engine
+
+ def group_names(self, owner):
+ """List all group names belonging to owner."""
+
+ s = select([self.groups.c.name],
+ self.groups.c.owner==owner).distinct()
+ r = self.conn.execute(s)
+ l = [row[0] for row in r.fetchall()]
+ r.close()
+ return l
+
+ def group_dict(self, owner):
+ """Return a dict mapping group names to member lists for owner."""
+
+ s = select([self.groups.c.name, self.groups.c.member],
+ self.groups.c.owner==owner)
+ r = self.conn.execute(s)
+ d = defaultdict(list)
+ for group, member in r.fetchall():
+ d[group].append(member)
+ r.close()
+ return d
+
+ def group_add(self, owner, group, member):
+ """Add a member to a group."""
+
+ s = self.groups.insert()
+ r = self.conn.execute(s, owner=owner, name=group, member=member)
+ r.close()
+
+ def group_addmany(self, owner, group, members):
+ """Add members to a group."""
+
+ s = self.groups.insert()
+ values = [{'owner':owner, 'name':group, 'member':member} for member in members]
+ r = self.conn.execute(s, values)
+ r.close()
+
+ def group_remove(self, owner, group, member):
+ """Remove a member from a group."""
+
+ s = self.groups.delete().where(and_(self.groups.c.owner==owner,
+ self.groups.c.name==group,
+ self.groups.c.member==member))
+ r = self.conn.execute(s)
+ r.close()
+
+ def group_delete(self, owner, group):
+ """Delete a group."""
+
+ s = self.groups.delete().where(and_(self.groups.c.owner==owner,
+ self.groups.c.name==group))
+ r = self.conn.execute(s)
+ r.close()
+
+ def group_destroy(self, owner):
+ """Delete all groups belonging to owner."""
+
+ s = self.groups.delete().where(self.groups.c.owner==owner)
+ r = self.conn.execute(s)
+ r.close()
+
+ def group_members(self, owner, group):
+ """Return the list of members of a group."""
+
+ s = select([self.groups.c.member], and_(self.groups.c.owner==owner,
+ self.groups.c.name==group))
+ r = self.conn.execute(s)
+ l = [row[0] for row in r.fetchall()]
+ r.close()
+ return l
+
+ def group_check(self, owner, group, member):
+ """Check if a member is in a group."""
+
+ s = select([self.groups.c.member], and_(self.groups.c.owner==owner,
+ self.groups.c.name==group,
+ self.groups.c.member==member))
+ r = self.conn.execute(s)
+ l = r.fetchone()
+ r.close()
+ return bool(l)
+
+ def group_parents(self, member):
+ """Return all (owner, group) tuples that contain member."""
+
+ s = select([self.groups.c.owner, self.groups.c.name],
+ self.groups.c.member==member)
+ r = self.conn.execute(s)
+ l = r.fetchall()
+ r.close()
+ return l
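+
+
+# Illustrative example (not part of the original patch): exercising Groups
+# against an in-memory SQLite database. The engine URL is an assumption; any
+# SQLAlchemy-supported backend should behave the same.
+if __name__ == '__main__':
+    from sqlalchemy import create_engine
+    engine = create_engine('sqlite://')
+    g = Groups(connection=engine.connect(), engine=engine)
+    g.group_add('owner', 'devs', 'alice')
+    g.group_addmany('owner', 'devs', ['bob', 'carol'])
+    assert g.group_check('owner', 'devs', 'bob')
+    assert sorted(g.group_members('owner', 'devs')) == ['alice', 'bob', 'carol']
+    g.group_remove('owner', 'devs', 'carol')
+    assert sorted(g.group_dict('owner')['devs']) == ['alice', 'bob']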
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from blocker import Blocker
+from mapper import Mapper
+
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os import makedirs
+from os.path import isdir, realpath, exists, join
+from hashlib import new as newhasher
+from binascii import hexlify
+
+from context_file import ContextFile, file_sync_read_chunks
+
+
+class Blocker(object):
+ """Blocker.
+ Required constructor parameters: blocksize, blockpath, hashtype.
+ """
+
+ blocksize = None
+ blockpath = None
+ hashtype = None
+
+ def __init__(self, **params):
+ blocksize = params['blocksize']
+ blockpath = params['blockpath']
+ blockpath = realpath(blockpath)
+ if not isdir(blockpath):
+ if not exists(blockpath):
+ makedirs(blockpath)
+ else:
+ raise ValueError("Variable blockpath '%s' is not a directory" % (blockpath,))
+
+ hashtype = params['hashtype']
+ try:
+ hasher = newhasher(hashtype)
+ except ValueError:
+ msg = "Variable hashtype '%s' is not available from hashlib"
+ raise ValueError(msg % (hashtype,))
+
+ hasher.update("")
+ emptyhash = hasher.digest()
+
+ self.blocksize = blocksize
+ self.blockpath = blockpath
+ self.hashtype = hashtype
+ self.hashlen = len(emptyhash)
+ self.emptyhash = emptyhash
+
+ def get_rear_block(self, blkhash, create=0):
+ filename = hexlify(blkhash)
+ dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
+ if not exists(dir):
+ makedirs(dir)
+ name = join(dir, filename)
+ return ContextFile(name, create)
+
+ def check_rear_block(self, blkhash):
+ filename = hexlify(blkhash)
+ dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
+ name = join(dir, filename)
+ return exists(name)
+
+ def block_hash(self, data):
+ """Hash a block of data"""
+ hasher = newhasher(self.hashtype)
+ hasher.update(data.rstrip('\x00'))
+ return hasher.digest()
+
+ def block_ping(self, hashes):
+ """Check hashes for existence and
+ return those missing from block storage.
+ """
+ missing = []
+ append = missing.append
+ for i, h in enumerate(hashes):
+ if not self.check_rear_block(h):
+ append(i)
+ return missing
+
+ def block_retr(self, hashes):
+ """Retrieve blocks from storage by their hashes."""
+ blocksize = self.blocksize
+ blocks = []
+ append = blocks.append
+ block = None
+
+ for h in hashes:
+ with self.get_rear_block(h, 0) as rbl:
+ if not rbl:
+ break
+ for block in rbl.sync_read_chunks(blocksize, 1, 0):
+ break # there should be just one block there
+ if not block:
+ break
+ append(block)
+
+ return blocks
+
+ def block_stor(self, blocklist):
+ """Store a bunch of blocks and return (hashes, missing).
+ Hashes is a list of the hashes of the blocks,
+ missing is a list of indices in that list indicating
+ which blocks were missing from the store.
+ """
+ block_hash = self.block_hash
+ hashlist = [block_hash(b) for b in blocklist]
+ mf = None
+ missing = self.block_ping(hashlist)
+ for i in missing:
+ with self.get_rear_block(hashlist[i], 1) as rbl:
+ rbl.sync_write(blocklist[i]) #XXX: verify?
+
+ return hashlist, missing
+
+ def block_delta(self, blkhash, offdata=()):
+ """Construct and store a new block from a given block
+ and a list of (offset, data) 'patches'. Return:
+ (the hash of the new block, whether the block was newly stored)
+ """
+ if not offdata:
+ return None, None
+
+ blocksize = self.blocksize
+ block = self.block_retr((blkhash,))
+ if not block:
+ return None, None
+
+ block = block[0]
+ newblock = ''
+ idx = 0
+ size = 0
+ trunc = 0
+ for off, data in offdata:
+ if not data:
+ trunc = 1
+ break
+ newblock += block[idx:off] + data
+ size += off - idx + len(data)
+ if size >= blocksize:
+ break
+ idx = off + len(data) # advance past the patched range
+
+ if not trunc:
+ newblock += block[size:len(block)]
+
+ h, a = self.block_stor((newblock,))
+ return h[0], 1 if a else 0
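+        # Illustrative (not part of the original patch): for an already
+        # stored block with hash h, block_delta(h, [(0, 'abc')]) stores a
+        # variant with its first three bytes replaced and returns
+        # (new hash, whether the variant was newly stored).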
+
+ def block_hash_file(self, openfile):
+ """Return the list of hashes (hashes map)
+ for the blocks in a buffered file.
+ Helper method, does not affect store.
+ """
+ hashes = []
+ append = hashes.append
+ block_hash = self.block_hash
+
+ for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0):
+ append(block_hash(block))
+
+ return hashes
+
+ def block_stor_file(self, openfile):
+ """Read blocks from buffered file object and store them. Return:
+ (bytes read, list of hashes, list of hashes that were missing)
+ """
+ blocksize = self.blocksize
+ block_stor = self.block_stor
+ hashlist = []
+ hextend = hashlist.extend
+ storedlist = []
+ sextend = storedlist.extend
+ lastsize = 0
+
+ for block in file_sync_read_chunks(openfile, blocksize, 1, 0):
+ hl, sl = block_stor((block,))
+ hextend(hl)
+ sextend(sl)
+ lastsize = len(block)
+
+ size = (len(hashlist) -1) * blocksize + lastsize if hashlist else 0
+ return size, hashlist, storedlist
+
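+# Illustrative example (not part of the original patch): storing one block
+# and reading it back. The blockpath is a made-up temporary directory.
+if __name__ == '__main__':
+    from tempfile import mkdtemp
+    b = Blocker(blocksize=4194304, blockpath=mkdtemp(), hashtype='sha256')
+    hashes, missing = b.block_stor(('some data',))
+    assert missing == [0]                       # the block was not stored yet
+    assert b.block_retr(hashes) == ['some data']
+    assert b.block_ping(hashes) == []           # now it is
+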
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os import SEEK_CUR, SEEK_SET, fsync
+from errno import ENOENT
+
+
+_zeros = ''
+
+
+def zeros(nr):
+ global _zeros
+ size = len(_zeros)
+ if nr == size:
+ return _zeros
+
+ if nr > size:
+ _zeros += '\0' * (nr - size)
+ return _zeros
+
+ if nr < size:
+ _zeros = _zeros[:nr]
+ return _zeros
+
+
+def file_sync_write_chunks(openfile, chunksize, offset, chunks, size=None):
+ """Write given chunks to the given buffered file object.
+ Writes never span across chunk boundaries.
+ If size is given stop after or pad until size bytes have been written.
+ """
+ fwrite = openfile.write
+ seek = openfile.seek
+ padding = 0
+
+ try:
+ seek(offset * chunksize)
+ except IOError, e:
+ seek = None
+ for x in xrange(offset):
+ fwrite(zeros(chunksize))
+
+ cursize = offset * chunksize
+
+ for chunk in chunks:
+ if padding:
+ if seek:
+ seek(padding -1, SEEK_CUR)
+ fwrite("\x00")
+ else:
+ fwrite(buffer(zeros(chunksize), 0, padding))
+ if size is not None and cursize + chunksize >= size:
+ chunk = chunk[:size - cursize]
+ fwrite(chunk)
+ cursize += len(chunk)
+ break
+ fwrite(chunk)
+ padding = chunksize - len(chunk)
+
+ padding = size - cursize if size is not None else 0
+ if padding <= 0:
+ return
+
+ q, r = divmod(padding, chunksize)
+ for x in xrange(q):
+ fwrite(zeros(chunksize))
+ fwrite(buffer(zeros(chunksize), 0, r))
+
+
+def file_sync_read_chunks(openfile, chunksize, nr, offset=0):
+ """Read and yield groups of chunks from a buffered file object at offset.
+ Reads never span across chunksize boundaries.
+ """
+ fread = openfile.read
+ remains = offset * chunksize
+ seek = openfile.seek
+ try:
+ seek(remains)
+ except IOError, e:
+ seek = None
+ while 1:
+ s = fread(remains)
+ if not s:
+ break # EOF before reaching the offset
+ remains -= len(s)
+ if remains <= 0:
+ break
+
+ while nr:
+ remains = chunksize
+ chunk = ''
+ while 1:
+ s = fread(remains)
+ if not s:
+ if chunk:
+ yield chunk
+ return
+ chunk += s
+ remains -= len(s)
+ if remains <= 0:
+ break
+ yield chunk
+ nr -= 1
+
+
+class ContextFile(object):
+ __slots__ = ("name", "fdesc", "create")
+
+ def __init__(self, name, create=0):
+ self.name = name
+ self.fdesc = None
+ self.create = create
+ #self.dirty = 0
+
+ def __enter__(self):
+ name = self.name
+ try:
+ fdesc = open(name, 'rb+')
+ except IOError, e:
+ if not self.create or e.errno != ENOENT:
+ raise
+ fdesc = open(name, 'wb+') # binary mode, to match 'rb+' above
+
+ self.fdesc = fdesc
+ return self
+
+ def __exit__(self, exc, arg, trace):
+ fdesc = self.fdesc
+ if fdesc is not None:
+ #if self.dirty:
+ # fsync(fdesc.fileno())
+ fdesc.close()
+ return False # propagate exceptions
+
+ def seek(self, offset, whence=SEEK_SET):
+ return self.fdesc.seek(offset, whence)
+
+ def tell(self):
+ return self.fdesc.tell()
+
+ def truncate(self, size):
+ self.fdesc.truncate(size)
+
+ def sync_write(self, data):
+ #self.dirty = 1
+ self.fdesc.write(data)
+
+ def sync_write_chunks(self, chunksize, offset, chunks, size=None):
+ #self.dirty = 1
+ return file_sync_write_chunks(self.fdesc, chunksize, offset, chunks, size)
+
+ def sync_read(self, size):
+ read = self.fdesc.read
+ data = ''
+ while 1:
+ s = read(size)
+ if not s:
+ break
+ data += s
+ return data
+
+ def sync_read_chunks(self, chunksize, nr, offset=0):
+ return file_sync_read_chunks(self.fdesc, chunksize, nr, offset)
+
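+# Illustrative example (not part of the original patch): chunked write and
+# read-back through ContextFile; the temporary path is an assumption.
+if __name__ == '__main__':
+    from tempfile import mktemp
+    path = mktemp()
+    with ContextFile(path, create=1) as cf:
+        cf.sync_write_chunks(4, 0, ['abcd', 'ef'])
+    with ContextFile(path) as cf:
+        assert list(cf.sync_read_chunks(4, 2, 0)) == ['abcd', 'ef']
+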
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from blocker import Blocker
+from mapper import Mapper
+
+__all__ = ["Blocker", "Mapper"]
+
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os import makedirs
+from os.path import isdir, realpath, exists, join
+from hashlib import new as newhasher
+from binascii import hexlify
+
+from context_file import ContextFile, file_sync_read_chunks
+
+
+class Blocker(object):
+ """Blocker.
+ Required constructor parameters: blocksize, blockpath, hashtype.
+ """
+
+ blocksize = None
+ blockpath = None
+ hashtype = None
+
+ def __init__(self, **params):
+ blocksize = params['blocksize']
+ blockpath = params['blockpath']
+ blockpath = realpath(blockpath)
+ if not isdir(blockpath):
+ if not exists(blockpath):
+ makedirs(blockpath)
+ else:
+ raise ValueError("Variable blockpath '%s' is not a directory" % (blockpath,))
+
+ hashtype = params['hashtype']
+ try:
+ hasher = newhasher(hashtype)
+ except ValueError:
+ msg = "Variable hashtype '%s' is not available from hashlib"
+ raise ValueError(msg % (hashtype,))
+
+ hasher.update("")
+ emptyhash = hasher.digest()
+
+ self.blocksize = blocksize
+ self.blockpath = blockpath
+ self.hashtype = hashtype
+ self.hashlen = len(emptyhash)
+ self.emptyhash = emptyhash
+
+ def _get_rear_block(self, blkhash, create=0):
+ filename = hexlify(blkhash)
+ dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
+ if not exists(dir):
+ makedirs(dir)
+ name = join(dir, filename)
+ return ContextFile(name, create)
+
+ def _check_rear_block(self, blkhash):
+ filename = hexlify(blkhash)
+ dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
+ name = join(dir, filename)
+ return exists(name)
+
+ def block_hash(self, data):
+ """Hash a block of data"""
+ hasher = newhasher(self.hashtype)
+ hasher.update(data.rstrip('\x00'))
+ return hasher.digest()
+
+ def block_ping(self, hashes):
+ """Check hashes for existence and
+ return those missing from block storage.
+ """
+ missing = []
+ append = missing.append
+ for i, h in enumerate(hashes):
+ if not self._check_rear_block(h):
+ append(i)
+ return missing
+
+ def block_retr(self, hashes):
+ """Retrieve blocks from storage by their hashes."""
+ blocksize = self.blocksize
+ blocks = []
+ append = blocks.append
+ block = None
+
+ for h in hashes:
+ with self._get_rear_block(h, 0) as rbl:
+ if not rbl:
+ break
+ for block in rbl.sync_read_chunks(blocksize, 1, 0):
+ break # there should be just one block there
+ if not block:
+ break
+ append(block)
+
+ return blocks
+
+ def block_stor(self, blocklist):
+ """Store a bunch of blocks and return (hashes, missing).
+ Hashes is a list of the hashes of the blocks,
+ missing is a list of indices in that list indicating
+ which blocks were missing from the store.
+ """
+ block_hash = self.block_hash
+ hashlist = [block_hash(b) for b in blocklist]
+ mf = None
+ missing = self.block_ping(hashlist)
+ for i in missing:
+ with self._get_rear_block(hashlist[i], 1) as rbl:
+ rbl.sync_write(blocklist[i]) #XXX: verify?
+
+ return hashlist, missing
+
+ def block_delta(self, blkhash, offdata=()):
+ """Construct and store a new block from a given block
+ and a list of (offset, data) 'patches'. Return:
+ (the hash of the new block, whether the block was newly stored)
+ """
+ if not offdata:
+ return None, None
+
+ blocksize = self.blocksize
+ block = self.block_retr((blkhash,))
+ if not block:
+ return None, None
+
+ block = block[0]
+ newblock = ''
+ idx = 0
+ size = 0
+ trunc = 0
+ for off, data in offdata:
+ if not data:
+ trunc = 1
+ break
+ newblock += block[idx:off] + data
+ size += off - idx + len(data)
+ if size >= blocksize:
+ break
+ idx = off + len(data) # advance past the patched range
+
+ if not trunc:
+ newblock += block[size:len(block)]
+
+ h, a = self.block_stor((newblock,))
+ return h[0], 1 if a else 0
+
+ def block_hash_file(self, openfile):
+ """Return the list of hashes (hashes map)
+ for the blocks in a buffered file.
+ Helper method, does not affect store.
+ """
+ hashes = []
+ append = hashes.append
+ block_hash = self.block_hash
+
+ for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0):
+ append(block_hash(block))
+
+ return hashes
+
+ def block_stor_file(self, openfile):
+ """Read blocks from buffered file object and store them. Return:
+ (bytes read, list of hashes, list of hashes that were missing)
+ """
+ blocksize = self.blocksize
+ block_stor = self.block_stor
+ hashlist = []
+ hextend = hashlist.extend
+ storedlist = []
+ sextend = storedlist.extend
+ lastsize = 0
+
+ for block in file_sync_read_chunks(openfile, blocksize, 1, 0):
+ hl, sl = block_stor((block,))
+ hextend(hl)
+ sextend(sl)
+ lastsize = len(block)
+
+ size = (len(hashlist) -1) * blocksize + lastsize if hashlist else 0
+ return size, hashlist, storedlist
+
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os import SEEK_CUR, SEEK_SET, fsync
+from errno import ENOENT
+
+
+_zeros = ''
+
+
+def zeros(nr):
+ global _zeros
+ size = len(_zeros)
+ if nr == size:
+ return _zeros
+
+ if nr > size:
+ _zeros += '\0' * (nr - size)
+ return _zeros
+
+ if nr < size:
+ _zeros = _zeros[:nr]
+ return _zeros
+
+
+def file_sync_write_chunks(openfile, chunksize, offset, chunks, size=None):
+ """Write given chunks to the given buffered file object.
+ Writes never span across chunk boundaries.
+ If size is given stop after or pad until size bytes have been written.
+ """
+ fwrite = openfile.write
+ seek = openfile.seek
+ padding = 0
+
+ try:
+ seek(offset * chunksize)
+ except IOError, e:
+ seek = None
+ for x in xrange(offset):
+ fwrite(zeros(chunksize))
+
+ cursize = offset * chunksize
+
+ for chunk in chunks:
+ if padding:
+ if seek:
+ seek(padding -1, SEEK_CUR)
+ fwrite("\x00")
+ else:
+ fwrite(buffer(zeros(chunksize), 0, padding))
+ if size is not None and cursize + chunksize >= size:
+ chunk = chunk[:size - cursize]
+ fwrite(chunk)
+ cursize += len(chunk)
+ break
+ fwrite(chunk)
+ padding = chunksize - len(chunk)
+
+ padding = size - cursize if size is not None else 0
+ if padding <= 0:
+ return
+
+ q, r = divmod(padding, chunksize)
+ for x in xrange(q):
+ fwrite(zeros(chunksize))
+ fwrite(buffer(zeros(chunksize), 0, r))
+
+
+def file_sync_read_chunks(openfile, chunksize, nr, offset=0):
+ """Read and yield groups of chunks from a buffered file object at offset.
+ Reads never span across chunksize boundaries.
+ """
+ fread = openfile.read
+ remains = offset * chunksize
+ seek = openfile.seek
+ try:
+ seek(remains)
+ except IOError, e:
+ seek = None
+ while 1:
+ s = fread(remains)
+ if not s:
+ break # EOF before reaching the offset
+ remains -= len(s)
+ if remains <= 0:
+ break
+
+ while nr:
+ remains = chunksize
+ chunk = ''
+ while 1:
+ s = fread(remains)
+ if not s:
+ if chunk:
+ yield chunk
+ return
+ chunk += s
+ remains -= len(s)
+ if remains <= 0:
+ break
+ yield chunk
+ nr -= 1
+
+
+class ContextFile(object):
+ __slots__ = ("name", "fdesc", "create")
+
+ def __init__(self, name, create=0):
+ self.name = name
+ self.fdesc = None
+ self.create = create
+ #self.dirty = 0
+
+ def __enter__(self):
+ name = self.name
+ try:
+ fdesc = open(name, 'rb+')
+ except IOError, e:
+ if not self.create or e.errno != ENOENT:
+ raise
+ fdesc = open(name, 'wb+') # binary mode, to match 'rb+' above
+
+ self.fdesc = fdesc
+ return self
+
+ def __exit__(self, exc, arg, trace):
+ fdesc = self.fdesc
+ if fdesc is not None:
+ #if self.dirty:
+ # fsync(fdesc.fileno())
+ fdesc.close()
+ return False # propagate exceptions
+
+ def seek(self, offset, whence=SEEK_SET):
+ return self.fdesc.seek(offset, whence)
+
+ def tell(self):
+ return self.fdesc.tell()
+
+ def truncate(self, size):
+ self.fdesc.truncate(size)
+
+ def sync_write(self, data):
+ #self.dirty = 1
+ self.fdesc.write(data)
+
+ def sync_write_chunks(self, chunksize, offset, chunks, size=None):
+ #self.dirty = 1
+ return file_sync_write_chunks(self.fdesc, chunksize, offset, chunks, size)
+
+ def sync_read(self, size):
+ read = self.fdesc.read
+ data = ''
+ while 1:
+ s = read(size)
+ if not s:
+ break
+ data += s
+ return data
+
+ def sync_read_chunks(self, chunksize, nr, offset=0):
+ return file_sync_read_chunks(self.fdesc, chunksize, nr, offset)
+
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os.path import realpath, join, exists, isdir
+from os import makedirs, unlink
+from errno import ENOENT
+
+from context_file import ContextFile
+
+
+class Mapper(object):
+ """Mapper.
+ Required constructor parameters: mappath, namelen.
+ """
+
+ mappath = None
+ namelen = None
+
+ def __init__(self, **params):
+ self.params = params
+ self.namelen = params['namelen']
+ mappath = realpath(params['mappath'])
+ if not isdir(mappath):
+ if not exists(mappath):
+ makedirs(mappath)
+ else:
+ raise ValueError("Variable mappath '%s' is not a directory" % (mappath,))
+ self.mappath = mappath
+
+ def _get_rear_map(self, name, create=0):
+ name = join(self.mappath, hex(int(name)))
+ return ContextFile(name, create)
+
+ def _delete_rear_map(self, name):
+ name = join(self.mappath, hex(int(name)))
+ try:
+ unlink(name)
+ return 1
+ except OSError, e:
+ if e.errno != ENOENT:
+ raise
+ return 0
+
+ def map_retr(self, name, blkoff=0, nr=100000000000000):
+ """Return as a list, part of the hashes map of an object
+ at the given block offset.
+ By default, return the whole hashes map.
+ """
+ namelen = self.namelen
+ hashes = ()
+
+ with self._get_rear_map(name, 0) as rmap:
+ if rmap:
+ hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff))
+ return hashes
+
+ def map_stor(self, name, hashes=(), blkoff=0, create=1):
+ """Store hashes in the given hashes map, replacing the old ones."""
+ namelen = self.namelen
+ with self._get_rear_map(name, 1) as rmap:
+ rmap.sync_write_chunks(namelen, blkoff, hashes, None)
+
+# def map_copy(self, src, dst):
+# """Copy a hashes map to another one, replacing it."""
+# with self._get_rear_map(src, 0) as rmap:
+# if rmap:
+# rmap.copy_to(dst)
+
+ def map_remv(self, name):
+ """Remove a hashes map. Returns true if the map was found and removed."""
+ return self._delete_rear_map(name)
+
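+# Illustrative example (not part of the original patch): a map is addressed
+# by an integer name and stores fixed-length hashes back to back. The mappath
+# is a made-up temporary directory; namelen must equal the hash length used
+# by the blocker.
+if __name__ == '__main__':
+    from tempfile import mkdtemp
+    m = Mapper(mappath=mkdtemp(), namelen=32)
+    m.map_stor(42, ['\x11' * 32, '\x22' * 32])
+    assert m.map_retr(42) == ['\x11' * 32, '\x22' * 32]
+    assert m.map_retr(42, blkoff=1) == ['\x22' * 32]
+    assert m.map_remv(42) == 1
+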
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os.path import realpath, join, exists, isdir
+from os import makedirs, unlink
+from errno import ENOENT
+
+from context_file import ContextFile
+
+
+class Mapper(object):
+ """Mapper.
+ Required constructor parameters: mappath, namelen.
+ """
+
+ mappath = None
+ namelen = None
+
+ def __init__(self, **params):
+ self.params = params
+ self.namelen = params['namelen']
+ mappath = realpath(params['mappath'])
+ if not isdir(mappath):
+ if not exists(mappath):
+ makedirs(mappath)
+ else:
+ raise ValueError("Variable mappath '%s' is not a directory" % (mappath,))
+ self.mappath = mappath
+
+ def get_rear_map(self, name, create=0):
+ name = join(self.mappath, hex(int(name)))
+ return ContextFile(name, create)
+
+ def delete_rear_map(self, name):
+ name = join(self.mappath, hex(int(name)))
+ try:
+ unlink(name)
+ return 1
+ except OSError, e:
+ if e.errno != ENOENT:
+ raise
+ return 0
+
+ def map_retr(self, name, blkoff=0, nr=100000000000000):
+ """Return as a list, part of the hashes map of an object
+ at the given block offset.
+ By default, return the whole hashes map.
+ """
+ namelen = self.namelen
+ hashes = ()
+
+ with self.get_rear_map(name, 0) as rmap:
+ if rmap:
+ hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff))
+ return hashes
+
+ def map_stor(self, name, hashes=(), blkoff=0, create=1):
+ """Store hashes in the given hashes map, replacing the old ones."""
+ namelen = self.namelen
+ with self.get_rear_map(name, 1) as rmap:
+ rmap.sync_write_chunks(namelen, blkoff, hashes, None)
+
+# def map_copy(self, src, dst):
+# """Copy a hashes map to another one, replacing it."""
+# with self.get_rear_map(src, 0) as rmap:
+# if rmap:
+# rmap.copy_to(dst)
+
+ def map_remv(self, name):
+ """Remove a hashes map. Returns true if the map was found and removed."""
+ return self.delete_rear_map(name)
+
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from time import time
+from sqlalchemy import Table, Integer, Column, String, MetaData, ForeignKey
+from sqlalchemy.schema import Index
+
+from dbworker import DBWorker
+
+
+ROOTNODE = 0
+
+( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
+
+inf = float('inf')
+
+
+def strnextling(prefix):
+ """Return the first unicode string
+ greater than but not starting with given prefix.
+ strnextling('hello') -> 'hellp'
+ """
+ if not prefix:
+ ## all strings start with the null string,
+ ## therefore we have to approximate strnextling('')
+ ## with the last unicode character supported by python
+ ## 0x10ffff for wide (32-bit unicode) python builds
+ ## 0x00ffff for narrow (16-bit unicode) python builds
+ ## We will not autodetect. 0xffff is safe enough.
+ return unichr(0xffff)
+ s = prefix[:-1]
+ c = ord(prefix[-1])
+ if c >= 0xffff:
+ raise RuntimeError
+ s += unichr(c+1)
+ return s
+
+def strprevling(prefix):
+ """Return an approximation of the last unicode string
+ less than but not starting with given prefix.
+ strprevling(u'hello') -> u'helln\\uffff'
+ """
+ if not prefix:
+ ## There is no prevling for the null string
+ return prefix
+ s = prefix[:-1]
+ c = ord(prefix[-1])
+ if c > 0:
+ s += unichr(c-1) + unichr(0xffff)
+ return s
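+
+# Illustrative examples (not part of the original patch): the two helpers
+# bracket a prefix for range scans over paths, e.g.
+#   strnextling(u'photos/')  == u'photos0'             ('/' + 1 == '0')
+#   strprevling(u'photos/')  == u'photos.' + u'\uffff'
+# so that "path > strprevling(p) and path < strnextling(p)" effectively
+# matches the paths starting with p without resorting to LIKE.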
+
+
+_propnames = {
+ 'serial' : 0,
+ 'node' : 1,
+ 'size' : 2,
+ 'source' : 3,
+ 'mtime' : 4,
+ 'muser' : 5,
+ 'cluster' : 6,
+}
+
+
+class Node(DBWorker):
+ """Nodes store path organization and have multiple versions.
+ Versions store object history and have multiple attributes.
+ Attributes store metadata.
+ """
+
+ # TODO: Provide an interface for included and excluded clusters.
+
+ def __init__(self, **params):
+ DBWorker.__init__(self, **params)
+ metadata = MetaData()
+
+ #create nodes table
+ columns=[]
+ columns.append(Column('node', Integer, primary_key=True))
+ columns.append(Column('parent', Integer,
+ ForeignKey('nodes.node',
+ ondelete='CASCADE',
+ onupdate='CASCADE'),
+ autoincrement=False, default=0))
+ #columns.append(Column('path', String(2048), default='', nullable=False))
+ columns.append(Column('path', String(255), default='', nullable=False))
+ self.nodes = Table('nodes', metadata, *columns)
+ # place an index on path
+ Index('idx_nodes_path', self.nodes.c.path, unique=True)
+
+ #create statistics table
+ columns=[]
+ columns.append(Column('node', Integer,
+ ForeignKey('nodes.node',
+ ondelete='CASCADE',
+ onupdate='CASCADE'),
+ primary_key=True))
+ columns.append(Column('population', Integer, nullable=False,
+ autoincrement=False, default=0))
+ columns.append(Column('size', Integer, nullable=False,
+ autoincrement=False, default=0))
+ columns.append(Column('mtime', Integer, autoincrement=False))
+ columns.append(Column('cluster', Integer, nullable=False,
+ autoincrement=False, default=0, primary_key=True))
+ self.statistics = Table('statistics', metadata, *columns)
+
+ #create versions table
+ columns=[]
+ columns.append(Column('serial', Integer, autoincrement=False,
+ primary_key=True))
+ columns.append(Column('node', Integer,
+ ForeignKey('nodes.node',
+ ondelete='CASCADE',
+ onupdate='CASCADE'),
+ autoincrement=False))
+ columns.append(Column('size', Integer, nullable=False,
+ autoincrement=False, default=0))
+ columns.append(Column('source', Integer, autoincrement=False))
+ columns.append(Column('mtime', Integer, autoincrement=False))
+ columns.append(Column('muser', String(255), nullable=False, default=''))
+ columns.append(Column('cluster', Integer, nullable=False,
+ autoincrement=False, default=0))
+ self.versions = Table('versions', metadata, *columns)
+ # place an index on node
+ Index('idx_versions_node', self.versions.c.node)
+ # TODO: Sort out if more indexes are needed.
+ #Index('idx_versions_mtime', self.versions.c.mtime)
+
+ #create attributes table
+ columns = []
+ columns.append(Column('serial', Integer,
+ ForeignKey('versions.serial',
+ ondelete='CASCADE',
+ onupdate='CASCADE'),
+ autoincrement=False,
+ primary_key=True))
+ columns.append(Column('key', String(255), primary_key=True))
+ columns.append(Column('value', String(255)))
+ self.attributes = Table('attributes', metadata, *columns)
+
+ metadata.create_all(self.engine)
+
+ s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
+ self.conn.execute(s)
+
+ def node_create(self, parent, path):
+ """Create a new node from the given properties.
+ Return the node identifier of the new node.
+ """
+
+ q = ("insert into nodes (parent, path) "
+ "values (?, ?)")
+ props = (parent, path)
+ return self.execute(q, props).lastrowid
+
+ def node_lookup(self, path):
+ """Lookup the current node of the given path.
+ Return None if the path is not found.
+ """
+
+ q = "select node from nodes where path = ?"
+ self.execute(q, (path,))
+ r = self.fetchone()
+ if r is not None:
+ return r[0]
+ return None
+
+ def node_get_properties(self, node):
+ """Return the node's (parent, path).
+ Return None if the node is not found.
+ """
+
+ q = "select parent, path from nodes where node = ?"
+ self.execute(q, (node,))
+ return self.fetchone()
+
+ def node_get_versions(self, node, keys=(), propnames=_propnames):
+ """Return the properties of all versions at node.
+ If keys is empty, return all properties in the order
+ (serial, node, size, source, mtime, muser, cluster).
+ """
+
+ q = ("select serial, node, size, source, mtime, muser, cluster "
+ "from versions "
+ "where node = ?")
+ self.execute(q, (node,))
+ r = self.fetchall()
+ if r is None:
+ return r
+
+ if not keys:
+ return r
+ return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
+
+ def node_count_children(self, node):
+ """Return node's child count."""
+
+ q = "select count(node) from nodes where parent = ? and node != 0"
+ self.execute(q, (node,))
+ r = self.fetchone()
+ if r is None:
+ return 0
+ return r[0]
+
+ def node_purge_children(self, parent, before=inf, cluster=0):
+ """Delete all versions with the specified
+ parent and cluster, and return
+ the serials of versions deleted.
+ Clears out nodes with no remaining versions.
+ """
+
+ execute = self.execute
+ q = ("select count(serial), sum(size) from versions "
+ "where node in (select node "
+ "from nodes "
+ "where parent = ?) "
+ "and cluster = ? "
+ "and mtime <= ?")
+ args = (parent, cluster, before)
+ execute(q, args)
+ nr, size = self.fetchone()
+ if not nr:
+ return ()
+ mtime = time()
+ self.statistics_update(parent, -nr, -size, mtime, cluster)
+ self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
+
+ q = ("select serial from versions "
+ "where node in (select node "
+ "from nodes "
+ "where parent = ?) "
+ "and cluster = ? "
+ "and mtime <= ?")
+ execute(q, args)
+ serials = [r[SERIAL] for r in self.fetchall()]
+ q = ("delete from versions "
+ "where node in (select node "
+ "from nodes "
+ "where parent = ?) "
+ "and cluster = ? "
+ "and mtime <= ?")
+ execute(q, args)
+ q = ("delete from nodes "
+ "where node in (select node from nodes n "
+ "where (select count(serial) "
+ "from versions "
+ "where node = n.node) = 0 "
+ "and parent = ?)")
+ execute(q, (parent,))
+ return serials
+
+ def node_purge(self, node, before=inf, cluster=0):
+ """Delete all versions with the specified
+ node and cluster, and return
+ the serials of versions deleted.
+ Clears out the node if it has no remaining versions.
+ """
+
+ execute = self.execute
+ q = ("select count(serial), sum(size) from versions "
+ "where node = ? "
+ "and cluster = ? "
+ "and mtime <= ?")
+ args = (node, cluster, before)
+ execute(q, args)
+ nr, size = self.fetchone()
+ if not nr:
+ return ()
+ mtime = time()
+ self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
+
+ q = ("select serial from versions "
+ "where node = ? "
+ "and cluster = ? "
+ "and mtime <= ?")
+ execute(q, args)
+ serials = [r[SERIAL] for r in self.fetchall()]
+ q = ("delete from versions "
+ "where node = ? "
+ "and cluster = ? "
+ "and mtime <= ?")
+ execute(q, args)
+ q = ("delete from nodes "
+ "where node in (select node from nodes n "
+ "where (select count(serial) "
+ "from versions "
+ "where node = n.node) = 0 "
+ "and node = ?)")
+ execute(q, (node,))
+ return serials
+
+ def node_remove(self, node):
+ """Remove the node specified.
+ Return false if the node has children or is not found.
+ """
+
+ if self.node_count_children(node):
+ return False
+
+ mtime = time()
+ q = ("select count(serial), sum(size), cluster "
+ "from versions "
+ "where node = ? "
+ "group by cluster")
+ self.execute(q, (node,))
+ for population, size, cluster in self.fetchall():
+ self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
+
+ q = "delete from nodes where node = ?"
+ self.execute(q, (node,))
+ return True
+
+ def statistics_get(self, node, cluster=0):
+ """Return population, total size and last mtime
+ for all versions under node that belong to the cluster.
+ """
+
+ q = ("select population, size, mtime from statistics "
+ "where node = ? and cluster = ?")
+ self.execute(q, (node, cluster))
+ return self.fetchone()
+
+ def statistics_update(self, node, population, size, mtime, cluster=0):
+ """Update the statistics of the given node.
+ Statistics keep track of the population, total
+ size of objects, and mtime in the node's namespace.
+ The deltas may be zero, positive, or negative.
+ """
+
+ qs = ("select population, size from statistics "
+ "where node = ? and cluster = ?")
+ qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
+ "values (?, ?, ?, ?, ?)")
+ self.execute(qs, (node, cluster))
+ r = self.fetchone()
+ if r is None:
+ prepopulation, presize = (0, 0)
+ else:
+ prepopulation, presize = r
+ population += prepopulation
+ size += presize
+ self.execute(qu, (node, population, size, mtime, cluster))
+
+ def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
+ """Update the statistics of the given node's parent.
+ Then recursively update all parents up to the root.
+ Population is not recursive.
+ """
+
+ while True:
+ if node == 0:
+ break
+ props = self.node_get_properties(node)
+ if props is None:
+ break
+ parent, path = props
+ self.statistics_update(parent, population, size, mtime, cluster)
+ node = parent
+ population = 0 # Population isn't recursive
+
+ def statistics_latest(self, node, before=inf, except_cluster=0):
+ """Return population, total size and last mtime
+ for all latest versions under node that
+ do not belong to the cluster.
+ """
+
+ execute = self.execute
+ fetchone = self.fetchone
+
+ # The node.
+ props = self.node_get_properties(node)
+ if props is None:
+ return None
+ parent, path = props
+
+ # The latest version.
+ q = ("select serial, node, size, source, mtime, muser, cluster "
+ "from versions "
+ "where serial = (select max(serial) "
+ "from versions "
+ "where node = ? and mtime < ?) "
+ "and cluster != ?")
+ execute(q, (node, before, except_cluster))
+ props = fetchone()
+ if props is None:
+ return None
+ mtime = props[MTIME]
+
+ # First level, just under node (get population).
+ q = ("select count(serial), sum(size), max(mtime) "
+ "from versions v "
+ "where serial = (select max(serial) "
+ "from versions "
+ "where node = v.node and mtime < ?) "
+ "and cluster != ? "
+ "and node in (select node "
+ "from nodes "
+ "where parent = ?)")
+ execute(q, (before, except_cluster, node))
+ r = fetchone()
+ if r is None:
+ return None
+ count = r[0]
+ mtime = max(mtime, r[2])
+ if count == 0:
+ return (0, 0, mtime)
+
+ # All children (get size and mtime).
+ # XXX: This is why the full path is stored.
+ q = ("select count(serial), sum(size), max(mtime) "
+ "from versions v "
+ "where serial = (select max(serial) "
+ "from versions "
+ "where node = v.node and mtime < ?) "
+ "and cluster != ? "
+ "and node in (select node "
+ "from nodes "
+ "where path like ?)")
+ execute(q, (before, except_cluster, path + '%'))
+ r = fetchone()
+ if r is None:
+ return None
+ size = r[1] - props[SIZE]
+ mtime = max(mtime, r[2])
+ return (count, size, mtime)
+
+ def version_create(self, node, size, source, muser, cluster=0):
+ """Create a new version from the given properties.
+ Return the (serial, mtime) of the new version.
+ """
+
+ q = ("insert into versions (node, size, source, mtime, muser, cluster) "
+ "values (?, ?, ?, ?, ?, ?)")
+ mtime = time()
+ props = (node, size, source, mtime, muser, cluster)
+ serial = self.execute(q, props).lastrowid
+ self.statistics_update_ancestors(node, 1, size, mtime, cluster)
+ return serial, mtime
+
+ def version_lookup(self, node, before=inf, cluster=0):
+ """Lookup the current version of the given node.
+ Return a list with its properties:
+ (serial, node, size, source, mtime, muser, cluster)
+ or None if the current version is not found in the given cluster.
+ """
+
+ q = ("select serial, node, size, source, mtime, muser, cluster "
+ "from versions "
+ "where serial = (select max(serial) "
+ "from versions "
+ "where node = ? and mtime < ?) "
+ "and cluster = ?")
+ self.execute(q, (node, before, cluster))
+ props = self.fetchone()
+ if props is not None:
+ return props
+ return None
+
+ def version_get_properties(self, serial, keys=(), propnames=_propnames):
+ """Return a sequence of values for the properties of
+ the version specified by serial and the keys, in the order given.
+ If keys is empty, return all properties in the order
+ (serial, node, size, source, mtime, muser, cluster).
+ """
+
+ q = ("select serial, node, size, source, mtime, muser, cluster "
+ "from versions "
+ "where serial = ?")
+ self.execute(q, (serial,))
+ r = self.fetchone()
+ if r is None:
+ return r
+
+ if not keys:
+ return r
+ return [r[propnames[k]] for k in keys if k in propnames]
+
+ def version_recluster(self, serial, cluster):
+ """Move the version into another cluster."""
+
+ props = self.version_get_properties(serial)
+ if not props:
+ return
+ node = props[NODE]
+ size = props[SIZE]
+ oldcluster = props[CLUSTER]
+ if cluster == oldcluster:
+ return
+
+ mtime = time()
+ self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
+ self.statistics_update_ancestors(node, 1, size, mtime, cluster)
+
+ q = "update versions set cluster = ? where serial = ?"
+ self.execute(q, (cluster, serial))
+
+ def version_remove(self, serial):
+ """Remove the serial specified."""
+
+ props = self.version_get_properties(serial)
+ if not props:
+ return
+ node = props[NODE]
+ size = props[SIZE]
+ cluster = props[CLUSTER]
+
+ mtime = time()
+ self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
+
+ q = "delete from versions where serial = ?"
+ self.execute(q, (serial,))
+ return True
+
+ def attribute_get(self, serial, keys=()):
+ """Return a list of (key, value) pairs of the version specified by serial.
+ If keys is empty, return all attributes.
+ Otherwise, return only those specified.
+ """
+
+ execute = self.execute
+ if keys:
+ marks = ','.join('?' for k in keys)
+ q = ("select key, value from attributes "
+ "where key in (%s) and serial = ?" % (marks,))
+ execute(q, keys + (serial,))
+ else:
+ q = "select key, value from attributes where serial = ?"
+ execute(q, (serial,))
+ return self.fetchall()
+
+ def attribute_set(self, serial, items):
+ """Set the attributes of the version specified by serial.
+ Receive attributes as an iterable of (key, value) pairs.
+ """
+
+ q = ("insert or replace into attributes (serial, key, value) "
+ "values (?, ?, ?)")
+ self.executemany(q, ((serial, k, v) for k, v in items))
+
+ def attribute_del(self, serial, keys=()):
+ """Delete attributes of the version specified by serial.
+ If keys is empty, delete all attributes.
+ Otherwise delete those specified.
+ """
+
+ if keys:
+ q = "delete from attributes where serial = ? and key = ?"
+ self.executemany(q, ((serial, key) for key in keys))
+ else:
+ q = "delete from attributes where serial = ?"
+ self.execute(q, (serial,))
+
+ def attribute_copy(self, source, dest):
+ q = ("insert or replace into attributes "
+ "select ?, key, value from attributes "
+ "where serial = ?")
+ self.execute(q, (dest, source))
+
+ def _construct_filters(self, filterq):
+ if not filterq:
+ return None, None
+
+ args = filterq.split(',')
+ subq = " and a.key in ("
+ subq += ','.join(('?' for x in args))
+ subq += ")"
+
+ return subq, args
+
+ def _construct_paths(self, pathq):
+ if not pathq:
+ return None, None
+
+ subq = " and ("
+ subq += ' or '.join(('n.path like ?' for x in pathq))
+ subq += ")"
+ args = tuple([x + '%' for x in pathq])
+
+ return subq, args
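+
+ # e.g. pathq=['a/b'] yields (" and (n.path like ?)", ('a/b%',)).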
+
+ def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
+ """Return a list with all keys pairs defined
+ for all latest versions under parent that
+ do not belong to the cluster.
+ """
+
+ # TODO: Use another table to store before=inf results.
+ q = ("select distinct a.key "
+ "from attributes a, versions v, nodes n "
+ "where v.serial = (select max(serial) "
+ "from versions "
+ "where node = v.node and mtime < ?) "
+ "and v.cluster != ? "
+ "and v.node in (select node "
+ "from nodes "
+ "where parent = ?) "
+ "and a.serial = v.serial "
+ "and n.node = v.node")
+ args = (before, except_cluster, parent)
+ subq, subargs = self._construct_paths(pathq)
+ if subq is not None:
+ q += subq
+ args += subargs
+ self.execute(q, args)
+ return [r[0] for r in self.fetchall()]
+
+ def latest_version_list(self, parent, prefix='', delimiter=None,
+ start='', limit=10000, before=inf,
+ except_cluster=0, pathq=[], filterq=None):
+ """Return a (list of (path, serial) tuples, list of common prefixes)
+ for the current versions of the paths with the given parent,
+ matching the following criteria.
+
+ The property tuple for a version is returned if all
+ of these conditions are true:
+
+ a. parent matches
+
+ b. path > start
+
+ c. path starts with prefix (and paths in pathq)
+
+ d. version is the max up to before
+
+ e. version is not in cluster
+
+ f. the path does not have the delimiter occurring
+ after the prefix, or ends with the delimiter
+
+ g. serial matches the attribute filter query.
+
+ A filter query is a comma-separated list of
+ terms in one of these three forms:
+
+ key
+ an attribute with this key must exist
+
+ !key
+ an attribute with this key must not exist
+
+ key ?op value
+ the attribute with this key satisfies the value
+ where ?op is one of ==, !=, <=, >=, <, >.
+
+ The list of common prefixes includes the prefixes
+ matching up to the first delimiter after prefix,
+ and are reported only once, as "virtual directories".
+ The delimiter is included in the prefixes.
+
+ If arguments are None, then the corresponding matching rule
+ will always match.
+
+ Limit applies to the first list of tuples returned.
+ """
+
+ execute = self.execute
+
+ if not start or start < prefix:
+ start = strprevling(prefix)
+ nextling = strnextling(prefix)
+
+ q = ("select distinct n.path, v.serial "
+ "from attributes a, versions v, nodes n "
+ "where v.serial = (select max(serial) "
+ "from versions "
+ "where node = v.node and mtime < ?) "
+ "and v.cluster != ? "
+ "and v.node in (select node "
+ "from nodes "
+ "where parent = ?) "
+ "and a.serial = v.serial "
+ "and n.node = v.node "
+ "and n.path > ? and n.path < ?")
+ args = [before, except_cluster, parent, start, nextling]
+
+ subq, subargs = self._construct_paths(pathq)
+ if subq is not None:
+ q += subq
+ args += subargs
+ subq, subargs = self._construct_filters(filterq)
+ if subq is not None:
+ q += subq
+ args += subargs
+ else:
+ q = q.replace("attributes a, ", "")
+ q = q.replace("and a.serial = v.serial ", "")
+ q += " order by n.path"
+
+ if not delimiter:
+ q += " limit ?"
+ args.append(limit)
+ execute(q, args)
+ return self.fetchall(), ()
+
+ pfz = len(prefix)
+ dz = len(delimiter)
+ count = 0
+ fetchone = self.fetchone
+ prefixes = []
+ pappend = prefixes.append
+ matches = []
+ mappend = matches.append
+
+ execute(q, args)
+ while True:
+ props = fetchone()
+ if props is None:
+ break
+ path, serial = props
+ idx = path.find(delimiter, pfz)
+
+ if idx < 0:
+ mappend(props)
+ count += 1
+ if count >= limit:
+ break
+ continue
+
+ pf = path[:idx + dz]
+ pappend(pf)
+ if idx + dz == len(path):
+ mappend(props)
+ count += 1
+ if count >= limit:
+ break
+
+ args[3] = strnextling(pf) # New start.
+ execute(q, args)
+
+ return matches, prefixes
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from sqlalchemy.sql import select, literal
+from sqlalchemy.sql.expression import join
+
+from xfeatures import XFeatures
+from groups import Groups
+from public import Public
+
+
+READ = 0
+WRITE = 1
+
+
+class Permissions(XFeatures, Groups, Public):
+
+ def __init__(self, **params):
+ XFeatures.__init__(self, **params)
+ Groups.__init__(self, **params)
+ Public.__init__(self, **params)
+
+ def access_grant(self, path, access, members=()):
+ """Grant members with access to path.
+ Members can also be '*' (all),
+ or some group specified as 'owner:group'."""
+
+ if not members:
+ return
+ feature = self.xfeature_create(path)
+ if feature is None:
+ return
+ self.feature_setmany(feature, access, members)
+
+ def access_set(self, path, permissions):
+ """Set permissions for path. The permissions dict
+ maps 'read', 'write' keys to member lists."""
+
+ self.xfeature_destroy(path)
+ self.access_grant(path, READ, permissions.get('read', []))
+ self.access_grant(path, WRITE, permissions.get('write', []))
+
+ def access_clear(self, path):
+ """Revoke access to path (both permissions and public)."""
+
+ self.xfeature_destroy(path)
+ self.public_unset(path)
+
+ def access_check(self, path, access, member):
+ """Return true if the member has this access to the path."""
+
+ if access == READ and self.public_check(path):
+ return True
+
+ r = self.xfeature_inherit(path)
+ if not r:
+ return False
+ fpath, feature = r
+ members = self.feature_get(feature, access)
+ if member in members or '*' in members:
+ return True
+ for owner, group in self.group_parents(member):
+ if owner + ':' + group in members:
+ return True
+ return False
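+
+ # Example (illustrative names): access_check('alice/photos', READ, 'bob')
+ # is True if the path is public, if 'bob' or '*' is granted READ on the
+ # inherited feature, or if 'bob' belongs to a granted 'owner:group'.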
+
+ def access_inherit(self, path):
+ """Return the inherited or assigned (path, permissions) pair for path."""
+
+ r = self.xfeature_inherit(path)
+ if not r:
+ return (path, {})
+ fpath, feature = r
+ permissions = self.feature_dict(feature)
+ if READ in permissions:
+ permissions['read'] = permissions[READ]
+ del permissions[READ]
+ if WRITE in permissions:
+ permissions['write'] = permissions[WRITE]
+ del permissions[WRITE]
+ return (fpath, permissions)
+
+ def access_list(self, path):
+ """List all permission paths inherited by or inheriting from path."""
+
+ return [x[0] for x in self.xfeature_list(path) if x[0] != path]
+
+ def access_list_paths(self, member, prefix=None):
+ """Return the list of paths granted to member."""
+
+ xfeatures_xfeaturevals = self.xfeatures.join(self.xfeaturevals)
+
+ selectable = (self.groups.c.owner + ':' + self.groups.c.name)
+ member_groups = select([selectable.label('value')],
+ self.groups.c.member == member)
+
+ members = select([literal(member).label('value')])
+
+ extended_member_groups = member_groups.union(members).alias()
+ inner_join = join(xfeatures_xfeaturevals,
+ extended_member_groups,
+ self.xfeaturevals.c.value == extended_member_groups.c.value)
+ s = select([self.xfeatures.c.path], from_obj=[inner_join]).distinct()
+ if prefix:
+ s = s.where(self.xfeatures.c.path.like(prefix + '%'))
+ r = self.conn.execute(s)
+ l = [row[0] for row in r.fetchall()]
+ r.close()
+ return l
+
+ def access_list_shared(self, prefix=''):
+ """Return the list of shared paths."""
+
+ s = select([self.xfeatures.c.path],
+ self.xfeatures.c.path.like(prefix + '%'))
+ r = self.conn.execute(s)
+ l = [row[0] for row in r.fetchall()]
+ r.close()
+ return l
\ No newline at end of file
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from sqlalchemy import Table, Column, String, MetaData
+from sqlalchemy.sql import select
+from dbworker import DBWorker
+
+
+class Policy(DBWorker):
+ """Paths can be assigned key, value pairs, representing policy."""
+
+ def __init__(self, **params):
+ DBWorker.__init__(self, **params)
+ metadata = MetaData()
+ columns = []
+ #columns.append(Column('path', String(2048), primary_key=True))
+ columns.append(Column('path', String(255), primary_key=True))
+ columns.append(Column('key', String(255), primary_key=True))
+ columns.append(Column('value', String(255)))
+ self.policies = Table('policy', metadata, *columns)
+ metadata.create_all(self.engine)
+ metadata.bind = self.engine
+
+ def policy_set(self, path, policy):
+ s = self.policies.insert()
+ values = [{'path': path, 'key': k, 'value': v} for k, v in policy.iteritems()]
+ r = self.conn.execute(s, values)
+ r.close()
+
+ def policy_unset(self, path):
+ s = self.policies.delete().where(self.policies.c.path==path)
+ r = self.conn.execute(s)
+ r.close()
+
+ def policy_get(self, path):
+ s = select([self.policies.c.key, self.policies.c.value],
+ self.policies.c.path == path)
+ r = self.conn.execute(s)
+ d = dict(r.fetchall())
+ r.close()
+ return d
\ No newline at end of file
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from dbworker import DBWorker
+from sqlalchemy import Table, Column, String, MetaData
+from sqlalchemy.sql import select
+
+class Public(DBWorker):
+ """Paths can be marked as public."""
+
+ def __init__(self, **params):
+ DBWorker.__init__(self, **params)
+ metadata = MetaData()
+ columns = []
+ #columns.append(Column('path', String(2048), primary_key=True))
+ columns.append(Column('path', String(255), primary_key=True))
+ self.public = Table('public', metadata, *columns)
+ metadata.create_all(self.engine)
+ metadata.bind = self.engine
+
+ def public_set(self, path):
+ s = self.public.insert()
+ r = self.conn.execute(s, path=path)
+ r.close()
+
+ def public_unset(self, path):
+ s = self.public.delete().where(self.public.c.path == path)
+ r = self.conn.execute(s)
+ r.close()
+
+ def public_check(self, path):
+ s = select([self.public.c.path], self.public.c.path == path)
+ r = self.conn.execute(s)
+ l = r.fetchone()
+ r.close()
+ return bool(l)
\ No newline at end of file
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from collections import defaultdict
+from sqlalchemy import Table, Column, String, Integer, MetaData, ForeignKey
+from sqlalchemy.sql import select, and_
+from sqlalchemy.schema import Index
+from sqlalchemy.sql.expression import desc
+
+from dbworker import DBWorker
+
+
+class XFeatures(DBWorker):
+ """XFeatures are path properties that allow non-nested
+ inheritance patterns. Currently used for storing permissions.
+ """
+
+ def __init__(self, **params):
+ DBWorker.__init__(self, **params)
+ metadata = MetaData()
+ columns = []
+ columns.append(Column('feature_id', Integer, primary_key=True))
+ columns.append(Column('path', String(2048)))
+ self.xfeatures = Table('xfeatures', metadata, *columns)
+ # place an index on path
+ Index('idx_features_path', self.xfeatures.c.path)
+
+ columns = []
+ columns.append(Column('feature_id', Integer,
+ ForeignKey('xfeatures.feature_id',
+ ondelete='CASCADE'),
+ primary_key=True))
+ columns.append(Column('key', Integer, autoincrement=False,
+ primary_key=True))
+ columns.append(Column('value', String(255), primary_key=True))
+ self.xfeaturevals = Table('xfeaturevals', metadata, *columns)
+
+ metadata.create_all(self.engine)
+ metadata.bind = self.engine
+
+ def xfeature_inherit(self, path):
+ """Return the (path, feature) inherited by the path, or None."""
+
+ s = select([self.xfeatures.c.path, self.xfeatures.c.feature_id])
+ s = s.where(self.xfeatures.c.path <= path)
+ s = s.order_by(desc(self.xfeatures.c.path)).limit(1)
+ r = self.conn.execute(s)
+ row = r.fetchone()
+ r.close()
+ if row and path.startswith(row[0]):
+ return row
+ else:
+ return None
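+
+ # The lookup above relies on lexicographic ordering: the greatest stored
+ # path that is <= the argument is its longest possible prefix, so a single
+ # startswith() check decides inheritance without scanning all features.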
+
+ def xfeature_list(self, path):
+ """Return the list of the (prefix, feature) pairs matching path.
+ A prefix matches path if either the prefix includes the path,
+ or the path includes the prefix.
+ """
+
+ inherited = self.xfeature_inherit(path)
+ if inherited:
+ return [inherited]
+
+ s = select([self.xfeatures.c.path, self.xfeatures.c.feature_id])
+ s = s.where(and_(self.xfeatures.c.path.like(path + '%'),
+ self.xfeatures.c.path != path))
+ s = s.order_by(self.xfeatures.c.path)
+ r = self.conn.execute(s)
+ l = r.fetchall()
+ r.close()
+ return l
+
+ def xfeature_create(self, path):
+ """Create and return a feature for path.
+ If the path already inherits a feature or
+ bestows to paths already inheriting a feature,
+ create no feature and return None.
+ If the path has a feature, return it.
+ """
+
+ prefixes = self.xfeature_list(path)
+ pl = len(prefixes)
+ if (pl > 1) or (pl == 1 and prefixes[0][0] != path):
+ return None
+ if pl == 1 and prefixes[0][0] == path:
+ return prefixes[0][1]
+ s = self.xfeatures.insert()
+ r = self.conn.execute(s, path=path)
+ inserted_primary_key = r.inserted_primary_key[0]
+ r.close()
+ return inserted_primary_key
+
+ def xfeature_destroy(self, path):
+ """Destroy a feature and all its key, value pairs."""
+
+ s = self.xfeatures.delete().where(self.xfeatures.c.path == path)
+ r = self.conn.execute(s)
+ r.close()
+
+ def feature_dict(self, feature):
+ """Return a dict mapping keys to list of values for feature."""
+
+ s = select([self.xfeaturevals.c.key, self.xfeaturevals.c.value])
+ s = s.where(self.xfeaturevals.c.feature_id == feature)
+ r = self.conn.execute(s)
+ d = defaultdict(list)
+ for key, value in r.fetchall():
+ d[key].append(value)
+ r.close()
+ return d
+
+ def feature_set(self, feature, key, value):
+ """Associate a key, value pair with a feature."""
+
+ s = self.xfeaturevals.insert()
+ r = self.conn.execute(s, feature_id=feature, key=key, value=value)
+ r.close()
+
+ def feature_setmany(self, feature, key, values):
+ """Associate the given key, and values with a feature."""
+
+ s = self.xfeaturevals.insert()
+ values = [{'feature_id': feature, 'key': key, 'value': v} for v in values]
+ r = self.conn.execute(s, values)
+ r.close()
+
+ def feature_unset(self, feature, key, value):
+ """Disassociate a key, value pair from a feature."""
+
+ s = self.xfeaturevals.delete()
+ s = s.where(and_(self.xfeaturevals.c.feature_id == feature,
+ self.xfeaturevals.c.key == key,
+ self.xfeaturevals.c.value == value))
+ r = self.conn.execute(s)
+ r.close()
+
+ def feature_unsetmany(self, feature, key, values):
+ """Disassociate the key for the values given, from a feature."""
+
+ for v in values:
+ conditional = and_(self.xfeaturevals.c.feature_id == feature,
+ self.xfeaturevals.c.key == key,
+ self.xfeaturevals.c.value == v)
+ s = self.xfeaturevals.delete().where(conditional)
+ r = self.conn.execute(s)
+ r.close()
+
+ def feature_get(self, feature, key):
+ """Return the list of values for a key of a feature."""
+
+ s = select([self.xfeaturevals.c.value])
+ s = s.where(and_(self.xfeaturevals.c.feature_id == feature,
+ self.xfeaturevals.c.key == key))
+ r = self.conn.execute(s)
+ l = [row[0] for row in r.fetchall()]
+ r.close()
+ return l
+
+ def feature_clear(self, feature, key):
+ """Delete all key, value pairs for a key of a feature."""
+
+ s = self.xfeaturevals.delete()
+ s = s.where(and_(self.xfeaturevals.c.feature_id == feature,
+ self.xfeaturevals.c.key == key))
+ r = self.conn.execute(s)
+ r.close()
--- /dev/null
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+import os
+import time
+import logging
+import binascii
+
+from base import NotAllowedError, BaseBackend
+from lib_alchemy.node import Node, ROOTNODE, SERIAL, SIZE, MTIME, MUSER, CLUSTER
+from lib_alchemy.permissions import Permissions, READ, WRITE
+from lib_alchemy.policy import Policy
+from lib_alchemy.hashfiler import Mapper, Blocker
+from sqlalchemy import create_engine
+
+( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
+
+inf = float('inf')
+
+
+logger = logging.getLogger(__name__)
+
+def backend_method(func=None, autocommit=1):
+ if func is None:
+ def fn(func):
+ return backend_method(func, autocommit)
+ return fn
+
+ if not autocommit:
+ return func
+ def fn(self, *args, **kw):
+ self.con.execute('begin deferred')
+ try:
+ ret = func(self, *args, **kw)
+ self.con.commit()
+ return ret
+ except:
+ self.con.rollback()
+ raise
+ return fn
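+
+# Usage: plain @backend_method wraps the call in a deferred transaction and
+# commits or rolls back around it; @backend_method(autocommit=0) returns the
+# function unchanged, leaving transaction control to the caller.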
+
+
+class ModularBackend(BaseBackend):
+ """A modular backend.
+
+ Uses modules for SQL functions and storage.
+ """
+
+ def __init__(self, db):
+ self.hash_algorithm = 'sha256'
+ self.block_size = 4 * 1024 * 1024 # 4MB
+
+ self.default_policy = {'quota': 0, 'versioning': 'auto'}
+
+ basepath = os.path.split(db)[0]
+ if basepath and not os.path.exists(basepath):
+ os.makedirs(basepath)
+ if not os.path.isdir(basepath):
+ raise RuntimeError("Cannot open database at '%s'" % (db,))
+
+ dbuser = 'pithos'
+ dbpass = 'archipelagos'
+ dbhost = '62.217.112.56'
+ dbname = 'pithosdb'
+ connection_str = 'mysql://%s:%s@%s/%s' % (dbuser, dbpass, dbhost, dbname)
+ engine = create_engine(connection_str, echo=True)
+
+ params = {'blocksize': self.block_size,
+ 'blockpath': basepath + '/blocks',
+ 'hashtype': self.hash_algorithm}
+ self.blocker = Blocker(**params)
+
+ params = {'mappath': basepath + '/maps',
+ 'namelen': self.blocker.hashlen}
+ self.mapper = Mapper(**params)
+
+ # Keep a reference to the connection for use by backend_method.
+ self.con = engine.connect()
+ params = {'connection': self.con,
+ 'engine': engine}
+ self.permissions = Permissions(**params)
+ self.policy = Policy(**params)
+ self.node = Node(**params)
+
+ @backend_method
+ def list_accounts(self, user, marker=None, limit=10000):
+ """Return a list of accounts the user can access."""
+
+ logger.debug("list_accounts: %s %s", user, marker, limit)
+ allowed = self._allowed_accounts(user)
+ start, limit = self._list_limits(allowed, marker, limit)
+ return allowed[start:start + limit]
+
+ @backend_method
+ def get_account_meta(self, user, account, until=None):
+ """Return a dictionary with the account metadata."""
+
+ logger.debug("get_account_meta: %s %s", account, until)
+ path, node = self._lookup_account(account, user == account)
+ if user != account:
+ if until or node is None or account not in self._allowed_accounts(user):
+ raise NotAllowedError
+ try:
+ props = self._get_properties(node, until)
+ mtime = props[MTIME]
+ except NameError:
+ props = None
+ mtime = until
+ count, bytes, tstamp = self._get_statistics(node, until)
+ tstamp = max(tstamp, mtime)
+ if until is None:
+ modified = tstamp
+ else:
+ modified = self._get_statistics(node)[2] # Overall last modification.
+ modified = max(modified, mtime)
+
+ if user != account:
+ meta = {'name': account}
+ else:
+ meta = {}
+ if props is not None:
+ meta.update(dict(self.node.attribute_get(props[SERIAL])))
+ if until is not None:
+ meta.update({'until_timestamp': tstamp})
+ meta.update({'name': account, 'count': count, 'bytes': bytes})
+ meta.update({'modified': modified})
+ return meta
+
+ @backend_method
+ def update_account_meta(self, user, account, meta, replace=False):
+ """Update the metadata associated with the account."""
+
+ logger.debug("update_account_meta: %s %s %s", account, meta, replace)
+ if user != account:
+ raise NotAllowedError
+ path, node = self._lookup_account(account, True)
+ self._put_metadata(user, node, meta, replace, False)
+
+ @backend_method
+ def get_account_groups(self, user, account):
+ """Return a dictionary with the user groups defined for this account."""
+
+ logger.debug("get_account_groups: %s", account)
+ if user != account:
+ if account not in self._allowed_accounts(user):
+ raise NotAllowedError
+ return {}
+ self._lookup_account(account, True)
+ return self.permissions.group_dict(account)
+
+ @backend_method
+ def update_account_groups(self, user, account, groups, replace=False):
+ """Update the groups associated with the account."""
+
+ logger.debug("update_account_groups: %s %s %s", account, groups, replace)
+ if user != account:
+ raise NotAllowedError
+ self._lookup_account(account, True)
+ self._check_groups(groups)
+ if replace:
+ self.permissions.group_destroy(account)
+ for k, v in groups.iteritems():
+ if not replace: # If not already deleted.
+ self.permissions.group_delete(account, k)
+ if v:
+ self.permissions.group_addmany(account, k, v)
+
+ @backend_method
+ def put_account(self, user, account):
+ """Create a new account with the given name."""
+
+ logger.debug("put_account: %s", account)
+ if user != account:
+ raise NotAllowedError
+ node = self.node.node_lookup(account)
+ if node is not None:
+ raise NameError('Account already exists')
+ self._put_path(user, ROOTNODE, account)
+
+ @backend_method
+ def delete_account(self, user, account):
+ """Delete the account with the given name."""
+
+ logger.debug("delete_account: %s", account)
+ if user != account:
+ raise NotAllowedError
+ node = self.node.node_lookup(account)
+ if node is None:
+ return
+ if not self.node.node_remove(node):
+ raise IndexError('Account is not empty')
+ self.permissions.group_destroy(account)
+
+ @backend_method
+ def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
+ """Return a list of containers existing under an account."""
+
+ logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
+ if user != account:
+ if until or account not in self._allowed_accounts(user):
+ raise NotAllowedError
+ allowed = self._allowed_containers(user, account)
+ start, limit = self._list_limits(allowed, marker, limit)
+ return allowed[start:start + limit]
+ if shared:
+ allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
+ start, limit = self._list_limits(allowed, marker, limit)
+ return allowed[start:start + limit]
+ node = self.node.node_lookup(account)
+ return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
+
+ @backend_method
+ def get_container_meta(self, user, account, container, until=None):
+ """Return a dictionary with the container metadata."""
+
+ logger.debug("get_container_meta: %s %s %s", account, container, until)
+ if user != account:
+ if until or container not in self._allowed_containers(user, account):
+ raise NotAllowedError
+ path, node = self._lookup_container(account, container)
+ props = self._get_properties(node, until)
+ mtime = props[MTIME]
+ count, bytes, tstamp = self._get_statistics(node, until)
+ tstamp = max(tstamp, mtime)
+ if until is None:
+ modified = tstamp
+ else:
+ modified = self._get_statistics(node)[2] # Overall last modification.
+ modified = max(modified, mtime)
+
+ if user != account:
+ meta = {'name': container}
+ else:
+ meta = dict(self.node.attribute_get(props[SERIAL]))
+ if until is not None:
+ meta.update({'until_timestamp': tstamp})
+ meta.update({'name': container, 'count': count, 'bytes': bytes})
+ meta.update({'modified': modified})
+ return meta
+
+ @backend_method
+ def update_container_meta(self, user, account, container, meta, replace=False):
+ """Update the metadata associated with the container."""
+
+ logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
+ if user != account:
+ raise NotAllowedError
+ path, node = self._lookup_container(account, container)
+ self._put_metadata(user, node, meta, replace, False)
+
+ @backend_method
+ def get_container_policy(self, user, account, container):
+ """Return a dictionary with the container policy."""
+
+ logger.debug("get_container_policy: %s %s", account, container)
+ if user != account:
+ if container not in self._allowed_containers(user, account):
+ raise NotAllowedError
+ return {}
+ path = self._lookup_container(account, container)[0]
+ return self.policy.policy_get(path)
+
+ @backend_method
+ def update_container_policy(self, user, account, container, policy, replace=False):
+ """Update the policy associated with the account."""
+
+ logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
+ if user != account:
+ raise NotAllowedError
+ path = self._lookup_container(account, container)[0]
+ self._check_policy(policy)
+ if replace:
+ for k, v in self.default_policy.iteritems():
+ if k not in policy:
+ policy[k] = v
+ self.policy.policy_set(path, policy)
+
+ @backend_method
+ def put_container(self, user, account, container, policy=None):
+ """Create a new container with the given name."""
+
+ logger.debug("put_container: %s %s %s", account, container, policy)
+ if user != account:
+ raise NotAllowedError
+ try:
+ path, node = self._lookup_container(account, container)
+ except NameError:
+ pass
+ else:
+ raise NameError('Container already exists')
+ if policy is None:
+ policy = {}
+ else:
+ self._check_policy(policy)
+ path = '/'.join((account, container))
+ self._put_path(user, self._lookup_account(account, True)[1], path)
+ for k, v in self.default_policy.iteritems():
+ if k not in policy:
+ policy[k] = v
+ self.policy.policy_set(path, policy)
+
+ @backend_method
+ def delete_container(self, user, account, container, until=None):
+ """Delete/purge the container with the given name."""
+
+ logger.debug("delete_container: %s %s %s", account, container, until)
+ if user != account:
+ raise NotAllowedError
+ path, node = self._lookup_container(account, container)
+
+ if until is not None:
+ versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
+ for v in versions:
+ self.mapper.map_remv(v)
+ self.node.node_purge_children(node, until, CLUSTER_DELETED)
+ return
+
+ if self._get_statistics(node)[0] > 0:
+ raise IndexError('Container is not empty')
+ versions = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
+ for v in versions:
+ self.mapper.map_remv(v)
+ self.node.node_purge_children(node, inf, CLUSTER_DELETED)
+ self.node.node_remove(node)
+ self.policy.policy_unset(path)
+
+ @backend_method
+ def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
+ """Return a list of objects existing under a container."""
+
+ logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
+ allowed = []
+ if user != account:
+ if until:
+ raise NotAllowedError
+ allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
+ if not allowed:
+ raise NotAllowedError
+ else:
+ if shared:
+ allowed = self.permissions.access_list_shared('/'.join((account, container)))
+ path, node = self._lookup_container(account, container)
+ return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
+
+ @backend_method
+ def list_object_meta(self, user, account, container, until=None):
+ """Return a list with all the container's object meta keys."""
+
+ logger.debug("list_object_meta: %s %s %s", account, container, until)
+ allowed = []
+ if user != account:
+ if until:
+ raise NotAllowedError
+ allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
+ if not allowed:
+ raise NotAllowedError
+ path, node = self._lookup_container(account, container)
+ before = until if until is not None else inf
+ return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
+
+ @backend_method
+ def get_object_meta(self, user, account, container, name, version=None):
+ """Return a dictionary with the object metadata."""
+
+ logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
+ self._can_read(user, account, container, name)
+ path, node = self._lookup_object(account, container, name)
+ props = self._get_version(node, version)
+ if version is None:
+ modified = props[MTIME]
+ else:
+ modified = self._get_version(node)[MTIME] # Overall last modification.
+
+ meta = dict(self.node.attribute_get(props[SERIAL]))
+ meta.update({'name': name, 'bytes': props[SIZE]})
+ meta.update({'version': props[SERIAL], 'version_timestamp': props[MTIME]})
+ meta.update({'modified': modified, 'modified_by': props[MUSER]})
+ return meta
+
+ @backend_method
+ def update_object_meta(self, user, account, container, name, meta, replace=False):
+ """Update the metadata associated with the object."""
+
+ logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
+ self._can_write(user, account, container, name)
+ path, node = self._lookup_object(account, container, name)
+ self._put_metadata(user, node, meta, replace)
+
+ @backend_method
+ def get_object_permissions(self, user, account, container, name):
+ """Return the path from which this object gets its permissions from,\
+ along with a dictionary containing the permissions."""
+
+ logger.debug("get_object_permissions: %s %s %s", account, container, name)
+ self._can_read(user, account, container, name)
+ path = self._lookup_object(account, container, name)[0]
+ return self.permissions.access_inherit(path)
+
+ @backend_method
+ def update_object_permissions(self, user, account, container, name, permissions):
+ """Update the permissions associated with the object."""
+
+ logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
+ if user != account:
+ raise NotAllowedError
+ path = self._lookup_object(account, container, name)[0]
+ self._check_permissions(path, permissions)
+ self.permissions.access_set(path, permissions)
+
+ @backend_method
+ def get_object_public(self, user, account, container, name):
+ """Return the public URL of the object if applicable."""
+
+ logger.debug("get_object_public: %s %s %s", account, container, name)
+ self._can_read(user, account, container, name)
+ path = self._lookup_object(account, container, name)[0]
+ if self.permissions.public_check(path):
+ return '/public/' + path
+ return None
+
+ @backend_method
+ def update_object_public(self, user, account, container, name, public):
+ """Update the public status of the object."""
+
+ logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
+ self._can_write(user, account, container, name)
+ path = self._lookup_object(account, container, name)[0]
+ if not public:
+ self.permissions.public_unset(path)
+ else:
+ self.permissions.public_set(path)
+
+ @backend_method
+ def get_object_hashmap(self, user, account, container, name, version=None):
+ """Return the object's size and a list with partial hashes."""
+
+ logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
+ self._can_read(user, account, container, name)
+ path, node = self._lookup_object(account, container, name)
+ props = self._get_version(node, version)
+ hashmap = self.mapper.map_retr(props[SERIAL])
+ return props[SIZE], [binascii.hexlify(x) for x in hashmap]
+
+ @backend_method
+ def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
+ """Create/update an object with the specified size and partial hashes."""
+
+ logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
+ if permissions is not None and user != account:
+ raise NotAllowedError
+ self._can_write(user, account, container, name)
+ missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
+ if missing:
+ ie = IndexError()
+ ie.data = missing
+ raise ie
+ path, node = self._put_object_node(account, container, name)
+ if permissions is not None:
+ self._check_permissions(path, permissions)
+ src_version_id, dest_version_id = self._copy_version(user, node, None, node, size)
+ self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
+ if not replace_meta and src_version_id is not None:
+ self.node.attribute_copy(src_version_id, dest_version_id)
+ self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
+ if permissions is not None:
+ self.permissions.access_set(path, permissions)
+
+ @backend_method
+ def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
+ """Copy an object's data and metadata."""
+
+ logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
+ if permissions is not None and user != account:
+ raise NotAllowedError
+ self._can_read(user, account, src_container, src_name)
+ self._can_write(user, account, dest_container, dest_name)
+ src_path, src_node = self._lookup_object(account, src_container, src_name)
+ dest_path, dest_node = self._put_object_node(account, dest_container, dest_name)
+ if permissions is not None:
+ self._check_permissions(dest_path, permissions)
+ src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node)
+ if src_version_id is not None:
+ self._copy_data(src_version_id, dest_version_id)
+ if not replace_meta and src_version_id is not None:
+ self.node.attribute_copy(src_version_id, dest_version_id)
+ self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
+ if permissions is not None:
+ self.permissions.access_set(dest_path, permissions)
+
+ @backend_method
+ def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
+ """Move an object's data and metadata."""
+
+ logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
+ self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
+ self.delete_object(user, account, src_container, src_name)
+
+ @backend_method
+ def delete_object(self, user, account, container, name, until=None):
+ """Delete/purge an object."""
+
+ logger.debug("delete_object: %s %s %s %s", account, container, name, until)
+ if user != account:
+ raise NotAllowedError
+
+ if until is not None:
+ path = '/'.join((account, container, name))
+ node = self.node.node_lookup(path)
+ if node is None:
+ return
+ versions = self.node.node_purge(node, until, CLUSTER_NORMAL)
+ versions += self.node.node_purge(node, until, CLUSTER_HISTORY)
+ for v in versions:
+ self.mapper.map_remv(v)
+ self.node.node_purge_children(node, until, CLUSTER_DELETED)
+ try:
+ props = self._get_version(node)
+ except NameError:
+ pass
+ else:
+ self.permissions.access_clear(path)
+ return
+
+ path, node = self._lookup_object(account, container, name)
+ self._copy_version(user, node, None, node, 0, CLUSTER_DELETED)
+ self.permissions.access_clear(path)
+
+ @backend_method
+ def list_versions(self, user, account, container, name):
+ """Return a list of all (version, version_timestamp) tuples for an object."""
+
+ logger.debug("list_versions: %s %s %s", account, container, name)
+ self._can_read(user, account, container, name)
+ path, node = self._lookup_object(account, container, name)
+ return self.node.node_get_versions(node, ['serial', 'mtime'])
+
+ @backend_method(autocommit=0)
+ def get_block(self, hash):
+ """Return a block's data."""
+
+ logger.debug("get_block: %s", hash)
+ blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
+ if not blocks:
+ raise NameError('Block does not exist')
+ return blocks[0]
+
+ @backend_method(autocommit=0)
+ def put_block(self, data):
+ """Create a block and return the hash."""
+
+ logger.debug("put_block: %s", len(data))
+ hashes, absent = self.blocker.block_stor((data,))
+ return binascii.hexlify(hashes[0])
+
+ @backend_method(autocommit=0)
+ def update_block(self, hash, data, offset=0):
+ """Update a known block and return the hash."""
+
+ logger.debug("update_block: %s %s %s", hash, len(data), offset)
+ if offset == 0 and len(data) == self.block_size:
+ return self.put_block(data)
+ h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
+ return binascii.hexlify(h)
+
+ def _check_policy(self, policy):
+ for k in policy.keys():
+ if policy[k] == '':
+ policy[k] = self.default_policy.get(k)
+ for k, v in policy.iteritems():
+ if k == 'quota':
+ q = int(v) # May raise ValueError.
+ if q < 0:
+ raise ValueError
+ elif k == 'versioning':
+ if v not in ['auto', 'manual', 'none']:
+ raise ValueError
+ else:
+ raise ValueError
+
+ def _sql_until(self, parent, until=None):
+ """Return the sql to get the latest versions until the timestamp given."""
+
+ if until is None:
+ until = time.time()
+ sql = ("select v.serial, n.path, v.mtime, v.size "
+ "from versions v, nodes n "
+ "where v.serial = (select max(serial) "
+ "from versions "
+ "where node = v.node and mtime < %s) "
+ "and v.cluster != %s "
+ "and v.node = n.node "
+ "and v.node in (select node "
+ "from nodes "
+ "where parent = %s)")
+ return sql % (until, CLUSTER_DELETED, parent)
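+
+ # e.g. _sql_until(5, until=1300000000.0) interpolates the timestamp, the
+ # CLUSTER_DELETED constant and the parent node id directly into the text;
+ # callers append further parameterized clauses before executing.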
+
+ def _list_limits(self, listing, marker, limit):
+ start = 0
+ if marker:
+ try:
+ start = listing.index(marker) + 1
+ except ValueError:
+ pass
+ if not limit or limit > 10000:
+ limit = 10000
+ return start, limit
+
+ def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
+ cont_prefix = path + '/'
+ if keys:
+# sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
+# m.version_id = o.version_id and m.key in (%s)'''
+# sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
+# param = (cont_prefix + prefix + '%',) + tuple(keys)
+# if allowed:
+# sql += ' and (' + ' or '.join(('o.name like ?',) * len(allowed)) + ')'
+# param += tuple([x + '%' for x in allowed])
+# sql += ' order by o.name'
+ return []
+ else:
+ sql = 'select path, serial from (%s) where path like ?'
+ sql = sql % self._sql_until(parent, until)
+ param = (cont_prefix + prefix + '%',)
+ if allowed:
+ sql += ' and (' + ' or '.join(('name like ?',) * len(allowed)) + ')'
+ param += tuple([x + '%' for x in allowed])
+ sql += ' order by path'
+ c = self.con.execute(sql, param)
+ objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
+ if delimiter:
+ pseudo_objects = []
+ for x in objects:
+ pseudo_name = x[0]
+ i = pseudo_name.find(delimiter, len(prefix))
+ if not virtual:
+ # If the delimiter is not found, or the name ends
+ # with the delimiter's first occurence.
+ if i == -1 or len(pseudo_name) == i + len(delimiter):
+ pseudo_objects.append(x)
+ else:
+ # If the delimiter is found, keep up to (and including) the delimiter.
+ if i != -1:
+ pseudo_name = pseudo_name[:i + len(delimiter)]
+ if pseudo_name not in [y[0] for y in pseudo_objects]:
+ if pseudo_name == x[0]:
+ pseudo_objects.append(x)
+ else:
+ pseudo_objects.append((pseudo_name, None))
+ objects = pseudo_objects
+
+ start, limit = self._list_limits([x[0] for x in objects], marker, limit)
+ return objects[start:start + limit]
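+
+ # With delimiter='/' and virtual=True, sibling objects 'a/b' and 'a/c'
+ # collapse into the single virtual directory entry ('a/', None).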
+
+ # Path functions.
+
+ def _put_object_node(self, account, container, name):
+ path, parent = self._lookup_container(account, container)
+ path = '/'.join((path, name))
+ node = self.node.node_lookup(path)
+ if node is None:
+ node = self.node.node_create(parent, path)
+ return path, node
+
+ def _put_path(self, user, parent, path):
+ node = self.node.node_create(parent, path)
+ self.node.version_create(node, 0, None, user, CLUSTER_NORMAL)
+ return node
+
+ def _lookup_account(self, account, create=True):
+ node = self.node.node_lookup(account)
+ if node is None and create:
+ node = self._put_path(account, ROOTNODE, account) # User is account.
+ return account, node
+
+ def _lookup_container(self, account, container):
+ path = '/'.join((account, container))
+ node = self.node.node_lookup(path)
+ if node is None:
+ raise NameError('Container does not exist')
+ return path, node
+
+ def _lookup_object(self, account, container, name):
+ path = '/'.join((account, container, name))
+ node = self.node.node_lookup(path)
+ if node is None:
+ raise NameError('Object does not exist')
+ return path, node
+
+ def _get_properties(self, node, until=None):
+ """Return properties until the timestamp given."""
+
+ before = until if until is not None else inf
+ props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
+ if props is None and until is not None:
+ props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
+ if props is None:
+ raise NameError('Path does not exist')
+ return props
+
+ def _get_statistics(self, node, until=None):
+ """Return count, sum of size and latest timestamp of everything under node."""
+
+ if until is None:
+ stats = self.node.statistics_get(node, CLUSTER_NORMAL)
+ else:
+ stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
+ if stats is None:
+ stats = (0, 0, 0)
+ return stats
+
+ def _get_version(self, node, version=None):
+ if version is None:
+ props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
+ if props is None:
+ raise NameError('Object does not exist')
+ else:
+ props = self.node.version_get_properties(version)
+ if props is None or props[CLUSTER] == CLUSTER_DELETED:
+ raise IndexError('Version does not exist')
+ return props
+
+ def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL):
+
+ # Get source serial and size.
+ if src_version is not None:
+ src_props = self._get_version(src_node, src_version)
+ src_version_id = src_props[SERIAL]
+ size = src_props[SIZE]
+ else:
+ # Latest or create from scratch.
+ try:
+ src_props = self._get_version(src_node)
+ src_version_id = src_props[SERIAL]
+ size = src_props[SIZE]
+ except NameError:
+ src_version_id = None
+ size = 0
+ if dest_size is not None:
+ size = dest_size
+
+ # Move the latest version at destination to CLUSTER_HISTORY and create new.
+ if src_node == dest_node and src_version is None and src_version_id is not None:
+ self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
+ else:
+ dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
+ if dest_props is not None:
+ self.node.version_recluster(dest_props[SERIAL], CLUSTER_HISTORY)
+ dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster)
+
+ return src_version_id, dest_version_id
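+
+ # _copy_version is the single place new versions are minted: an in-place
+ # update demotes the node's previous version to CLUSTER_HISTORY, while a
+ # copy demotes the destination's latest version before creating the new one.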
+
+ def _copy_data(self, src_version, dest_version):
+ hashmap = self.mapper.map_retr(src_version)
+ self.mapper.map_stor(dest_version, hashmap)
+
+ def _get_metadata(self, version):
+ if version is None:
+ return {}
+ return dict(self.node.attribute_get(version))
+
+ def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
+ """Create a new version and store metadata."""
+
+ src_version_id, dest_version_id = self._copy_version(user, node, None, node)
+ if not replace:
+ if src_version_id is not None:
+ self.node.attribute_copy(src_version_id, dest_version_id)
+ self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
+ self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
+ else:
+ self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
+ if copy_data and src_version_id is not None:
+ self._copy_data(src_version_id, dest_version_id)
+
+ # Access control functions.
+
+ def _check_groups(self, groups):
+ # raise ValueError('Bad characters in groups')
+ pass
+
+ def _check_permissions(self, path, permissions):
+ # raise ValueError('Bad characters in permissions')
+
+ # Check for existing permissions.
+ paths = self.permissions.access_list(path)
+ if paths:
+ ae = AttributeError()
+ ae.data = paths
+ raise ae
+
+ def _can_read(self, user, account, container, name):
+ if user == account:
+ return True
+ path = '/'.join((account, container, name))
+ if not self.permissions.access_check(path, READ, user) and not self.permissions.access_check(path, WRITE, user):
+ raise NotAllowedError
+
+ def _can_write(self, user, account, container, name):
+ if user == account:
+ return True
+ path = '/'.join((account, container, name))
+ if not self.permissions.access_check(path, WRITE, user):
+ raise NotAllowedError
+
+ def _allowed_accounts(self, user):
+ allow = set()
+ for path in self.permissions.access_list_paths(user):
+ allow.add(path.split('/', 1)[0])
+ return sorted(allow)
+
+ def _allowed_containers(self, user, account):
+ allow = set()
+ for path in self.permissions.access_list_paths(user, account):
+ allow.add(path.split('/', 2)[1])
+ return sorted(allow)