Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / hashfiler / radosblocker.py @ e3a9d32a

History | View | Annotate | Download (6.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from hashlib import new as newhasher
35
from binascii import hexlify
36
from rados import *
37

    
38
from context_object import RadosObject, file_sync_read_chunks
39

    
40
# Path of the Ceph configuration file passed to the Rados client below.
CEPH_CONF_FILE = "/etc/ceph/ceph.conf"
41

    
42
class RadosBlocker(object):
    """Content-addressed block store kept in a RADOS pool.

    Every block is stored as a RADOS object named after the hex digest
    of its content; trailing NUL bytes are ignored when hashing, so a
    block and its zero-padded form share one hash.

    Required constructor parameters: blocksize, blockpool, hashtype.
    """

    blocksize = None
    blockpool = None
    hashtype = None

    def __init__(self, **params):
        """Validate the parameters and open the block pool.

        Keyword parameters:
            blocksize -- size in bytes that blocks are padded to
            blockpool -- name of the RADOS pool holding the blocks
            hashtype  -- hashlib algorithm name used to hash blocks

        Raises KeyError if a parameter is missing and ValueError if
        hashlib does not provide `hashtype`.
        """
        blocksize = params['blocksize']
        blockpool = params['blockpool']
        hashtype = params['hashtype']

        # Validate the hash type *before* touching the cluster, so a bad
        # configuration cannot leave an open connection behind.
        try:
            hasher = newhasher(hashtype)
        except ValueError:
            msg = "Variable hashtype '%s' is not available from hashlib"
            raise ValueError(msg % (hashtype,))

        # b"" is a plain str on Python 2 and bytes on Python 3.
        hasher.update(b"")
        emptyhash = hasher.digest()

        rados = Rados(conffile=CEPH_CONF_FILE)
        rados.connect()
        try:
            ioctx = rados.open_ioctx(blockpool)
        except Exception:
            # Do not leak the cluster connection if the pool cannot be
            # opened (e.g. it does not exist).
            rados.shutdown()
            raise

        self.blocksize = blocksize
        self.blockpool = blockpool
        self.rados = rados
        self.ioctx = ioctx
        self.hashtype = hashtype
        self.hashlen = len(emptyhash)
        self.emptyhash = emptyhash

    def _pad(self, block):
        """Return `block` right-padded with NUL bytes up to blocksize."""
        return block + (b'\x00' * (self.blocksize - len(block)))

    def _get_rear_block(self, blkhash, create=0):
        """Return the RadosObject named after the hex form of `blkhash`.

        `create` is handed to RadosObject unchanged.
        """
        name = hexlify(blkhash)
        return RadosObject(name, self.ioctx, create)

    def _check_rear_block(self, blkhash):
        """Return True if an object for `blkhash` exists in the pool."""
        name = hexlify(blkhash)
        try:
            self.ioctx.stat(name)
        except ObjectNotFound:
            return False
        return True

    def block_hash(self, data):
        """Hash a block of data, ignoring its trailing NUL padding."""
        hasher = newhasher(self.hashtype)
        hasher.update(data.rstrip(b'\x00'))
        return hasher.digest()

    def block_ping(self, hashes):
        """Check hashes for existence and
           return those missing from block storage.

           Each missing hash is reported once, in first-seen order.
        """
        notfound = []
        # Set mirror of `notfound`: constant-time duplicate detection
        # instead of rescanning the list for every hash.
        reported = set()

        for h in hashes:
            if h in reported:
                continue
            if not self._check_rear_block(h):
                reported.add(h)
                notfound.append(h)

        return notfound

    def block_retr(self, hashes):
        """Retrieve blocks from storage by their hashes.

           Returns the list of blocks, each padded to blocksize.
           Retrieval stops at the first hash that yields no data, so
           the result may be shorter than `hashes`.
        """
        blocksize = self.blocksize
        blocks = []
        append = blocks.append

        for h in hashes:
            if h == self.emptyhash:
                # The empty block is never read from the pool.
                append(self._pad(b''))
                continue

            # Reset for every hash; otherwise an object that yields no
            # chunk would silently reuse the previous hash's data.
            block = None
            with self._get_rear_block(h, 0) as rbl:
                if not rbl:
                    break
                for block in rbl.sync_read_chunks(blocksize, 1, 0):
                    break  # there should be just one block there
            if not block:
                break
            append(self._pad(block))

        return blocks

    def block_stor(self, blocklist):
        """Store a bunch of blocks and return (hashes, missing).
           Hashes is a list of the hashes of the blocks,
           missing is a list of indices in that list indicating
           which blocks were missing from the store.
        """
        block_hash = self.block_hash
        hashlist = [block_hash(b) for b in blocklist]
        missing = [i for i, h in enumerate(hashlist)
                   if not self._check_rear_block(h)]
        for i in missing:
            with self._get_rear_block(hashlist[i], 1) as rbl:
                rbl.sync_write(blocklist[i])  # XXX: verify?

        return hashlist, missing

    def block_delta(self, blkhash, offset, data):
        """Construct and store a new block from a given block
           and a data 'patch' applied at offset. Return:
           (the hash of the new block,
            1 if the new block was missing from the store and had to
            be written, 0 if it was already there)
           or (None, None) if the patch is empty, starts at or beyond
           blocksize, or the original block cannot be retrieved.
        """
        blocksize = self.blocksize
        if offset >= blocksize or not data:
            return None, None

        block = self.block_retr((blkhash,))
        if not block:
            return None, None

        block = block[0]
        newblock = block[:offset] + data
        if len(newblock) > blocksize:
            # The patch runs past the end of the block: drop the excess.
            newblock = newblock[:blocksize]
        elif len(newblock) < blocksize:
            # Keep the tail of the original block after the patch.
            newblock += block[len(newblock):]

        h, a = self.block_stor((newblock,))
        return h[0], 1 if a else 0

    def block_hash_file(self, radosobject):
        """Return the list of hashes (hashes map)
           for the blocks in a buffered file.
           Helper method, does not affect store.
        """
        block_hash = self.block_hash
        chunks = file_sync_read_chunks(radosobject, self.blocksize, 1, 0)
        return [block_hash(block) for block in chunks]

    def block_stor_file(self, radosobject):
        """Read blocks from buffered file object and store them. Return:
           (bytes read, list of hashes, list of hashes that were missing)

           NOTE(review): the third element is built from the `missing`
           index lists returned by block_stor, called with one block at
           a time -- confirm what callers expect here.
        """
        blocksize = self.blocksize
        block_stor = self.block_stor
        hashlist = []
        hextend = hashlist.extend
        storedlist = []
        sextend = storedlist.extend
        lastsize = 0

        for block in file_sync_read_chunks(radosobject, blocksize, 1, 0):
            hl, sl = block_stor((block,))
            hextend(hl)
            sextend(sl)
            lastsize = len(block)

        # Every block but the last counts as full-sized.
        size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
        return size, hashlist, storedlist
207