Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / hashfiler / radosblocker.py @ 4a7b190f

History | View | Annotate | Download (6.8 kB)

1 c30635bf Filippos Giannakos
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2 c30635bf Filippos Giannakos
#
3 c30635bf Filippos Giannakos
# Redistribution and use in source and binary forms, with or
4 c30635bf Filippos Giannakos
# without modification, are permitted provided that the following
5 c30635bf Filippos Giannakos
# conditions are met:
6 c30635bf Filippos Giannakos
#
7 c30635bf Filippos Giannakos
#   1. Redistributions of source code must retain the above
8 c30635bf Filippos Giannakos
#      copyright notice, this list of conditions and the following
9 c30635bf Filippos Giannakos
#      disclaimer.
10 c30635bf Filippos Giannakos
#
11 c30635bf Filippos Giannakos
#   2. Redistributions in binary form must reproduce the above
12 c30635bf Filippos Giannakos
#      copyright notice, this list of conditions and the following
13 c30635bf Filippos Giannakos
#      disclaimer in the documentation and/or other materials
14 c30635bf Filippos Giannakos
#      provided with the distribution.
15 c30635bf Filippos Giannakos
#
16 c30635bf Filippos Giannakos
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 c30635bf Filippos Giannakos
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 c30635bf Filippos Giannakos
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 c30635bf Filippos Giannakos
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 c30635bf Filippos Giannakos
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 c30635bf Filippos Giannakos
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 c30635bf Filippos Giannakos
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 c30635bf Filippos Giannakos
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 c30635bf Filippos Giannakos
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 c30635bf Filippos Giannakos
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 c30635bf Filippos Giannakos
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 c30635bf Filippos Giannakos
# POSSIBILITY OF SUCH DAMAGE.
28 c30635bf Filippos Giannakos
#
29 c30635bf Filippos Giannakos
# The views and conclusions contained in the software and
30 c30635bf Filippos Giannakos
# documentation are those of the authors and should not be
31 c30635bf Filippos Giannakos
# interpreted as representing official policies, either expressed
32 c30635bf Filippos Giannakos
# or implied, of GRNET S.A.
33 c30635bf Filippos Giannakos
34 c30635bf Filippos Giannakos
from hashlib import new as newhasher
35 c30635bf Filippos Giannakos
from binascii import hexlify
36 c30635bf Filippos Giannakos
from rados import *
37 c30635bf Filippos Giannakos
38 c30635bf Filippos Giannakos
from context_object import RadosObject, file_sync_read_chunks
39 c30635bf Filippos Giannakos
40 29148653 Sofia Papagiannaki
CEPH_CONF_FILE = "/etc/ceph/ceph.conf"
41 29148653 Sofia Papagiannaki
42 c30635bf Filippos Giannakos
43 c30635bf Filippos Giannakos
class RadosBlocker(object):
44 c30635bf Filippos Giannakos
    """Blocker.
45 c30635bf Filippos Giannakos
       Required constructor parameters: blocksize, blockpath, hashtype.
46 c30635bf Filippos Giannakos
    """
47 c30635bf Filippos Giannakos
48 c30635bf Filippos Giannakos
    blocksize = None
49 c30635bf Filippos Giannakos
    blockpool = None
50 c30635bf Filippos Giannakos
    hashtype = None
51 c30635bf Filippos Giannakos
52 c30635bf Filippos Giannakos
    def __init__(self, **params):
53 c30635bf Filippos Giannakos
        blocksize = params['blocksize']
54 c30635bf Filippos Giannakos
        blockpool = params['blockpool']
55 c30635bf Filippos Giannakos
56 c30635bf Filippos Giannakos
        rados = Rados(conffile=CEPH_CONF_FILE)
57 c30635bf Filippos Giannakos
        rados.connect()
58 c30635bf Filippos Giannakos
        ioctx = rados.open_ioctx(blockpool)
59 c30635bf Filippos Giannakos
60 c30635bf Filippos Giannakos
        hashtype = params['hashtype']
61 c30635bf Filippos Giannakos
        try:
62 c30635bf Filippos Giannakos
            hasher = newhasher(hashtype)
63 c30635bf Filippos Giannakos
        except ValueError:
64 c30635bf Filippos Giannakos
            msg = "Variable hashtype '%s' is not available from hashlib"
65 c30635bf Filippos Giannakos
            raise ValueError(msg % (hashtype,))
66 c30635bf Filippos Giannakos
67 c30635bf Filippos Giannakos
        hasher.update("")
68 c30635bf Filippos Giannakos
        emptyhash = hasher.digest()
69 c30635bf Filippos Giannakos
70 c30635bf Filippos Giannakos
        self.blocksize = blocksize
71 c30635bf Filippos Giannakos
        self.blockpool = blockpool
72 c30635bf Filippos Giannakos
        self.rados = rados
73 c30635bf Filippos Giannakos
        self.ioctx = ioctx
74 c30635bf Filippos Giannakos
        self.hashtype = hashtype
75 c30635bf Filippos Giannakos
        self.hashlen = len(emptyhash)
76 c30635bf Filippos Giannakos
        self.emptyhash = emptyhash
77 c30635bf Filippos Giannakos
78 c30635bf Filippos Giannakos
    def _pad(self, block):
79 c30635bf Filippos Giannakos
        return block + ('\x00' * (self.blocksize - len(block)))
80 c30635bf Filippos Giannakos
81 c30635bf Filippos Giannakos
    def _get_rear_block(self, blkhash, create=0):
82 c30635bf Filippos Giannakos
        name = hexlify(blkhash)
83 c30635bf Filippos Giannakos
        return RadosObject(name, self.ioctx, create)
84 c30635bf Filippos Giannakos
85 c30635bf Filippos Giannakos
    def _check_rear_block(self, blkhash):
86 c30635bf Filippos Giannakos
        filename = hexlify(blkhash)
87 c30635bf Filippos Giannakos
        try:
88 c30635bf Filippos Giannakos
            self.ioctx.stat(filename)
89 c30635bf Filippos Giannakos
            return True
90 c30635bf Filippos Giannakos
        except ObjectNotFound:
91 c30635bf Filippos Giannakos
            return False
92 c30635bf Filippos Giannakos
93 c30635bf Filippos Giannakos
    def block_hash(self, data):
94 c30635bf Filippos Giannakos
        """Hash a block of data"""
95 c30635bf Filippos Giannakos
        hasher = newhasher(self.hashtype)
96 c30635bf Filippos Giannakos
        hasher.update(data.rstrip('\x00'))
97 c30635bf Filippos Giannakos
        return hasher.digest()
98 c30635bf Filippos Giannakos
99 c30635bf Filippos Giannakos
    def block_ping(self, hashes):
100 c30635bf Filippos Giannakos
        """Check hashes for existence and
101 c30635bf Filippos Giannakos
           return those missing from block storage.
102 c30635bf Filippos Giannakos
        """
103 c30635bf Filippos Giannakos
        notfound = []
104 c30635bf Filippos Giannakos
        append = notfound.append
105 c30635bf Filippos Giannakos
106 c30635bf Filippos Giannakos
        for h in hashes:
107 c30635bf Filippos Giannakos
            if h not in notfound and not self._check_rear_block(h):
108 c30635bf Filippos Giannakos
                append(h)
109 c30635bf Filippos Giannakos
110 c30635bf Filippos Giannakos
        return notfound
111 c30635bf Filippos Giannakos
112 c30635bf Filippos Giannakos
    def block_retr(self, hashes):
113 c30635bf Filippos Giannakos
        """Retrieve blocks from storage by their hashes."""
114 c30635bf Filippos Giannakos
        blocksize = self.blocksize
115 c30635bf Filippos Giannakos
        blocks = []
116 c30635bf Filippos Giannakos
        append = blocks.append
117 c30635bf Filippos Giannakos
        block = None
118 c30635bf Filippos Giannakos
119 c30635bf Filippos Giannakos
        for h in hashes:
120 c30635bf Filippos Giannakos
            if h == self.emptyhash:
121 c30635bf Filippos Giannakos
                append(self._pad(''))
122 c30635bf Filippos Giannakos
                continue
123 c30635bf Filippos Giannakos
            with self._get_rear_block(h, 0) as rbl:
124 c30635bf Filippos Giannakos
                if not rbl:
125 c30635bf Filippos Giannakos
                    break
126 c30635bf Filippos Giannakos
                for block in rbl.sync_read_chunks(blocksize, 1, 0):
127 29148653 Sofia Papagiannaki
                    break  # there should be just one block there
128 c30635bf Filippos Giannakos
            if not block:
129 c30635bf Filippos Giannakos
                break
130 c30635bf Filippos Giannakos
            append(self._pad(block))
131 c30635bf Filippos Giannakos
132 c30635bf Filippos Giannakos
        return blocks
133 c30635bf Filippos Giannakos
134 c30635bf Filippos Giannakos
    def block_stor(self, blocklist):
135 c30635bf Filippos Giannakos
        """Store a bunch of blocks and return (hashes, missing).
136 c30635bf Filippos Giannakos
           Hashes is a list of the hashes of the blocks,
137 c30635bf Filippos Giannakos
           missing is a list of indices in that list indicating
138 c30635bf Filippos Giannakos
           which blocks were missing from the store.
139 c30635bf Filippos Giannakos
        """
140 c30635bf Filippos Giannakos
        block_hash = self.block_hash
141 c30635bf Filippos Giannakos
        hashlist = [block_hash(b) for b in blocklist]
142 29148653 Sofia Papagiannaki
        missing = [i for i, h in enumerate(hashlist) if not
143 29148653 Sofia Papagiannaki
                   self._check_rear_block(h)]
144 c30635bf Filippos Giannakos
        for i in missing:
145 c30635bf Filippos Giannakos
            with self._get_rear_block(hashlist[i], 1) as rbl:
146 29148653 Sofia Papagiannaki
                rbl.sync_write(blocklist[i])  # XXX: verify?
147 c30635bf Filippos Giannakos
148 c30635bf Filippos Giannakos
        return hashlist, missing
149 c30635bf Filippos Giannakos
150 c30635bf Filippos Giannakos
    def block_delta(self, blkhash, offset, data):
151 c30635bf Filippos Giannakos
        """Construct and store a new block from a given block
152 c30635bf Filippos Giannakos
           and a data 'patch' applied at offset. Return:
153 c30635bf Filippos Giannakos
           (the hash of the new block, if the block already existed)
154 c30635bf Filippos Giannakos
        """
155 c30635bf Filippos Giannakos
156 c30635bf Filippos Giannakos
        blocksize = self.blocksize
157 c30635bf Filippos Giannakos
        if offset >= blocksize or not data:
158 c30635bf Filippos Giannakos
            return None, None
159 c30635bf Filippos Giannakos
160 c30635bf Filippos Giannakos
        block = self.block_retr((blkhash,))
161 c30635bf Filippos Giannakos
        if not block:
162 c30635bf Filippos Giannakos
            return None, None
163 c30635bf Filippos Giannakos
164 c30635bf Filippos Giannakos
        block = block[0]
165 c30635bf Filippos Giannakos
        newblock = block[:offset] + data
166 c30635bf Filippos Giannakos
        if len(newblock) > blocksize:
167 c30635bf Filippos Giannakos
            newblock = newblock[:blocksize]
168 c30635bf Filippos Giannakos
        elif len(newblock) < blocksize:
169 c30635bf Filippos Giannakos
            newblock += block[len(newblock):]
170 c30635bf Filippos Giannakos
171 c30635bf Filippos Giannakos
        h, a = self.block_stor((newblock,))
172 c30635bf Filippos Giannakos
        return h[0], 1 if a else 0
173 c30635bf Filippos Giannakos
174 c30635bf Filippos Giannakos
    def block_hash_file(self, radosobject):
175 c30635bf Filippos Giannakos
        """Return the list of hashes (hashes map)
176 c30635bf Filippos Giannakos
           for the blocks in a buffered file.
177 c30635bf Filippos Giannakos
           Helper method, does not affect store.
178 c30635bf Filippos Giannakos
        """
179 c30635bf Filippos Giannakos
        hashes = []
180 c30635bf Filippos Giannakos
        append = hashes.append
181 c30635bf Filippos Giannakos
        block_hash = self.block_hash
182 c30635bf Filippos Giannakos
183 c30635bf Filippos Giannakos
        for block in file_sync_read_chunks(radosobject, self.blocksize, 1, 0):
184 c30635bf Filippos Giannakos
            append(block_hash(block))
185 c30635bf Filippos Giannakos
186 c30635bf Filippos Giannakos
        return hashes
187 c30635bf Filippos Giannakos
188 c30635bf Filippos Giannakos
    def block_stor_file(self, radosobject):
189 c30635bf Filippos Giannakos
        """Read blocks from buffered file object and store them. Return:
190 c30635bf Filippos Giannakos
           (bytes read, list of hashes, list of hashes that were missing)
191 c30635bf Filippos Giannakos
        """
192 c30635bf Filippos Giannakos
        blocksize = self.blocksize
193 c30635bf Filippos Giannakos
        block_stor = self.block_stor
194 c30635bf Filippos Giannakos
        hashlist = []
195 c30635bf Filippos Giannakos
        hextend = hashlist.extend
196 c30635bf Filippos Giannakos
        storedlist = []
197 c30635bf Filippos Giannakos
        sextend = storedlist.extend
198 c30635bf Filippos Giannakos
        lastsize = 0
199 c30635bf Filippos Giannakos
200 c30635bf Filippos Giannakos
        for block in file_sync_read_chunks(radosobject, blocksize, 1, 0):
201 c30635bf Filippos Giannakos
            hl, sl = block_stor((block,))
202 c30635bf Filippos Giannakos
            hextend(hl)
203 c30635bf Filippos Giannakos
            sextend(sl)
204 c30635bf Filippos Giannakos
            lastsize = len(block)
205 c30635bf Filippos Giannakos
206 29148653 Sofia Papagiannaki
        size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
207 c30635bf Filippos Giannakos
        return size, hashlist, storedlist