# snf-pithos-backend/pithos/backends/lib/hashfiler/radosblocker.py @ 4a7b190f


# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from hashlib import new as newhasher
from binascii import hexlify
from rados import *

from context_object import RadosObject, file_sync_read_chunks

CEPH_CONF_FILE = "/etc/ceph/ceph.conf"


class RadosBlocker(object):
    """Blocker.
       Required constructor parameters: blocksize, blockpool, hashtype.
    """

    blocksize = None
    blockpool = None
    hashtype = None

    def __init__(self, **params):
        blocksize = params['blocksize']
        blockpool = params['blockpool']

        # Open a RADOS client from the Ceph config and an I/O context
        # on the configured block pool.
        rados = Rados(conffile=CEPH_CONF_FILE)
        rados.connect()
        ioctx = rados.open_ioctx(blockpool)

        hashtype = params['hashtype']
        try:
            hasher = newhasher(hashtype)
        except ValueError:
            msg = "Hash type '%s' is not available from hashlib"
            raise ValueError(msg % (hashtype,))

        # Digest of the empty string; since block_hash() strips trailing
        # zeros, an all-zero block hashes to this same value.
        hasher.update("")
        emptyhash = hasher.digest()

        self.blocksize = blocksize
        self.blockpool = blockpool
        self.rados = rados
        self.ioctx = ioctx
        self.hashtype = hashtype
        self.hashlen = len(emptyhash)
        self.emptyhash = emptyhash

    def _pad(self, block):
        return block + ('\x00' * (self.blocksize - len(block)))

    def _get_rear_block(self, blkhash, create=0):
        name = hexlify(blkhash)
        return RadosObject(name, self.ioctx, create)

    def _check_rear_block(self, blkhash):
        filename = hexlify(blkhash)
        try:
            self.ioctx.stat(filename)
            return True
        except ObjectNotFound:
            return False

    def block_hash(self, data):
        """Hash a block of data"""
        hasher = newhasher(self.hashtype)
        # Trailing zeros are stripped, so a zero-padded block hashes the
        # same as its unpadded form.
        hasher.update(data.rstrip('\x00'))
        return hasher.digest()

    def block_ping(self, hashes):
        """Check hashes for existence and
           return those missing from block storage.
        """
        notfound = []
        append = notfound.append

        for h in hashes:
            if h not in notfound and not self._check_rear_block(h):
                append(h)

        return notfound

    def block_retr(self, hashes):
        """Retrieve blocks from storage by their hashes."""
        blocksize = self.blocksize
        blocks = []
        append = blocks.append

        for h in hashes:
            block = None  # reset so a failed read cannot reuse the previous block
            if h == self.emptyhash:
                append(self._pad(''))
                continue
            with self._get_rear_block(h, 0) as rbl:
                if not rbl:
                    break
                for block in rbl.sync_read_chunks(blocksize, 1, 0):
                    break  # there should be just one block there
            if not block:
                break
            append(self._pad(block))

        return blocks

    def block_stor(self, blocklist):
        """Store a bunch of blocks and return (hashes, missing).
           Hashes is a list of the hashes of the blocks,
           missing is a list of indices in that list indicating
           which blocks were missing from the store.
        """
        block_hash = self.block_hash
        hashlist = [block_hash(b) for b in blocklist]
        missing = [i for i, h in enumerate(hashlist) if not
                   self._check_rear_block(h)]
        for i in missing:
            with self._get_rear_block(hashlist[i], 1) as rbl:
                rbl.sync_write(blocklist[i])  # XXX: verify?

        return hashlist, missing

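    # Illustrative sketch, not part of the original module; blocker,
    # old_block and new_block are hypothetical names. If old_block is
    # already in the pool and new_block is not:
    #
    #     hashes, missing = blocker.block_stor((old_block, new_block))
    #     # hashes  -> [block_hash(old_block), block_hash(new_block)]
    #     # missing -> [1]; only new_block gets written to RADOS
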
    def block_delta(self, blkhash, offset, data):
        """Construct and store a new block from a given block
           and a data 'patch' applied at offset. Return:
           (hash of the new block, 1 if the new block had to be stored,
           0 if it was already in the store)
        """

        blocksize = self.blocksize
        if offset >= blocksize or not data:
            return None, None

        block = self.block_retr((blkhash,))
        if not block:
            return None, None

        block = block[0]
        newblock = block[:offset] + data
        if len(newblock) > blocksize:
            newblock = newblock[:blocksize]
        elif len(newblock) < blocksize:
            newblock += block[len(newblock):]

        h, a = self.block_stor((newblock,))
        return h[0], 1 if a else 0

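    # Hedged example of the block_delta() semantics above, not in the
    # original source; blocker and blkhash are assumed to exist and to
    # refer to an already stored block. Patching "XY" at offset 2:
    #
    #     newhash, stored = blocker.block_delta(blkhash, 2, "XY")
    #     # new block is old[:2] + "XY" + old[4:], clipped to blocksize;
    #     # stored is 1 only if that block was not already in the pool
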
    def block_hash_file(self, radosobject):
        """Return the list of hashes (hashes map)
           for the blocks in a buffered file.
           Helper method, does not affect store.
        """
        hashes = []
        append = hashes.append
        block_hash = self.block_hash

        for block in file_sync_read_chunks(radosobject, self.blocksize, 1, 0):
            append(block_hash(block))

        return hashes

    def block_stor_file(self, radosobject):
        """Read blocks from buffered file object and store them. Return:
           (bytes read, list of hashes, list of hashes that were missing)
        """
        blocksize = self.blocksize
        block_stor = self.block_stor
        hashlist = []
        hextend = hashlist.extend
        storedlist = []
        sextend = storedlist.extend
        lastsize = 0

        for block in file_sync_read_chunks(radosobject, blocksize, 1, 0):
            hl, sl = block_stor((block,))
            hextend(hl)
            sextend(sl)
            lastsize = len(block)

        size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
        return size, hashlist, storedlist
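

# Minimal usage sketch, not part of the original module. It assumes a
# reachable Ceph cluster described by /etc/ceph/ceph.conf and an existing
# pool named 'blocks'; the pool name, block size and hash type below are
# illustrative only.
#
#     blocker = RadosBlocker(blocksize=4 * 1024 * 1024,
#                            blockpool='blocks',
#                            hashtype='sha256')
#     hashes, missing = blocker.block_stor(("some data",))
#     blocks = blocker.block_retr(hashes)  # blocks come back zero-padded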