Add support for storing blocks on RADOS
[pithos] / snf-pithos-backend / pithos / backends / lib / hashfiler / blocker.py
index af64bfa..0dd1644 100644 (file)
 # interpreted as representing official policies, either expressed
 # or implied, of GRNET S.A.
 
-from os import makedirs
-from os.path import isdir, realpath, exists, join
 from hashlib import new as newhasher
 from binascii import hexlify
 
-from context_file import ContextFile, file_sync_read_chunks
+from radosblocker import RadosBlocker
+from fileblocker import FileBlocker
+
+def intersect(a, b):
+    """Return the elements of a that also appear in b.
+
+    Duplicates are dropped and the order of first appearance in a is
+    kept, so the result is deterministic (unlike iterating a set).
+    Elements must be hashable.
+    """
+    in_b = set(b)
+    seen = set()
+    result = []
+    for item in a:
+        if item in in_b and item not in seen:
+            seen.add(item)
+            result.append(item)
+    return result
+
+def union(a, b):
+    """Return the elements of a followed by those of b, without duplicates.
+
+    The order of first appearance is kept, so the result is deterministic
+    (unlike iterating a set). Elements must be hashable.
+    """
+    seen = set()
+    result = []
+    for seq in (a, b):
+        for item in seq:
+            if item not in seen:
+                seen.add(item)
+                result.append(item)
+    return result
 
 
 class Blocker(object):
     """Blocker.
-       Required constructor parameters: blocksize, blockpath, hashtype.
+       Required constructor parameters: blocksize, blockpath, hashtype,
+       blockpool.
+
+       Blocks are stored through two backends built from the same
+       constructor parameters, a RadosBlocker and a FileBlocker; reads
+       (block_retr) are served from the FileBlocker only.
+       (blockpool presumably names the RADOS pool -- confirm in
+       RadosBlocker.)
     """
 
-    blocksize = None
-    blockpath = None
-    hashtype = None
-
     def __init__(self, **params):
-        blocksize = params['blocksize']
-        blockpath = params['blockpath']
-        blockpath = realpath(blockpath)
-        if not isdir(blockpath):
-            if not exists(blockpath):
-                makedirs(blockpath)
-            else:
-                raise ValueError("Variable blockpath '%s' is not a directory" % (blockpath,))
-
-        hashtype = params['hashtype']
-        try:
-            hasher = newhasher(hashtype)
-        except ValueError:
-            msg = "Variable hashtype '%s' is not available from hashlib"
-            raise ValueError(msg % (hashtype,))
-
-        hasher.update("")
-        emptyhash = hasher.digest()
-
-        self.blocksize = blocksize
-        self.blockpath = blockpath
-        self.hashtype = hashtype
-        self.hashlen = len(emptyhash)
-        self.emptyhash = emptyhash
-
-    def _pad(self, block):
-        return block + ('\x00' * (self.blocksize - len(block)))
-
-    def _get_rear_block(self, blkhash, create=0):
-        filename = hexlify(blkhash)
-        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
-        if not exists(dir):
-            makedirs(dir)
-        name = join(dir, filename)
-        return ContextFile(name, create)
-
-    def _check_rear_block(self, blkhash):
-        filename = hexlify(blkhash)
-        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
-        name = join(dir, filename)
-        return exists(name)
+        params['blockpool'] = 'blocks'
+        self.rblocker = RadosBlocker(**params)
+        self.fblocker = FileBlocker(**params)
+        self.hashlen = self.rblocker.hashlen
+
+#    def _get_rear_block(self, blkhash, create=0):
+#        return self.rblocker._get_rear_block(blkhash, create)
+
+#    def _check_rear_block(self, blkhash):
+#        return self.rblocker._check_rear_block(blkhash)
+#        return self.rblocker._check_rear_block(blkhash) and
+#                self.fblocker._check_rear_block(blkhash)
 
     def block_hash(self, data):
         """Hash a block of data"""
-        hasher = newhasher(self.hashtype)
-        hasher.update(data.rstrip('\x00'))
-        return hasher.digest()
+        return self.rblocker.block_hash(data)
 
     def block_ping(self, hashes):
         """Check hashes for existence and
            return those missing from block storage.
         """
-        notfound = []
-        append = notfound.append
-
-        for h in hashes:
-            if h not in notfound and not self._check_rear_block(h):
-                append(h)
-
-        return notfound
+#        return self.rblocker.block_ping(hashes)
+        r = self.rblocker.block_ping(hashes)
+        f = self.fblocker.block_ping(hashes)
+        return union(r, f)
 
     def block_retr(self, hashes):
         """Retrieve blocks from storage by their hashes."""
-        blocksize = self.blocksize
-        blocks = []
-        append = blocks.append
-        block = None
-
-        for h in hashes:
-            if h == self.emptyhash:
-                append(self._pad(''))
-                continue
-            with self._get_rear_block(h, 0) as rbl:
-                if not rbl:
-                    break
-                for block in rbl.sync_read_chunks(blocksize, 1, 0):
-                    break # there should be just one block there
-            if not block:
-                break
-            append(self._pad(block))
-
-        return blocks
+        return self.fblocker.block_retr(hashes)
 
     def block_stor(self, blocklist):
         """Store a bunch of blocks and return (hashes, missing).
@@ -138,72 +89,33 @@ class Blocker(object):
            missing is a list of indices in that list indicating
            which blocks were missing from the store.
         """
-        block_hash = self.block_hash
-        hashlist = [block_hash(b) for b in blocklist]
-        mf = None
-        missing = [i for i, h in enumerate(hashlist) if not self._check_rear_block(h)]
-        for i in missing:
-            with self._get_rear_block(hashlist[i], 1) as rbl:
-                 rbl.sync_write(blocklist[i]) #XXX: verify?
+#        return self.rblocker.block_stor(blocklist)
+        (hashes, r_missing) = self.rblocker.block_stor(blocklist)
+        (_, f_missing) = self.fblocker.block_stor(blocklist)
+        return (hashes, union(r_missing, f_missing))
 
-        return hashlist, missing
 
     def block_delta(self, blkhash, offset, data):
         """Construct and store a new block from a given block
            and a data 'patch' applied at offset. Return:
            (the hash of the new block, if the block already existed)
         """
+#        return self.rblocker.block_delta(blkhash, offset, data)
 
-        blocksize = self.blocksize
-        if offset >= blocksize or not data:
+        (f_hash, f_existed) = self.fblocker.block_delta(blkhash, offset, data)
+        (r_hash, r_existed) = self.rblocker.block_delta(blkhash, offset, data)
+        if not r_hash and not f_hash:
             return None, None
-
-        block = self.block_retr((blkhash,))
-        if not block:
-            return None, None
-        
-        block = block[0]
-        newblock = block[:offset] + data
-        if len(newblock) > blocksize:
-            newblock = newblock[:blocksize]
-        elif len(newblock) < blocksize:
-            newblock += block[len(newblock):]
-
-        h, a = self.block_stor((newblock,))
-        return h[0], 1 if a else 0
-
-    def block_hash_file(self, openfile):
-        """Return the list of hashes (hashes map)
-           for the blocks in a buffered file.
-           Helper method, does not affect store.
-        """
-        hashes = []
-        append = hashes.append
-        block_hash = self.block_hash
-
-        for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0):
-            append(block_hash(block))
-
-        return hashes
-
-    def block_stor_file(self, openfile):
-        """Read blocks from buffered file object and store them. Return:
-           (bytes read, list of hashes, list of hashes that were missing)
-        """
-        blocksize = self.blocksize
-        block_stor = self.block_stor
-        hashlist = []
-        hextend = hashlist.extend
-        storedlist = []
-        sextend = storedlist.extend
-        lastsize = 0
-
-        for block in file_sync_read_chunks(openfile, blocksize, 1, 0):
-            hl, sl = block_stor((block,))
-            hextend(hl)
-            sextend(sl)
-            lastsize = len(block)
-
-        size = (len(hashlist) -1) * blocksize + lastsize if hashlist else 0
-        return size, hashlist, storedlist
-
+        if not r_hash:
+            block = self.fblocker.block_retr((blkhash,))
+            if not block:
+                return None, None
+            block = block[0]
+            newblock = block[:offset] + data
+            if len(newblock) > blocksize:
+                newblock = newblock[:blocksize]
+            elif len(newblock) < blocksize:
+                newblock += block[len(newblock):]
+            r_hash, r_existed = self.rblocker.block_stor((newblock,))
+
+        return f_hash, 1 if r_existed and f_existed else 0