Revision c30635bf — diff of snf-pithos-backend/pithos/backends/lib/hashfiler/blocker.py

b/snf-pithos-backend/pithos/backends/lib/hashfiler/blocker.py
31 31
# interpreted as representing official policies, either expressed
32 32
# or implied, of GRNET S.A.
33 33

  
34
from os import makedirs
35
from os.path import isdir, realpath, exists, join
36 34
from hashlib import new as newhasher
37 35
from binascii import hexlify
38 36

  
39
from context_file import ContextFile, file_sync_read_chunks
37
from radosblocker import RadosBlocker
38
from fileblocker import FileBlocker
39

  
40
def intersect(a, b):
    """Return a list of the elements common to both *a* and *b*."""
    shared = set(a)
    shared &= set(b)
    return list(shared)
43

  
44
def union(a, b):
    """Return a list of all distinct elements found in either list."""
    combined = set(a)
    combined.update(b)
    return list(combined)
40 47

  
41 48

  
42 49
class Blocker(object):
    """Blocker.

       Mirrors every block operation onto two backing stores: a RADOS
       pool (self.rblocker) and the local filesystem (self.fblocker).
       Reads are served from the file store; writes go to both.

       Required constructor parameters: blocksize, blockpath, hashtype,
       blockpool.
    """

    def __init__(self, **params):
        # The RADOS pool name is fixed here; callers do not choose it.
        params['blockpool'] = 'blocks'
        self.rblocker = RadosBlocker(**params)
        self.fblocker = FileBlocker(**params)
        # Both backends hash with the same hashtype, so either backend's
        # hashlen would do.
        self.hashlen = self.rblocker.hashlen
        # block_delta and the *_file helpers read self.blocksize; the
        # diffed revision dropped this assignment, which would raise
        # NameError/AttributeError in block_delta's repair branch.
        self.blocksize = params['blocksize']

    def block_hash(self, data):
        """Hash a block of data"""
        return self.rblocker.block_hash(data)

    def block_ping(self, hashes):
        """Check hashes for existence and
           return those missing from block storage.
        """
        # A hash counts as missing if EITHER store lacks it, so that a
        # subsequent block_stor re-mirrors it to the store that lost it.
        r = self.rblocker.block_ping(hashes)
        f = self.fblocker.block_ping(hashes)
        return union(r, f)

    def block_retr(self, hashes):
        """Retrieve blocks from storage by their hashes."""
        # Reads are served from the local file store only.
        return self.fblocker.block_retr(hashes)

    def block_stor(self, blocklist):
        """Store a bunch of blocks and return (hashes, missing).
           Hashes is a list of the hashes of the blocks,
           missing is a list of indices in that list indicating
           which blocks were missing from the store.
        """
        (hashes, r_missing) = self.rblocker.block_stor(blocklist)
        (_, f_missing) = self.fblocker.block_stor(blocklist)
        # Missing from either backend means missing overall.
        return (hashes, union(r_missing, f_missing))

    def block_delta(self, blkhash, offset, data):
        """Construct and store a new block from a given block
           and a data 'patch' applied at offset. Return:
           (the hash of the new block, if the block already existed)
        """
        blocksize = self.blocksize
        (f_hash, f_existed) = self.fblocker.block_delta(blkhash, offset, data)
        (r_hash, r_existed) = self.rblocker.block_delta(blkhash, offset, data)
        if not r_hash and not f_hash:
            return None, None

        if not r_hash:
            # RADOS lacked the source block: rebuild the patched block
            # from the file-store copy and push it to RADOS to restore
            # the mirror.
            block = self.fblocker.block_retr((blkhash,))
            if not block:
                return None, None
            block = block[0]
            newblock = block[:offset] + data
            if len(newblock) > blocksize:
                newblock = newblock[:blocksize]
            elif len(newblock) < blocksize:
                # Patch shorter than the block: keep the original tail.
                newblock += block[len(newblock):]
            # block_stor returns (hashlist, missing-indices); the diffed
            # code unpacked that pair directly into (r_hash, r_existed),
            # leaving r_hash a list and r_existed a list of indices.
            (stored, missing) = self.rblocker.block_stor((newblock,))
            r_hash = stored[0]
            r_existed = not missing  # existed iff nothing was missing

        # NOTE(review): f_hash may be None when only the RADOS delta
        # succeeded; callers then receive (None, 0) — confirm intended.
        return f_hash, 1 if r_existed and f_existed else 0

    def block_hash_file(self, openfile):
        """Return the list of hashes (hashes map)
           for the blocks in a buffered file.
           Helper method, does not affect store.
        """
        hashes = []
        append = hashes.append
        block_hash = self.block_hash

        for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0):
            append(block_hash(block))

        return hashes

    def block_stor_file(self, openfile):
        """Read blocks from buffered file object and store them. Return:
           (bytes read, list of hashes, list of hashes that were missing)
        """
        blocksize = self.blocksize
        # Routed through the composite block_stor so both backends
        # receive the blocks.
        block_stor = self.block_stor
        hashlist = []
        hextend = hashlist.extend
        storedlist = []
        sextend = storedlist.extend
        lastsize = 0

        for block in file_sync_read_chunks(openfile, blocksize, 1, 0):
            hl, sl = block_stor((block,))
            hextend(hl)
            sextend(sl)
            lastsize = len(block)

        # All blocks but the last are full-sized.
        size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
        return size, hashlist, storedlist

Also available in: Unified diff