Revision a1557c9c

b/snf-pithos-app/conf/20-snf-pithos-app-settings.conf
10 10

  
11 11
# Block storage.
12 12
#PITHOS_BACKEND_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
13
#PITHOS_BACKEND_BLOCK_PATH = '/tmp/pithos-data/'
14 13
#PITHOS_BACKEND_BLOCK_UMASK = 0o022
15 14

  
16 15
# Default setting for new accounts.
......
25 24
# Service Token acquired by identity provider.
26 25
#PITHOS_SERVICE_TOKEN = ''
27 26

  
28
# Enable and configure secondary rados storage for pithos
29
#PITHOS_RADOS_STORAGE = False
30
#PITHOS_RADOS_POOL_BLOCKS = 'blocks'
31
#PITHOS_RADOS_POOL_MAPS = 'maps'
32

  
33 27
# This enables a ui compatibility layer for the introduction of UUIDs in
34 28
# identity management.  WARNING: Setting to True will break your installation.
35 29
# PITHOS_TRANSLATE_UUIDS = False
......
63 57
# Set domain to restrict requests of pithos object contents serve endpoint or
64 58
# None for no domain restriction
65 59
#PITHOS_UNSAFE_DOMAIN = None
60
#
61
# Archipelago Configuration File
62
#PITHOS_BACKEND_ARCHIPELAGO_CONF = '/etc/archipelago/archipelago.conf'
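
The two lines above replace the removed RADOS pool settings: block and map placement is now delegated to Archipelago. A minimal sketch of a deployment settings fragment using them (the /etc/synnefo location is assumed, not part of this commit):

    # /etc/synnefo/20-snf-pithos-app-settings.conf (illustrative)
    PITHOS_BACKEND_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
    PITHOS_BACKEND_ARCHIPELAGO_CONF = '/etc/archipelago/archipelago.conf'
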
b/snf-pithos-app/pithos/api/settings.py
138 138
    settings, 'PITHOS_BACKEND_BLOCK_PATH', '/tmp/pithos-data/')
139 139
BACKEND_BLOCK_UMASK = getattr(settings, 'PITHOS_BACKEND_BLOCK_UMASK', 0o022)
140 140

  
141
# Archipelago Configuration File
142
BACKEND_ARCHIPELAGO_CONF = getattr(
143
        settings, 'PITHOS_BACKEND_ARCHIPELAGO_CONF',
144
        '/etc/archipelago/archipelago.conf')
145

  
141 146
# Queue for billing.
142 147
BACKEND_QUEUE_MODULE = getattr(settings, 'PITHOS_BACKEND_QUEUE_MODULE', None)
143 148
# Example: 'pithos.backends.lib.rabbitmq'
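
BACKEND_ARCHIPELAGO_CONF uses the same getattr fallback as the surrounding settings: a value set by the deployer wins, otherwise the packaged default applies. A standalone sketch of that behaviour (hypothetical snippet, not part of the commit):

    from django.conf import settings

    settings.configure(PITHOS_BACKEND_ARCHIPELAGO_CONF='/srv/archipelago.conf')

    # The deployer's override takes precedence over the packaged default.
    conf = getattr(settings, 'PITHOS_BACKEND_ARCHIPELAGO_CONF',
                   '/etc/archipelago/archipelago.conf')
    assert conf == '/srv/archipelago.conf'
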
b/snf-pithos-backend/pithos/backends/__init__.py
33 33

  
34 34
import warnings
35 35

  
36
from pithos.workers import glue
37
from archipelago.common import Segment, Xseg_ctx
38
from objpool import ObjectPool
39
from pithos.api.settings import BACKEND_ARCHIPELAGO_CONF as cfile
40

  
41
glue.WorkerGlue.setupXsegPool(ObjectPool, Segment, Xseg_ctx, cfile,
42
                              pool_size=8)
43

  
36 44

  
37 45
def connect_backend(**kwargs):
38 46
    from pithos.backends.modular import ModularBackend
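
The import-time setupXsegPool call hands ObjectPool, the segment and context types, and the configuration file to the worker glue, so xseg contexts are attached once and shared across requests. A sketch of the pooling idiom presumably behind it, assuming objpool's usual hook names; XsegContextPool is illustrative, not the commit's code:

    from objpool import ObjectPool

    class XsegContextPool(ObjectPool):
        """Keep up to `size` xseg contexts attached to one segment."""
        def __init__(self, segment, size=8):
            super(XsegContextPool, self).__init__(size=size)
            self.segment = segment

        def _pool_create(self):
            return Xseg_ctx(self.segment)   # attach a fresh context lazily

        def _pool_verify(self, ctx):
            return ctx is not None          # drop contexts that went away

        def _pool_cleanup(self, ctx):
            return False                    # keep contexts for reuse
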
b/snf-pithos-backend/pithos/backends/lib/hashfiler/blocker.py
31 31
# interpreted as representing official policies, either expressed
32 32
# or implied, of GRNET S.A.
33 33

  
34
from fileblocker import FileBlocker
35

  
36

  
37
def intersect(a, b):
38
    """ return the intersection of two lists """
39
    return list(set(a) & set(b))
40

  
41

  
42
def union(a, b):
43
    """ return the union of two lists """
44
    return list(set(a) | set(b))
34
from archipelagoblocker import ArchipelagoBlocker
45 35

  
46 36

  
47 37
class Blocker(object):
......
51 41
    """
52 42

  
53 43
    def __init__(self, **params):
54
        self.rblocker = None
55
        try:
56
            if params['blockpool']:
57
                from radosblocker import RadosBlocker
58
                self.rblocker = RadosBlocker(**params)
59
        except KeyError:
60
            pass
61

  
62
        self.fblocker = FileBlocker(**params)
63
        self.hashlen = self.fblocker.hashlen
44
        self.archip_blocker = ArchipelagoBlocker(**params)
45
        self.hashlen = self.archip_blocker.hashlen
64 46
        self.blocksize = params['blocksize']
65 47

  
66 48
    def block_hash(self, data):
67 49
        """Hash a block of data"""
68
        return self.fblocker.block_hash(data)
50
        return self.archip_blocker.block_hash(data)
69 51

  
70 52
    def block_ping(self, hashes):
71 53
        """Check hashes for existence and
72 54
           return those missing from block storage.
73 55
        """
74
        r = []
75
        if self.rblocker:
76
            r = self.rblocker.block_ping(hashes)
77
        f = self.fblocker.block_ping(hashes)
78
        return union(r, f)
56
        return self.archip_blocker.block_ping(hashes)
79 57

  
80 58
    def block_retr(self, hashes):
81 59
        """Retrieve blocks from storage by their hashes."""
82
        return self.fblocker.block_retr(hashes)
60
        return self.archip_blocker.block_retr(hashes)
61

  
62
    def block_retr_archipelago(self, hashes):
63
        """Retrieve blocks from storage by theri hashes."""
64
        return self.archip_blocker.block_retr_archipelago(hashes)
83 65

  
84 66
    def block_stor(self, blocklist):
85 67
        """Store a bunch of blocks and return (hashes, missing).
......
87 69
           missing is a list of indices in that list indicating
88 70
           which blocks were missing from the store.
89 71
        """
90
        r_missing = []
91
        (hashes, f_missing) = self.fblocker.block_stor(blocklist)
92
        if self.rblocker:
93
            (_, r_missing) = self.rblocker.block_stor(blocklist)
94
        return (hashes, union(r_missing, f_missing))
72
        (hashes, missing) = self.archip_blocker.block_stor(blocklist)
73
        return (hashes, missing)
95 74

  
96 75
    def block_delta(self, blkhash, offset, data):
97 76
        """Construct and store a new block from a given block
......
99 78
           (the hash of the new block, if the block already existed)
100 79
        """
101 80
        blocksize = self.blocksize
102
        r_hash = None
103
        r_existed = True
104
        (f_hash, f_existed) = self.fblocker.block_delta(blkhash, offset, data)
105
        if self.rblocker:
106
            (r_hash, r_existed) = self.rblocker.block_delta(blkhash, offset,
107
                                                            data)
108
        if not r_hash and not f_hash:
81
        archip_hash = None
82
        archip_existed = True
83
        (archip_hash, archip_existed) = \
84
                self.archip_blocker.block_delta(blkhash, offset, data)
85

  
86
        if not archip_hash:
109 87
            return None, None
110
        if self.rblocker and not r_hash:
111
            block = self.fblocker.block_retr((blkhash,))
88

  
89
        if self.archip_blocker and not archip_hash:
90
            block = self.archip_blocker.block_retr((blkhash,))
112 91
            if not block:
113 92
                return None, None
114 93
            block = block[0]
......
117 96
                newblock = newblock[:blocksize]
118 97
            elif len(newblock) < blocksize:
119 98
                newblock += block[len(newblock):]
120
            r_hash, r_existed = self.rblocker.block_stor((newblock,))
99
            archip_hash, archip_existed = \
                self.archip_blocker.block_stor((newblock,))
121 100

  
122
        return f_hash, 1 if r_existed and f_existed else 0
101
        return archip_hash, 1 if archip_existed else 0
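
After this hunk Blocker is a thin facade over ArchipelagoBlocker: the file/RADOS fan-out and the union/intersect helpers are gone. A minimal usage sketch (parameter values are illustrative; ArchipelagoBlocker may require further keys):

    from pithos.backends.lib.hashfiler.blocker import Blocker

    data = 'hello block'
    blocker = Blocker(blocksize=4 * 1024 * 1024, hashtype='sha256')
    hashes, missing = blocker.block_stor((data,))  # store one block
    assert blocker.block_ping(hashes) == []        # nothing missing now
    block = blocker.block_retr(hashes)[0]          # zero-padded to blocksize
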
/dev/null
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
from os import SEEK_CUR, SEEK_SET
35
from errno import ENOENT, EROFS
36

  
37

  
38
_zeros = ''
39

  
40

  
41
def zeros(nr):
42
    global _zeros
43
    size = len(_zeros)
44
    if nr == size:
45
        return _zeros
46

  
47
    if nr > size:
48
        _zeros += '\0' * (nr - size)
49
        return _zeros
50

  
51
    if nr < size:
52
        _zeros = _zeros[:nr]
53
        return _zeros
54

  
55

  
56
def file_sync_write_chunks(openfile, chunksize, offset, chunks, size=None):
57
    """Write given chunks to the given buffered file object.
58
       Writes never span across chunk boundaries.
59
       If size is given stop after or pad until size bytes have been written.
60
    """
61
    fwrite = openfile.write
62
    seek = openfile.seek
63
    padding = 0
64

  
65
    try:
66
        seek(offset * chunksize)
67
    except IOError:
68
        seek = None
69
        for x in xrange(offset):
70
            fwrite(zeros(chunksize))
71

  
72
    cursize = offset * chunksize
73

  
74
    for chunk in chunks:
75
        if padding:
76
            if seek:
77
                seek(padding - 1, SEEK_CUR)
78
                fwrite("\x00")
79
            else:
80
                fwrite(buffer(zeros(chunksize), 0, padding))
81
        if size is not None and cursize + chunksize >= size:
82
            chunk = chunk[:chunksize - (cursize - size)]
83
            fwrite(chunk)
84
            cursize += len(chunk)
85
            break
86
        fwrite(chunk)
87
        padding = chunksize - len(chunk)
88

  
89
    padding = size - cursize if size is not None else 0
90
    if padding <= 0:
91
        return
92

  
93
    q, r = divmod(padding, chunksize)
94
    for x in xrange(q):
95
        fwrite(zeros(chunksize))
96
    fwrite(buffer(zeros(chunksize), 0, r))
97

  
98

  
99
def file_sync_read_chunks(openfile, chunksize, nr, offset=0):
100
    """Read and yield groups of chunks from a buffered file object at offset.
101
       Reads never span across chunksize boundaries.
102
    """
103
    fread = openfile.read
104
    remains = offset * chunksize
105
    seek = openfile.seek
106
    try:
107
        seek(remains)
108
    except IOError:
109
        seek = None
110
        while 1:
111
            s = fread(remains)
112
            remains -= len(s)
113
            if remains <= 0:
114
                break
115

  
116
    while nr:
117
        remains = chunksize
118
        chunk = ''
119
        while 1:
120
            s = fread(remains)
121
            if not s:
122
                if chunk:
123
                    yield chunk
124
                return
125
            chunk += s
126
            remains -= len(s)
127
            if remains <= 0:
128
                break
129
        yield chunk
130
        nr -= 1
131

  
132

  
133
class ContextFile(object):
134
    __slots__ = ("name", "fdesc", "create")
135

  
136
    def __init__(self, name, create=0):
137
        self.name = name
138
        self.fdesc = None
139
        self.create = create
140
        #self.dirty = 0
141

  
142
    def __enter__(self):
143
        name = self.name
144
        try:
145
            fdesc = open(name, 'rb+')
146
        except IOError, e:
147
            if self.create and e.errno == ENOENT:
148
                fdesc = open(name, 'w+')
149
            elif not self.create and e.errno == EROFS:
150
                fdesc = open(name, 'rb')
151
            else:
152
                raise
153

  
154
        self.fdesc = fdesc
155
        return self
156

  
157
    def __exit__(self, exc, arg, trace):
158
        fdesc = self.fdesc
159
        if fdesc is not None:
160
            #if self.dirty:
161
            #    fsync(fdesc.fileno())
162
            fdesc.close()
163
        return False  # propagate exceptions
164

  
165
    def seek(self, offset, whence=SEEK_SET):
166
        return self.fdesc.seek(offset, whence)
167

  
168
    def tell(self):
169
        return self.fdesc.tell()
170

  
171
    def truncate(self, size):
172
        self.fdesc.truncate(size)
173

  
174
    def sync_write(self, data):
175
        #self.dirty = 1
176
        self.fdesc.write(data)
177

  
178
    def sync_write_chunks(self, chunksize, offset, chunks, size=None):
179
        #self.dirty = 1
180
        return file_sync_write_chunks(self.fdesc, chunksize, offset, chunks,
181
                                      size)
182

  
183
    def sync_read(self, size):
184
        read = self.fdesc.read
185
        data = ''
186
        while 1:
187
            s = read(size)
188
            if not s:
189
                break
190
            data += s
191
        return data
192

  
193
    def sync_read_chunks(self, chunksize, nr, offset=0):
194
        return file_sync_read_chunks(self.fdesc, chunksize, nr, offset)
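
For reference, the deleted ContextFile wrapped an on-disk block in a context manager with synchronous, chunk-aligned reads and writes. A minimal sketch of the removed API (module path as it was importable before this commit):

    from pithos.backends.lib.hashfiler.context_file import ContextFile

    with ContextFile('/tmp/block', create=1) as f:
        f.sync_write_chunks(4096, 0, ('abcd',))        # one chunk at offset 0

    with ContextFile('/tmp/block') as f:
        chunks = list(f.sync_read_chunks(4096, 1, 0))  # read it back
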
/dev/null
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
from os import SEEK_CUR, SEEK_SET
35
from rados import ObjectNotFound
36

  
37
_zeros = ''
38

  
39

  
40
def zeros(nr):
41
    global _zeros
42
    size = len(_zeros)
43
    if nr == size:
44
        return _zeros
45

  
46
    if nr > size:
47
        _zeros += '\0' * (nr - size)
48
        return _zeros
49

  
50
    if nr < size:
51
        _zeros = _zeros[:nr]
52
        return _zeros
53

  
54

  
55
def file_sync_write_chunks(radosobject, chunksize, offset, chunks, size=None):
56
    """Write given chunks to the given buffered file object.
57
       Writes never span across chunk boundaries.
58
       If size is given stop after or pad until size bytes have been written.
59
    """
60
    padding = 0
61
    cursize = chunksize * offset
62
    radosobject.seek(cursize)
63
    for chunk in chunks:
64
        if padding:
65
            radosobject.sync_write(buffer(zeros(chunksize), 0, padding))
66
        if size is not None and cursize + chunksize >= size:
67
            chunk = chunk[:chunksize - (cursize - size)]
68
            radosobject.sync_write(chunk)
69
            cursize += len(chunk)
70
            break
71
        radosobject.sync_write(chunk)
72
        padding = chunksize - len(chunk)
73

  
74
    padding = size - cursize if size is not None else 0
75
    if padding <= 0:
76
        return
77

  
78
    q, r = divmod(padding, chunksize)
79
    for x in xrange(q):
80
        radosobject.sync_write(zeros(chunksize))
81
    radosobject.sync_write(buffer(zeros(chunksize), 0, r))
82

  
83

  
84
def file_sync_read_chunks(radosobject, chunksize, nr, offset=0):
85
    """Read and yield groups of chunks from a buffered file object at offset.
86
       Reads never span across chunksize boundaries.
87
    """
88
    radosobject.seek(offset * chunksize)
89
    while nr:
90
        remains = chunksize
91
        chunk = ''
92
        while 1:
93
            s = radosobject.sync_read(remains)
94
            if not s:
95
                if chunk:
96
                    yield chunk
97
                return
98
            chunk += s
99
            remains -= len(s)
100
            if remains <= 0:
101
                break
102
        yield chunk
103
        nr -= 1
104

  
105

  
106
class RadosObject(object):
107
    __slots__ = ("name", "ioctx", "create", "offset")
108

  
109
    def __init__(self, name, ioctx, create=0):
110
        self.name = name
111
        self.ioctx = ioctx
112
        self.create = create
113
        self.offset = 0
114
        #self.dirty = 0
115

  
116
    def __enter__(self):
117
        return self
118

  
119
    def __exit__(self, exc, arg, trace):
120
        return False
121

  
122
    def seek(self, offset, whence=SEEK_SET):
123
        if whence == SEEK_CUR:
124
            offset += self.offset
125
        self.offset = offset
126
        return offset
127

  
128
    def tell(self):
129
        return self.offset
130

  
131
    def truncate(self, size):
132
        self.ioctx.trunc(self.name, size)
133

  
134
    def sync_write(self, data):
135
        #self.dirty = 1
136
        self.ioctx.write(self.name, data, self.offset)
137
        self.offset += len(data)
138

  
139
    def sync_write_chunks(self, chunksize, offset, chunks, size=None):
140
        #self.dirty = 1
141
        return file_sync_write_chunks(self, chunksize, offset, chunks, size)
142

  
143
    def sync_read(self, size):
144
        read = self.ioctx.read
145
        data = ''
146
        datalen = 0
147
        while 1:
148
            try:
149
                s = read(self.name, size - datalen, self.offset)
150
            except ObjectNotFound:
151
                s = None
152
            if not s:
153
                break
154
            data += s
155
            datalen += len(s)
156
            self.offset += len(s)
157
            if datalen >= size:
158
                break
159
        return data
160

  
161
    def sync_read_chunks(self, chunksize, nr, offset=0):
162
        return file_sync_read_chunks(self, chunksize, nr, offset)
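
RadosObject gave RADOS objects the same seek/sync_read/sync_write surface, tracking its own offset because librados I/O is positional. A sketch of the removed API (pool name illustrative, Python 2 as in the original):

    import rados
    from pithos.backends.lib.hashfiler.context_object import RadosObject

    cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
    cluster.connect()
    ioctx = cluster.open_ioctx('blocks')   # illustrative pool name

    with RadosObject('deadbeef', ioctx) as obj:
        obj.sync_write('data')             # write at offset 0, advance
        obj.seek(0)
        data = obj.sync_read(4)            # 'data'
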
/dev/null
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
from os import makedirs
35
from os.path import isdir, realpath, exists, join
36
from hashlib import new as newhasher
37
from binascii import hexlify
38

  
39
from context_file import ContextFile, file_sync_read_chunks
40

  
41

  
42
class FileBlocker(object):
43
    """Blocker.
44
       Required constructor parameters: blocksize, blockpath, hashtype.
45
    """
46

  
47
    blocksize = None
48
    blockpath = None
49
    hashtype = None
50

  
51
    def __init__(self, **params):
52
        blocksize = params['blocksize']
53
        blockpath = params['blockpath']
54
        blockpath = realpath(blockpath)
55
        if not isdir(blockpath):
56
            if not exists(blockpath):
57
                makedirs(blockpath)
58
            else:
59
                raise ValueError("Variable blockpath '%s' is not a directory" %
60
                                 (blockpath,))
61

  
62
        hashtype = params['hashtype']
63
        try:
64
            hasher = newhasher(hashtype)
65
        except ValueError:
66
            msg = "Variable hashtype '%s' is not available from hashlib"
67
            raise ValueError(msg % (hashtype,))
68

  
69
        hasher.update("")
70
        emptyhash = hasher.digest()
71

  
72
        self.blocksize = blocksize
73
        self.blockpath = blockpath
74
        self.hashtype = hashtype
75
        self.hashlen = len(emptyhash)
76
        self.emptyhash = emptyhash
77

  
78
    def _pad(self, block):
79
        return block + ('\x00' * (self.blocksize - len(block)))
80

  
81
    def _get_rear_block(self, blkhash, create=0):
82
        filename = hexlify(blkhash)
83
        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
84
        if not exists(dir):
85
            makedirs(dir)
86
        name = join(dir, filename)
87
        return ContextFile(name, create)
88

  
89
    def _check_rear_block(self, blkhash):
90
        filename = hexlify(blkhash)
91
        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
92
        name = join(dir, filename)
93
        return exists(name)
94

  
95
    def block_hash(self, data):
96
        """Hash a block of data"""
97
        hasher = newhasher(self.hashtype)
98
        hasher.update(data.rstrip('\x00'))
99
        return hasher.digest()
100

  
101
    def block_ping(self, hashes):
102
        """Check hashes for existence and
103
           return those missing from block storage.
104
        """
105
        notfound = []
106
        append = notfound.append
107

  
108
        for h in hashes:
109
            if h not in notfound and not self._check_rear_block(h):
110
                append(h)
111

  
112
        return notfound
113

  
114
    def block_retr(self, hashes):
115
        """Retrieve blocks from storage by their hashes."""
116
        blocksize = self.blocksize
117
        blocks = []
118
        append = blocks.append
119
        block = None
120

  
121
        for h in hashes:
122
            if h == self.emptyhash:
123
                append(self._pad(''))
124
                continue
125
            with self._get_rear_block(h, 0) as rbl:
126
                if not rbl:
127
                    break
128
                for block in rbl.sync_read_chunks(blocksize, 1, 0):
129
                    break  # there should be just one block there
130
            if not block:
131
                break
132
            append(self._pad(block))
133

  
134
        return blocks
135

  
136
    def block_stor(self, blocklist):
137
        """Store a bunch of blocks and return (hashes, missing).
138
           Hashes is a list of the hashes of the blocks,
139
           missing is a list of indices in that list indicating
140
           which blocks were missing from the store.
141
        """
142
        block_hash = self.block_hash
143
        hashlist = [block_hash(b) for b in blocklist]
144
        missing = [i for i, h in enumerate(hashlist) if not
145
                   self._check_rear_block(h)]
146
        for i in missing:
147
            with self._get_rear_block(hashlist[i], 1) as rbl:
148
                rbl.sync_write(blocklist[i])  # XXX: verify?
149

  
150
        return hashlist, missing
151

  
152
    def block_delta(self, blkhash, offset, data):
153
        """Construct and store a new block from a given block
154
           and a data 'patch' applied at offset. Return:
155
           (the hash of the new block, if the block already existed)
156
        """
157

  
158
        blocksize = self.blocksize
159
        if offset >= blocksize or not data:
160
            return None, None
161

  
162
        block = self.block_retr((blkhash,))
163
        if not block:
164
            return None, None
165

  
166
        block = block[0]
167
        newblock = block[:offset] + data
168
        if len(newblock) > blocksize:
169
            newblock = newblock[:blocksize]
170
        elif len(newblock) < blocksize:
171
            newblock += block[len(newblock):]
172

  
173
        h, a = self.block_stor((newblock,))
174
        return h[0], 1 if a else 0
175

  
176
    def block_hash_file(self, openfile):
177
        """Return the list of hashes (hashes map)
178
           for the blocks in a buffered file.
179
           Helper method, does not affect store.
180
        """
181
        hashes = []
182
        append = hashes.append
183
        block_hash = self.block_hash
184

  
185
        for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0):
186
            append(block_hash(block))
187

  
188
        return hashes
189

  
190
    def block_stor_file(self, openfile):
191
        """Read blocks from buffered file object and store them. Return:
192
           (bytes read, list of hashes, list of hashes that were missing)
193
        """
194
        blocksize = self.blocksize
195
        block_stor = self.block_stor
196
        hashlist = []
197
        hextend = hashlist.extend
198
        storedlist = []
199
        sextend = storedlist.extend
200
        lastsize = 0
201

  
202
        for block in file_sync_read_chunks(openfile, blocksize, 1, 0):
203
            hl, sl = block_stor((block,))
204
            hextend(hl)
205
            sextend(sl)
206
            lastsize = len(block)
207

  
208
        size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
209
        return size, hashlist, storedlist
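
FileBlocker was the content-addressed file store: each block lives under blockpath in a three-level directory fan-out of its hex hash. A sketch of the removed interface (paths and sizes illustrative):

    from pithos.backends.lib.hashfiler.fileblocker import FileBlocker

    fb = FileBlocker(blocksize=4 * 1024 * 1024,
                     blockpath='/srv/pithos/data/blocks',
                     hashtype='sha256')
    hashes, missing = fb.block_stor(('hello block',))  # dedup by hash
    assert fb.block_ping(hashes) == []                 # all present now
    padded = fb.block_retr(hashes)[0]                  # padded to blocksize
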
/dev/null
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
from os import makedirs
35
from os.path import isdir, realpath, exists, join
36
from binascii import hexlify
37

  
38
from context_file import ContextFile
39

  
40

  
41
class FileMapper(object):
42
    """Mapper.
43
       Required constructor parameters: mappath, namelen.
44
    """
45

  
46
    mappath = None
47
    namelen = None
48

  
49
    def __init__(self, **params):
50
        self.params = params
51
        self.namelen = params['namelen']
52
        mappath = realpath(params['mappath'])
53
        if not isdir(mappath):
54
            if not exists(mappath):
55
                makedirs(mappath)
56
            else:
57
                raise ValueError("Variable mappath '%s' is not a directory" %
58
                                 (mappath,))
59
        self.mappath = mappath
60

  
61
    def _get_rear_map(self, maphash, create=0):
62
        filename = hexlify(maphash)
63
        dir = join(self.mappath, filename[0:2], filename[2:4], filename[4:6])
64
        if not exists(dir):
65
            makedirs(dir)
66
        name = join(dir, filename)
67
        return ContextFile(name, create)
68

  
69
    def _check_rear_map(self, maphash):
70
        filename = hexlify(maphash)
71
        dir = join(self.mappath, filename[0:2], filename[2:4], filename[4:6])
72
        name = join(dir, filename)
73
        return exists(name)
74

  
75
    def map_retr(self, maphash, blkoff=0, nr=100000000000000):
76
        """Return as a list, part of the hashes map of an object
77
           at the given block offset.
78
           By default, return the whole hashes map.
79
        """
80
        namelen = self.namelen
81
        hashes = ()
82

  
83
        with self._get_rear_map(maphash, 0) as rmap:
84
            if rmap:
85
                hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff))
86
        return hashes
87

  
88
    def map_stor(self, maphash, hashes=(), blkoff=0, create=1):
89
        """Store hashes in the given hashes map."""
90
        namelen = self.namelen
91
        if self._check_rear_map(maphash):
92
            return
93
        with self._get_rear_map(maphash, 1) as rmap:
94
            rmap.sync_write_chunks(namelen, blkoff, hashes, None)
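
FileMapper persisted an object's hashes map as a flat file of fixed-width entries; namelen is the digest length in bytes (32 for sha256). A sketch of the removed interface (values illustrative):

    from pithos.backends.lib.hashfiler.filemapper import FileMapper

    fm = FileMapper(mappath='/srv/pithos/data/maps', namelen=32)
    maphash = '\x00' * 32                 # illustrative map hash
    fm.map_stor(maphash, ['\x11' * 32])   # write once; no-op if present
    stored = fm.map_retr(maphash)         # whole map as a list
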
b/snf-pithos-backend/pithos/backends/lib/hashfiler/mapper.py
31 31
# interpreted as representing official policies, either expressed
32 32
# or implied, of GRNET S.A.
33 33

  
34
from filemapper import FileMapper
34
from archipelagomapper import ArchipelagoMapper
35 35

  
36 36

  
37 37
class Mapper(object):
......
41 41
    """
42 42

  
43 43
    def __init__(self, **params):
44
        self.rmap = None
45
        try:
46
            if params['mappool']:
47
                from radosmapper import RadosMapper
48
                self.rmap = RadosMapper(**params)
49
        except KeyError:
50
            pass
51

  
52
        self.fmap = FileMapper(**params)
44
        self.archip_map = ArchipelagoMapper(**params)
53 45

  
54 46
    def map_retr(self, maphash, blkoff=0, nr=100000000000000):
55 47
        """Return as a list, part of the hashes map of an object
56 48
           at the given block offset.
57 49
           By default, return the whole hashes map.
58 50
        """
59
        return self.fmap.map_retr(maphash, blkoff, nr)
51
        return self.archip_map.map_retr(maphash, blkoff, nr)
52

  
53
    def map_retr_archipelago(self, maphash, size):
54
        """Return as a list the hashes map of an Archipelago
55
        Volume.
56
        """
57
        return self.archip_map.map_retr_archipelago(maphash, size)
60 58

  
61 59
    def map_stor(self, maphash, hashes=(), blkoff=0, create=1):
62 60
        """Store hashes in the given hashes map."""
63
        if self.rmap:
64
            self.rmap.map_stor(maphash, hashes, blkoff, create)
65
        self.fmap.map_stor(maphash, hashes, blkoff, create)
61
        self.archip_map.map_stor(maphash, hashes, blkoff, create)
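
Mapper mirrors the Blocker change: plain delegation to a single ArchipelagoMapper, plus the new volume-aware entry point. A minimal usage sketch (arguments illustrative; ArchipelagoMapper may require further keys):

    from pithos.backends.lib.hashfiler.mapper import Mapper

    maphash = '\x00' * 32
    hashes = ['\x11' * 32]
    size = 4 * 1024 * 1024

    mapper = Mapper(namelen=32)
    mapper.map_stor(maphash, hashes)          # persist the hashes map
    stored = mapper.map_retr(maphash)         # regular objects
    volmap = mapper.map_retr_archipelago(maphash, size)  # Archipelago volumes
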
/dev/null
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
from hashlib import new as newhasher
35
from binascii import hexlify
36
from rados import *
37

  
38
from context_object import RadosObject, file_sync_read_chunks
39

  
40
CEPH_CONF_FILE = "/etc/ceph/ceph.conf"
41

  
42

  
43
class RadosBlocker(object):
44
    """Blocker.
45
       Required constructor parameters: blocksize, blockpath, hashtype.
46
    """
47

  
48
    blocksize = None
49
    blockpool = None
50
    hashtype = None
51

  
52
    def __init__(self, **params):
53
        blocksize = params['blocksize']
54
        blockpool = params['blockpool']
55

  
56
        rados = Rados(conffile=CEPH_CONF_FILE)
57
        rados.connect()
58
        ioctx = rados.open_ioctx(blockpool)
59

  
60
        hashtype = params['hashtype']
61
        try:
62
            hasher = newhasher(hashtype)
63
        except ValueError:
64
            msg = "Variable hashtype '%s' is not available from hashlib"
65
            raise ValueError(msg % (hashtype,))
66

  
67
        hasher.update("")
68
        emptyhash = hasher.digest()
69

  
70
        self.blocksize = blocksize
71
        self.blockpool = blockpool
72
        self.rados = rados
73
        self.ioctx = ioctx
74
        self.hashtype = hashtype
75
        self.hashlen = len(emptyhash)
76
        self.emptyhash = emptyhash
77

  
78
    def _pad(self, block):
79
        return block + ('\x00' * (self.blocksize - len(block)))
80

  
81
    def _get_rear_block(self, blkhash, create=0):
82
        name = hexlify(blkhash)
83
        return RadosObject(name, self.ioctx, create)
84

  
85
    def _check_rear_block(self, blkhash):
86
        filename = hexlify(blkhash)
87
        try:
88
            self.ioctx.stat(filename)
89
            return True
90
        except ObjectNotFound:
91
            return False
92

  
93
    def block_hash(self, data):
94
        """Hash a block of data"""
95
        hasher = newhasher(self.hashtype)
96
        hasher.update(data.rstrip('\x00'))
97
        return hasher.digest()
98

  
99
    def block_ping(self, hashes):
100
        """Check hashes for existence and
101
           return those missing from block storage.
102
        """
103
        notfound = []
104
        append = notfound.append
105

  
106
        for h in hashes:
107
            if h not in notfound and not self._check_rear_block(h):
108
                append(h)
109

  
110
        return notfound
111

  
112
    def block_retr(self, hashes):
113
        """Retrieve blocks from storage by their hashes."""
114
        blocksize = self.blocksize
115
        blocks = []
116
        append = blocks.append
117
        block = None
118

  
119
        for h in hashes:
120
            if h == self.emptyhash:
121
                append(self._pad(''))
122
                continue
123
            with self._get_rear_block(h, 0) as rbl:
124
                if not rbl:
125
                    break
126
                for block in rbl.sync_read_chunks(blocksize, 1, 0):
127
                    break  # there should be just one block there
128
            if not block:
129
                break
130
            append(self._pad(block))
131

  
132
        return blocks
133

  
134
    def block_stor(self, blocklist):
135
        """Store a bunch of blocks and return (hashes, missing).
136
           Hashes is a list of the hashes of the blocks,
137
           missing is a list of indices in that list indicating
138
           which blocks were missing from the store.
139
        """
140
        block_hash = self.block_hash
141
        hashlist = [block_hash(b) for b in blocklist]
142
        missing = [i for i, h in enumerate(hashlist) if not
143
                   self._check_rear_block(h)]
144
        for i in missing:
145
            with self._get_rear_block(hashlist[i], 1) as rbl:
146
                rbl.sync_write(blocklist[i])  # XXX: verify?
147

  
148
        return hashlist, missing
149

  
150
    def block_delta(self, blkhash, offset, data):
151
        """Construct and store a new block from a given block
152
           and a data 'patch' applied at offset. Return:
153
           (the hash of the new block, if the block already existed)
154
        """
155

  
156
        blocksize = self.blocksize
157
        if offset >= blocksize or not data:
158
            return None, None
159

  
160
        block = self.block_retr((blkhash,))
161
        if not block:
162
            return None, None
163

  
164
        block = block[0]
165
        newblock = block[:offset] + data
166
        if len(newblock) > blocksize:
167
            newblock = newblock[:blocksize]
168
        elif len(newblock) < blocksize:
169
            newblock += block[len(newblock):]
170

  
171
        h, a = self.block_stor((newblock,))
172
        return h[0], 1 if a else 0
173

  
174
    def block_hash_file(self, radosobject):
175
        """Return the list of hashes (hashes map)
176
           for the blocks in a buffered file.
177
           Helper method, does not affect store.
178
        """
179
        hashes = []
180
        append = hashes.append
181
        block_hash = self.block_hash
182

  
183
        for block in file_sync_read_chunks(radosobject, self.blocksize, 1, 0):
184
            append(block_hash(block))
185

  
186
        return hashes
187

  
188
    def block_stor_file(self, radosobject):
189
        """Read blocks from buffered file object and store them. Return:
190
           (bytes read, list of hashes, list of hashes that were missing)
191
        """
192
        blocksize = self.blocksize
193
        block_stor = self.block_stor
194
        hashlist = []
195
        hextend = hashlist.extend
196
        storedlist = []
197
        sextend = storedlist.extend
198
        lastsize = 0
199

  
200
        for block in file_sync_read_chunks(radosobject, blocksize, 1, 0):
201
            hl, sl = block_stor((block,))
202
            hextend(hl)
203
            sextend(sl)
204
            lastsize = len(block)
205

  
206
        size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
207
        return size, hashlist, storedlist
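
RadosBlocker exposed the same interface as FileBlocker on top of a RADOS pool, which is what let the old Blocker fan each call out to both back ends. A sketch of the removed interface (pool name illustrative):

    from pithos.backends.lib.hashfiler.radosblocker import RadosBlocker

    rb = RadosBlocker(blocksize=4 * 1024 * 1024,
                      blockpool='blocks',
                      hashtype='sha256')
    hashes, missing = rb.block_stor(('hello block',))  # same calls as FileBlocker
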
/dev/null
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
from binascii import hexlify
35

  
36
from context_object import RadosObject
37
from rados import *
38

  
39
CEPH_CONF_FILE = "/etc/ceph/ceph.conf"
40

  
41

  
42
class RadosMapper(object):
43
    """Mapper.
44
       Required constructor parameters: mappath, namelen.
45
    """
46

  
47
    mappool = None
48
    namelen = None
49

  
50
    def __init__(self, **params):
51
        self.params = params
52
        self.namelen = params['namelen']
53
        mappool = params['mappool']
54

  
55
        rados = Rados(conffile=CEPH_CONF_FILE)
56
        rados.connect()
57
        ioctx = rados.open_ioctx(mappool)
58

  
59
        self.mappool = mappool
60
        self.rados = rados
61
        self.ioctx = ioctx
62

  
63
    def _get_rear_map(self, maphash, create=0):
64
        name = hexlify(maphash)
65
        return RadosObject(name, self.ioctx, create)
66

  
67
    def _check_rear_map(self, maphash):
68
        name = hexlify(maphash)
69
        try:
70
            self.ioctx.stat(name)
71
            return True
72
        except ObjectNotFound:
73
            return False
74

  
75
    def map_retr(self, maphash, blkoff=0, nr=100000000000000):
76
        """Return as a list, part of the hashes map of an object
77
           at the given block offset.
78
           By default, return the whole hashes map.
79
        """
80
        namelen = self.namelen
81
        hashes = ()
82

  
83
        with self._get_rear_map(maphash, 0) as rmap:
84
            if rmap:
85
                hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff))
86
        return hashes
87

  
88
    def map_stor(self, maphash, hashes=(), blkoff=0, create=1):
89
        """Store hashes in the given hashes map."""
90
        namelen = self.namelen
91
        if self._check_rear_map(maphash):
92
            return
93
        with self._get_rear_map(maphash, 1) as rmap:
94
            rmap.sync_write_chunks(namelen, blkoff, hashes, None)
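
RadosMapper completed the RADOS pair, matching FileMapper call for call (pool name illustrative):

    from pithos.backends.lib.hashfiler.radosmapper import RadosMapper

    rm = RadosMapper(mappool='maps', namelen=32)
    rm.map_stor('\x00' * 32, ['\x11' * 32])
    stored = rm.map_retr('\x00' * 32)
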
b/snf-pithos-backend/pithos/backends/lib/hashfiler/store.py
48 48
        if umask is not None:
49 49
            os.umask(umask)
50 50

  
51
        path = params['path']
52
        if path and not os.path.exists(path):
53
            os.makedirs(path)
54
        if not os.path.isdir(path):
55
            raise RuntimeError("Cannot open path '%s'" % (path,))
56

  
57
        p = {'blocksize': params['block_size'],
58
             'blockpath': os.path.join(path + '/blocks'),
51
        pb = {'blocksize': params['block_size'],
59 52
             'hashtype': params['hash_algorithm'],
60
             'blockpool': params['blockpool']}
61
        self.blocker = Blocker(**p)
62
        p = {'mappath': os.path.join(path + '/maps'),
63
             'namelen': self.blocker.hashlen,
64
             'mappool': params['mappool']}
65
        self.mapper = Mapper(**p)
53
            }
54
        self.blocker = Blocker(**pb)
55
        pm = {'namelen': self.blocker.hashlen,
56
            }
57
        self.mapper = Mapper(**pm)
66 58

  
67 59
    def map_get(self, name):
68 60
        return self.mapper.map_retr(name)
69 61

  
62
    def map_get_archipelago(self, name, size):
63
        return self.mapper.map_retr_archipelago(name, size)
64

  
70 65
    def map_put(self, name, map):
71 66
        self.mapper.map_stor(name, map)
72 67

  
......
79 74
            return None
80 75
        return blocks[0]
81 76

  
77
    def block_get_archipelago(self, hash):
78
        blocks = self.blocker.block_retr_archipelago((hash,))
79
        if not blocks:
80
            return None
81
        return blocks[0]
82

  
82 83
    def block_put(self, data):
83 84
        hashes, absent = self.blocker.block_stor((data,))
84 85
        return hashes[0]
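
With the path and pool plumbing removed, the store now forwards only block_size and hash_algorithm to Blocker, and the derived hashlen to Mapper. A wiring sketch, assuming the class in store.py keeps the name Store and accepts these keys (values illustrative):

    from pithos.backends.lib.hashfiler.store import Store

    params = {'block_size': 4 * 1024 * 1024,
              'hash_algorithm': 'sha256',
              'umask': 0o022}
    store = Store(**params)
    h = store.block_put('hello block')       # returns the block's hash
    block = store.block_get_archipelago(h)   # served via ArchipelagoBlocker
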
