# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
-from os import makedirs
-from os.path import isdir, realpath, exists, join
from hashlib import new as newhasher
from binascii import hexlify
-from context_file import ContextFile, file_sync_read_chunks
+from radosblocker import RadosBlocker
+from fileblocker import FileBlocker
+
+def intersect(a, b):
+ """ return the intersection of two lists """
+ return list(set(a) & set(b))
+
+def union(a, b):
+ """ return the union of two lists """
+ return list(set(a) | set(b))
class Blocker(object):
"""Blocker.
- Required constructor parameters: blocksize, blockpath, hashtype.
+ Required constructor parameters: blocksize, blockpath, hashtype,
+ blockpool.
"""
- blocksize = None
- blockpath = None
- hashtype = None
-
def __init__(self, **params):
- blocksize = params['blocksize']
- blockpath = params['blockpath']
- blockpath = realpath(blockpath)
- if not isdir(blockpath):
- if not exists(blockpath):
- makedirs(blockpath)
- else:
- raise ValueError("Variable blockpath '%s' is not a directory" % (blockpath,))
-
- hashtype = params['hashtype']
- try:
- hasher = newhasher(hashtype)
- except ValueError:
- msg = "Variable hashtype '%s' is not available from hashlib"
- raise ValueError(msg % (hashtype,))
-
- hasher.update("")
- emptyhash = hasher.digest()
-
- self.blocksize = blocksize
- self.blockpath = blockpath
- self.hashtype = hashtype
- self.hashlen = len(emptyhash)
- self.emptyhash = emptyhash
-
- def _pad(self, block):
- return block + ('\x00' * (self.blocksize - len(block)))
-
- def _get_rear_block(self, blkhash, create=0):
- filename = hexlify(blkhash)
- dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
- if not exists(dir):
- makedirs(dir)
- name = join(dir, filename)
- return ContextFile(name, create)
-
- def _check_rear_block(self, blkhash):
- filename = hexlify(blkhash)
- dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
- name = join(dir, filename)
- return exists(name)
+ params['blockpool'] = 'blocks'
+ self.rblocker = RadosBlocker(**params)
+ self.fblocker = FileBlocker(**params)
+ self.hashlen = self.rblocker.hashlen
+
+# def _get_rear_block(self, blkhash, create=0):
+# return self.rblocker._get_rear_block(blkhash, create)
+
+# def _check_rear_block(self, blkhash):
+# return self.rblocker._check_rear_block(blkhash)
+# return self.rblocker._check_rear_block(blkhash) and
+# self.fblocker._check_rear_block(blkhash)
def block_hash(self, data):
"""Hash a block of data"""
- hasher = newhasher(self.hashtype)
- hasher.update(data.rstrip('\x00'))
- return hasher.digest()
+ return self.rblocker.block_hash(data)
def block_ping(self, hashes):
"""Check hashes for existence and
return those missing from block storage.
"""
- notfound = []
- append = notfound.append
-
- for h in hashes:
- if h not in notfound and not self._check_rear_block(h):
- append(h)
-
- return notfound
+# return self.rblocker.block_ping(hashes)
+ r = self.rblocker.block_ping(hashes)
+ f = self.fblocker.block_ping(hashes)
+ return union(r, f)
def block_retr(self, hashes):
"""Retrieve blocks from storage by their hashes."""
- blocksize = self.blocksize
- blocks = []
- append = blocks.append
- block = None
-
- for h in hashes:
- if h == self.emptyhash:
- append(self._pad(''))
- continue
- with self._get_rear_block(h, 0) as rbl:
- if not rbl:
- break
- for block in rbl.sync_read_chunks(blocksize, 1, 0):
- break # there should be just one block there
- if not block:
- break
- append(self._pad(block))
-
- return blocks
+ return self.fblocker.block_retr(hashes)
def block_stor(self, blocklist):
"""Store a bunch of blocks and return (hashes, missing).
missing is a list of indices in that list indicating
which blocks were missing from the store.
"""
- block_hash = self.block_hash
- hashlist = [block_hash(b) for b in blocklist]
- mf = None
- missing = [i for i, h in enumerate(hashlist) if not self._check_rear_block(h)]
- for i in missing:
- with self._get_rear_block(hashlist[i], 1) as rbl:
- rbl.sync_write(blocklist[i]) #XXX: verify?
+# return self.rblocker.block_stor(blocklist)
+ (hashes, r_missing) = self.rblocker.block_stor(blocklist)
+ (_, f_missing) = self.fblocker.block_stor(blocklist)
+ return (hashes, union(r_missing, f_missing))
- return hashlist, missing
def block_delta(self, blkhash, offset, data):
"""Construct and store a new block from a given block
and a data 'patch' applied at offset. Return:
(the hash of the new block, if the block already existed)
"""
+# return self.rblocker.block_delta(blkhash, offset, data)
- blocksize = self.blocksize
- if offset >= blocksize or not data:
+ (f_hash, f_existed) = self.fblocker.block_delta(blkhash, offset, data)
+ (r_hash, r_existed) = self.rblocker.block_delta(blkhash, offset, data)
+ if not r_hash and not f_hash:
return None, None
-
- block = self.block_retr((blkhash,))
- if not block:
- return None, None
-
- block = block[0]
- newblock = block[:offset] + data
- if len(newblock) > blocksize:
- newblock = newblock[:blocksize]
- elif len(newblock) < blocksize:
- newblock += block[len(newblock):]
-
- h, a = self.block_stor((newblock,))
- return h[0], 1 if a else 0
-
- def block_hash_file(self, openfile):
- """Return the list of hashes (hashes map)
- for the blocks in a buffered file.
- Helper method, does not affect store.
- """
- hashes = []
- append = hashes.append
- block_hash = self.block_hash
-
- for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0):
- append(block_hash(block))
-
- return hashes
-
- def block_stor_file(self, openfile):
- """Read blocks from buffered file object and store them. Return:
- (bytes read, list of hashes, list of hashes that were missing)
- """
- blocksize = self.blocksize
- block_stor = self.block_stor
- hashlist = []
- hextend = hashlist.extend
- storedlist = []
- sextend = storedlist.extend
- lastsize = 0
-
- for block in file_sync_read_chunks(openfile, blocksize, 1, 0):
- hl, sl = block_stor((block,))
- hextend(hl)
- sextend(sl)
- lastsize = len(block)
-
- size = (len(hashlist) -1) * blocksize + lastsize if hashlist else 0
- return size, hashlist, storedlist
-
+ if not r_hash:
+ block = self.fblocker.block_retr((blkhash,))
+ if not block:
+ return None, None
+ block = block[0]
+ newblock = block[:offset] + data
+ if len(newblock) > blocksize:
+ newblock = newblock[:blocksize]
+ elif len(newblock) < blocksize:
+ newblock += block[len(newblock):]
+ r_hash, r_existed = self.rblocker.block_stor((newblock,))
+
+ return f_hash, 1 if r_existed and f_existed else 0