root / snf-pithos-backend / pithos / backends / lib / hashfiler / radosblocker.py @ 93ac642e


# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
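"""RADOS-backed block store for the Pithos hashfiler backend.

Each block is stored as a RADOS object whose name is the hex digest of
the block's hash, so blocks are content-addressed and deduplicated.
"""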

    
from hashlib import new as newhasher
from binascii import hexlify
from rados import *

from context_object import RadosObject, file_sync_read_chunks

CEPH_CONF_FILE = "/etc/ceph/ceph.conf"

class RadosBlocker(object):
    """Blocker.
       Required constructor parameters: blocksize, blockpool, hashtype.
    """

    blocksize = None
    blockpool = None
    hashtype = None
    rados = None
    rados_ctx = None

    @classmethod
    def get_rados_ctx(cls, pool):
        if cls.rados_ctx is None:
            cls.rados = Rados(conffile=CEPH_CONF_FILE)
            cls.rados.connect()
            cls.rados_ctx = cls.rados.open_ioctx(pool)
        return cls.rados_ctx
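    # Note: `rados` and `rados_ctx` are class attributes, so the cluster
    # connection is opened lazily on the first call and then shared by all
    # RadosBlocker instances; later calls return the cached ioctx even if a
    # different pool name is passed.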

    
    def __init__(self, **params):
        blocksize = params['blocksize']
        blockpool = params['blockpool']

        hashtype = params['hashtype']
        try:
            hasher = newhasher(hashtype)
        except ValueError:
            msg = "Hash type '%s' is not available from hashlib"
            raise ValueError(msg % (hashtype,))

        # digest of the empty input; block_retr treats this hash as the
        # all-zero block and serves it without touching the store
        hasher.update("")
        emptyhash = hasher.digest()

        self.blocksize = blocksize
        self.blockpool = blockpool
        self.ioctx = RadosBlocker.get_rados_ctx(self.blockpool)
        self.hashtype = hashtype
        self.hashlen = len(emptyhash)
        self.emptyhash = emptyhash

    
    def _pad(self, block):
        return block + ('\x00' * (self.blocksize - len(block)))

    def _get_rear_block(self, blkhash):
        name = hexlify(blkhash)
        return RadosObject(name, self.ioctx)

    def _check_rear_block(self, blkhash):
        filename = hexlify(blkhash)
        try:
            self.ioctx.stat(filename)
            return True
        except ObjectNotFound:
            return False

    def block_hash(self, data):
        """Hash a block of data"""
        hasher = newhasher(self.hashtype)
        hasher.update(data.rstrip('\x00'))
        return hasher.digest()
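    # Note: trailing zero-bytes are stripped before hashing, so a short
    # trailing block and its zero-padded form (as returned by block_retr)
    # hash to the same value.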

    
    def block_ping(self, hashes):
        """Check hashes for existence and
           return those missing from block storage.
        """
        notfound = []
        append = notfound.append

        for h in hashes:
            if h not in notfound and not self._check_rear_block(h):
                append(h)

        return notfound

    
    def block_retr(self, hashes):
        """Retrieve blocks from storage by their hashes."""
        blocksize = self.blocksize
        blocks = []
        append = blocks.append
        block = None

        for h in hashes:
            if h == self.emptyhash:
                append(self._pad(''))
                continue
            with self._get_rear_block(h) as rbl:
                if not rbl:
                    break
                for block in rbl.sync_read_chunks(blocksize, 1, 0):
                    break  # there should be just one block there
            if not block:
                break
            append(self._pad(block))

        return blocks

    
    def block_stor(self, blocklist):
        """Store a bunch of blocks and return (hashes, missing).
           Hashes is a list of the hashes of the blocks,
           missing is a list of indices in that list indicating
           which blocks were missing from the store.
        """
        block_hash = self.block_hash
        hashlist = [block_hash(b) for b in blocklist]
        missing = [i for i, h in enumerate(hashlist) if not
                   self._check_rear_block(h)]
        for i in missing:
            with self._get_rear_block(hashlist[i]) as rbl:
                rbl.sync_write(blocklist[i])  # XXX: verify?

        return hashlist, missing
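    # Illustrative example (hypothetical data): block_stor(('a', 'b')) where
    # only 'b' is new returns ([hash_a, hash_b], [1]); only the blocks listed
    # in `missing` are actually written to the pool, so blocks already in the
    # store are deduplicated.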

    
    def block_delta(self, blkhash, offset, data):
        """Construct and store a new block from a given block
           and a data 'patch' applied at offset. Return:
           (the hash of the new block, 1 if it had to be stored,
           0 if it already existed)
        """

        blocksize = self.blocksize
        if offset >= blocksize or not data:
            return None, None

        block = self.block_retr((blkhash,))
        if not block:
            return None, None

        block = block[0]
        newblock = block[:offset] + data
        if len(newblock) > blocksize:
            newblock = newblock[:blocksize]
        elif len(newblock) < blocksize:
            newblock += block[len(newblock):]

        h, a = self.block_stor((newblock,))
        return h[0], 1 if a else 0
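    # Illustrative example (hypothetical values): with blocksize 4 and a
    # stored block 'abcd', block_delta(hash_abcd, 2, 'XY') builds 'abXY',
    # stores it via block_stor, and returns its hash plus 1 if 'abXY' was
    # not already in the store.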

    
    def block_hash_file(self, radosobject):
        """Return the list of hashes (hashes map)
           for the blocks in a buffered file.
           Helper method, does not affect store.
        """
        hashes = []
        append = hashes.append
        block_hash = self.block_hash

        for block in file_sync_read_chunks(radosobject, self.blocksize, 1, 0):
            append(block_hash(block))

        return hashes

    
    def block_stor_file(self, radosobject):
        """Read blocks from buffered file object and store them. Return:
           (bytes read, list of hashes, concatenated 'missing' lists
           as returned by block_stor for each block)
        """
        blocksize = self.blocksize
        block_stor = self.block_stor
        hashlist = []
        hextend = hashlist.extend
        storedlist = []
        sextend = storedlist.extend
        lastsize = 0

        for block in file_sync_read_chunks(radosobject, blocksize, 1, 0):
            hl, sl = block_stor((block,))
            hextend(hl)
            sextend(sl)
            lastsize = len(block)

        size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
        return size, hashlist, storedlist
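
# Minimal usage sketch (hypothetical): the pool name, block size and data
# below are illustrative assumptions, not values defined by this module, and
# running it requires a reachable Ceph cluster configured in CEPH_CONF_FILE.
#
#     blocker = RadosBlocker(blocksize=4 * 1024 * 1024,
#                            blockpool='blocks',      # assumed pool name
#                            hashtype='sha256')
#     hashes, missing = blocker.block_stor(('some data',))
#     assert blocker.block_ping(hashes) == []      # now present in the store
#     block = blocker.block_retr(hashes)[0]        # zero-padded to blocksize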