Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / hashfiler / radosblocker.py @ df8debbb

History | View | Annotate | Download (6.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from hashlib import new as newhasher
35
from binascii import hexlify
36
from rados import *
37

    
38
from context_object import RadosObject, file_sync_read_chunks
39

    
40
# Path of the Ceph cluster configuration read by the Rados client.
CEPH_CONF_FILE = "/etc/ceph/ceph.conf"
41

    
42
class RadosBlocker(object):
    """Blocker.

    Content-addressed block store on top of a RADOS pool: blocks are
    stored as RADOS objects named by the hex digest of their content.

    Required constructor parameters: blocksize, blockpool, hashtype.
    """

    # Filled in by __init__; class-level defaults document the attributes.
    blocksize = None
    blockpool = None
    hashtype = None

    def __init__(self, **params):
        """Connect to RADOS, ensure the block pool exists, and set up
        the content hasher.

        Raises ValueError if 'hashtype' is not supported by hashlib.
        """
        blocksize = params['blocksize']
        blockpool = params['blockpool']

        rados = Rados(conffile=CEPH_CONF_FILE)
        rados.connect()
        if not rados.pool_exists(blockpool):
            rados.create_pool(blockpool)

        ioctx = rados.open_ioctx(blockpool)

        hashtype = params['hashtype']
        try:
            hasher = newhasher(hashtype)
        except ValueError:
            msg = "Variable hashtype '%s' is not available from hashlib"
            raise ValueError(msg % (hashtype,))

        # Digest of the empty string: stands in for an all-zero block,
        # since block_hash() strips trailing zero padding before hashing.
        hasher.update("")
        emptyhash = hasher.digest()

        self.blocksize = blocksize
        self.blockpool = blockpool
        self.rados = rados
        self.ioctx = ioctx
        self.hashtype = hashtype
        self.hashlen = len(emptyhash)
        self.emptyhash = emptyhash

    def _pad(self, block):
        # Pad a (possibly short) block with zero bytes up to blocksize.
        return block + ('\x00' * (self.blocksize - len(block)))

    def _get_rear_block(self, blkhash, create=0):
        # Context object for the RADOS object named by the hex digest.
        name = hexlify(blkhash)
        return RadosObject(name, self.ioctx, create)

    def _check_rear_block(self, blkhash):
        # True if a RADOS object for this hash exists in the pool.
        filename = hexlify(blkhash)
        try:
            self.ioctx.stat(filename)
            return True
        except ObjectNotFound:
            return False

    def block_hash(self, data):
        """Hash a block of data.

        Trailing zero bytes are stripped first, so padded and unpadded
        forms of the same block hash identically.
        """
        hasher = newhasher(self.hashtype)
        hasher.update(data.rstrip('\x00'))
        return hasher.digest()

    def block_ping(self, hashes):
        """Check hashes for existence and
           return those missing from block storage.
        """
        notfound = []
        append = notfound.append

        for h in hashes:
            # 'h not in notfound' deduplicates repeated missing hashes.
            if h not in notfound and not self._check_rear_block(h):
                append(h)

        return notfound

    def block_retr(self, hashes):
        """Retrieve blocks from storage by their hashes.

        Stops early (returning a short list) at the first hash whose
        block cannot be read.
        """
        blocksize = self.blocksize
        blocks = []
        append = blocks.append

        for h in hashes:
            if h == self.emptyhash:
                # The empty hash denotes an all-zero block; no I/O needed.
                append(self._pad(''))
                continue
            # Reset per iteration: otherwise an object that exists but
            # yields no data would silently reuse the previous block.
            block = None
            with self._get_rear_block(h, 0) as rbl:
                if not rbl:
                    break
                for block in rbl.sync_read_chunks(blocksize, 1, 0):
                    break  # there should be just one block there
            if not block:
                break
            append(self._pad(block))

        return blocks

    def block_stor(self, blocklist):
        """Store a bunch of blocks and return (hashes, missing).

        Hashes is a list of the hashes of the blocks,
        missing is a list of indices in that list indicating
        which blocks were missing from the store.
        """
        block_hash = self.block_hash
        hashlist = [block_hash(b) for b in blocklist]
        missing = [i for i, h in enumerate(hashlist)
                   if not self._check_rear_block(h)]
        for i in missing:
            with self._get_rear_block(hashlist[i], 1) as rbl:
                rbl.sync_write(blocklist[i])  # XXX: verify?

        return hashlist, missing

    def block_delta(self, blkhash, offset, data):
        """Construct and store a new block from a given block
           and a data 'patch' applied at offset. Return:
           (the hash of the new block, if the block already existed)
        """
        blocksize = self.blocksize
        # Nothing to patch if the offset is past the block or data is empty.
        if offset >= blocksize or not data:
            return None, None

        block = self.block_retr((blkhash,))
        if not block:
            return None, None

        block = block[0]
        newblock = block[:offset] + data
        if len(newblock) > blocksize:
            newblock = newblock[:blocksize]
        elif len(newblock) < blocksize:
            # Keep the original tail beyond the patched region.
            newblock += block[len(newblock):]

        h, a = self.block_stor((newblock,))
        return h[0], 1 if a else 0

    def block_hash_file(self, radosobject):
        """Return the list of hashes (hashes map)
           for the blocks in a buffered file.
           Helper method, does not affect store.
        """
        hashes = []
        append = hashes.append
        block_hash = self.block_hash

        for block in file_sync_read_chunks(radosobject, self.blocksize, 1, 0):
            append(block_hash(block))

        return hashes

    def block_stor_file(self, radosobject):
        """Read blocks from buffered file object and store them. Return:
           (bytes read, list of hashes, list of hashes that were missing)
        """
        blocksize = self.blocksize
        block_stor = self.block_stor
        hashlist = []
        hextend = hashlist.extend
        storedlist = []
        sextend = storedlist.extend
        lastsize = 0

        for block in file_sync_read_chunks(radosobject, blocksize, 1, 0):
            hl, sl = block_stor((block,))
            hextend(hl)
            sextend(sl)
            lastsize = len(block)

        # All blocks but the last are full-sized.
        size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
        return size, hashlist, storedlist
210