Merge branch 'master' of https://code.grnet.gr/git/pithos
[pithos] / pithos / backends / lib_alchemy / hashfiler / hashfiler / blocker.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from os import makedirs
35 from os.path import isdir, realpath, exists, join
36 from hashlib import new as newhasher
37 from binascii import hexlify
38
39 from context_file import ContextFile, file_sync_read_chunks
40
41
42 class Blocker(object):
43     """Blocker.
44        Required contstructor parameters: blocksize, blockpath, hashtype.
45     """
46
47     blocksize = None
48     blockpath = None
49     hashtype = None
50
51     def __init__(self, **params):
52         blocksize = params['blocksize']
53         blockpath = params['blockpath']
54         blockpath = realpath(blockpath)
55         if not isdir(blockpath):
56             if not exists(blockpath):
57                 makedirs(blockpath)
58             else:
59                 raise ValueError("Variable blockpath '%s' is not a directory" % (blockpath,))
60
61         hashtype = params['hashtype']
62         try:
63             hasher = newhasher(hashtype)
64         except ValueError:
65             msg = "Variable hashtype '%s' is not available from hashlib"
66             raise ValueError(msg % (hashtype,))
67
68         hasher.update("")
69         emptyhash = hasher.digest()
70
71         self.blocksize = blocksize
72         self.blockpath = blockpath
73         self.hashtype = hashtype
74         self.hashlen = len(emptyhash)
75         self.emptyhash = emptyhash
76
77     def _get_rear_block(self, blkhash, create=0):
78         filename = hexlify(blkhash)
79         dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
80         if not exists(dir):
81             makedirs(dir)
82         name = join(dir, filename)
83         return ContextFile(name, create)
84
85     def _check_rear_block(self, blkhash):
86         filename = hexlify(blkhash)
87         dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
88         name = join(dir, filename)
89         return exists(name)
90
91     def block_hash(self, data):
92         """Hash a block of data"""
93         hasher = newhasher(self.hashtype)
94         hasher.update(data.rstrip('\x00'))
95         return hasher.digest()
96
97     def block_ping(self, hashes):
98         """Check hashes for existence and
99            return those missing from block storage.
100         """
101         missing = []
102         append = missing.append
103         for i, h in enumerate(hashes):
104             if not self._check_rear_block(h):
105                 append(i)
106         return missing
107
108     def block_retr(self, hashes):
109         """Retrieve blocks from storage by their hashes."""
110         blocksize = self.blocksize
111         blocks = []
112         append = blocks.append
113         block = None
114
115         for h in hashes:
116             with self._get_rear_block(h, 0) as rbl:
117                 if not rbl:
118                     break
119                 for block in rbl.sync_read_chunks(blocksize, 1, 0):
120                     break # there should be just one block there
121             if not block:
122                 break
123             append(block)
124
125         return blocks
126
127     def block_stor(self, blocklist):
128         """Store a bunch of blocks and return (hashes, missing).
129            Hashes is a list of the hashes of the blocks,
130            missing is a list of indices in that list indicating
131            which blocks were missing from the store.
132         """
133         block_hash = self.block_hash
134         hashlist = [block_hash(b) for b in blocklist]
135         mf = None
136         missing = self.block_ping(hashlist)
137         for i in missing:
138             with self._get_rear_block(hashlist[i], 1) as rbl:
139                  rbl.sync_write(blocklist[i]) #XXX: verify?
140
141         return hashlist, missing
142
143     def block_delta(self, blkhash, offdata=()):
144         """Construct and store a new block from a given block
145            and a list of (offset, data) 'patches'. Return:
146            (the hash of the new block, if the block already existed)
147         """
148         if not offdata:
149             return None, None
150
151         blocksize = self.blocksize
152         block = self.block_retr((blkhash,))
153         if not block:
154             return None, None
155
156         block = block[0]
157         newblock = ''
158         idx = 0
159         size = 0
160         trunc = 0
161         for off, data in offdata:
162             if not data:
163                 trunc = 1
164                 break
165             newblock += block[idx:off] + data
166             size += off - idx + len(data)
167             if size >= blocksize:
168                 break
169             off = size
170
171         if not trunc:
172             newblock += block[size:len(block)]
173
174         h, a = self.block_stor((newblock,))
175         return h[0], 1 if a else 0
176
177     def block_hash_file(self, openfile):
178         """Return the list of hashes (hashes map)
179            for the blocks in a buffered file.
180            Helper method, does not affect store.
181         """
182         hashes = []
183         append = hashes.append
184         block_hash = self.block_hash
185
186         for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0):
187             append(block_hash(block))
188
189         return hashes
190
191     def block_stor_file(self, openfile):
192         """Read blocks from buffered file object and store them. Return:
193            (bytes read, list of hashes, list of hashes that were missing)
194         """
195         blocksize = self.blocksize
196         block_stor = self.block_stor
197         hashlist = []
198         hextend = hashlist.extend
199         storedlist = []
200         sextend = storedlist.extend
201         lastsize = 0
202
203         for block in file_sync_read_chunks(openfile, blocksize, 1, 0):
204             hl, sl = block_stor((block,))
205             hextend(hl)
206             sextend(sl)
207             lastsize = len(block)
208
209         size = (len(hashlist) -1) * blocksize + lastsize if hashlist else 0
210         return size, hashlist, storedlist
211