Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / hashfilerback.py @ 5bd53e3b

History | View | Annotate | Download (18.9 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from pithos.lib.hashfiler import (Filer, ROOTNODE,
35
                                  SERIAL, PARENT, PATH, SIZE, POPULATION,
36
                                  POPSIZE, SOURCE, MTIME, CLUSTER)
37
from base import NotAllowedError
38
from binascii import hexlify, unhexlify
39
from os import makedirs, mkdir
40
from os.path import exists, isdir, split
41
from itertools import chain
42
from collections import defaultdict
43
from heapq import merge
44

    
45
debug = 0
46

    
47
READ  = 0
48
WRITE = 1
49

    
50
MAIN_CLUSTER  = 0
51
TRASH_CLUSTER = 1
52

    
53
def backend_method(func=None, name=None, autocommit=1):
54
    if func is None:
55
        def fn(func):
56
            return backend_method(func, name=name, autocommit=autocommit)
57
        return fn
58

    
59
    name = func.__name__ if name is None else name
60
    thefunc = None
61
    if debug:
62
        def fn(self, *args, **kw):
63
            print "%s %s %s" % (name, args, kw)
64
            return func(self, *args, **kw)
65
        thefunc = fn
66
    else:
67
        thefunc = func
68
    #XXX: Can't set __doc__ in methods?!
69
    #
70
    #doc = func.__doc__
71
    #if doc is None:
72
    #    doc = ''
73
    #func.__doc__ = getattr(BaseBackend, name).__doc__ + '\n***\n' + doc
74
    if not autocommit:
75
        return func
76

    
77
    def method(self, *args, **kw):
78
        with self.hf:
79
            return thefunc(self, *args, **kw)
80

    
81
    return method
82

    
83

    
84
class HashFilerBack(object):
85
    """Abstract backend class that serves as a reference for actual implementations.
86

87
    The purpose of the backend is to provide the necessary functions for handling data
88
    and metadata. It is responsible for the actual storage and retrieval of information.
89

90
    Note that the account level is always valid as it is checked from another subsystem.
91

92
    The following variables should be available:
93
        'hash_algorithm': Suggested is 'sha256'
94
        'block_size': Suggested is 4MB
95
    """
96

    
97
    def __init__(self, db):
98
        head, tail = split(db)
99
        if not exists(head):
100
            makedirs(head)
101
        if not isdir(head):
102
            raise RuntimeError("Cannot open database at '%s'" % (db,))
103

    
104
        blocksize = 4*1024*1024
105
        hashtype = 'sha256'
106
        root = head + '/root'
107
        if not exists(root):
108
            mkdir(root)
109
        db = head + '/db'
110
        params = {  'filepath': db,
111
                    'mappath' : root,
112
                    'blockpath': root,
113
                    'hashtype': hashtype,
114
                    'blocksize': blocksize  }
115

    
116
        hf = Filer(**params)
117
        self.hf = hf
118
        self.block_size = blocksize
119
        self.hash_algorithm = hashtype
120

    
121
    def quit(self):
122
        self.hf.quit()
123

    
124
    def get_account(self, account, root=ROOTNODE):
125
        #XXX: how are accounts created?
126
        r = self.hf.file_lookup(root, account, create=1)
127
        if r is None:
128
            raise NameError("account '%s' not found" % (account,))
129
        return r[0]
130

    
131
    def get_container(self, serial, name):
132
        r = self.hf.file_lookup(serial, name, create=0)
133
        if r is None:
134
            raise NameError("container '%s' not found" % (name,))
135
        return r[0]
136

    
137
    def get_file(self, serial, path):
138
        r = self.hf.file_lookup(ROOTNODE, account, create=0)
139
        if r is None:
140
            raise NameError("file '%s' not found" % (path,))
141
        return r
142

    
143
    @staticmethod
144
    def insert_properties(meta, props):
145
        (serial, parent, path, size,
146
         population, popsize, source, mtime, cluster) = props
147
        meta['version'] = serial
148
        meta['name'] = path
149
        meta['bytes'] = size + popsize
150
        meta['count'] = population
151
        meta['created'] = mtime
152
        meta['modified'] = mtime
153
        return meta
154

    
155
    def lookup_file(self, account, container, name,
156
                    create=0, size=0, authorize=0, before=None):
157
        # XXX: account/container lookup cache?
158
        serial = self.get_account(account)
159
        serial = self.get_container(serial, container)
160
        if before is None:
161
            before = float('inf')
162
        props = self.hf.file_lookup(serial, name,
163
                                    create=create, size=size, before=before)
164
        if props is None:
165
            raise NameError("File '%s/%s/%s' does not exist" %
166
                            (account, container, name))
167
        return props
168

    
169
    def can_read(self, user, account, container, path):
170
        path = "%s:%s:%s" % (account, container, path)
171
        check = self.hf.access_check
172
        return check(READ, path, user) or check(WRITE, path, user)
173

    
174
    def can_write(self, user, account, container, path):
175
        path = "%s:%s:%s" % (account, container, path)
176
        check = self.hf.access_check
177
        return check(WRITE, path, user)
178

    
179
    @backend_method
180
    def get_account_meta(self, user, account, until=None):
181
        if user != account:
182
            raise NotAllowedError
183

    
184
        hf = self.hf
185
        before = float("inf") if until is None else until
186
        props = hf.file_lookup(ROOTNODE, account, create=0, before=before)
187
        if props is None:
188
            #XXX: why not return error?
189
            meta = {'name': account,
190
                    'count': 0,
191
                    'bytes': 0}
192
            if until is not None:
193
                meta['until_timestamp'] = 0
194
            return meta
195

    
196
        serial = props[SERIAL]
197
        meta = dict(hf.file_get_attributes(serial))
198
        meta['name'] = account
199
        self.insert_properties(meta, props)
200
        if until is not None:
201
            meta['until_timestamp'] = props[MTIME]
202
        return meta
203

    
204
    @backend_method
205
    def update_account_meta(self, user, account, meta, replace=False):
206
        if user != account:
207
            raise NotAllowedError
208

    
209
        hf = self.hf
210
        serial = self.get_account(account)
211
        if replace:
212
            hf.file_del_attributes(serial)
213

    
214
        hf.file_set_attributes(serial, meta.iteritems())
215

    
216
    @backend_method
217
    def get_account_groups(self, user, account):
218
        if user != account:
219
            raise NotAllowedError
220

    
221
        groups = defaultdict(list)
222
        grouplist = self.hf.group_list("%s:" % user)
223
        for name, member in grouplist:
224
            groups[name].append(member)
225

    
226
        return groups
227

    
228
    @backend_method
229
    def update_account_groups(self, user, account, groups, replace=False):
230
        if user != account:
231
            raise NotAllowedError
232

    
233
        hf = self.hf
234
        group_addmany = hf.group_addmany
235
        for name, members in groups:
236
            name = "%s:%s" % (user, name)
237
            if not members:
238
                members = (name,)
239
            group_addmany(name, members)
240

    
241
        if not replace:
242
            return
243

    
244
        groupnames = hf.group_list("%s:" % user)
245
        group_destroy = hf.group_destroy
246
        for name in groupnames:
247
            if name not in groups:
248
                group_destroy(name)
249

    
250
    @backend_method
251
    def delete_account(self, user, account):
252
        if user != account:
253
            raise NotAllowedError
254

    
255
        hf = self.hf
256
        r = hf.file_lookup(ROOTNODE, account)
257
        if r is None or r[SERIAL] is None:
258
            raise NameError
259

    
260
        if r[POPULATION]:
261
            raise IndexError
262

    
263
        hf.file_remove(r[SERIAL])
264

    
265
    @backend_method
266
    def put_container(self, user, account, name, policy=None):
267
        if user != account:
268
            raise NotAllowedError
269

    
270
        parent = self.get_account(account)
271
        hf = self.hf
272
        r = hf.file_lookup(parent, name)
273
        if r is not None:
274
            raise NameError("%s:%s already exists" % (account, name))
275

    
276
        serial, mtime = hf.file_create(parent, name, 0)
277
        return serial, mtime
278

    
279
    @backend_method
280
    def delete_container(self, user, account, name):
281
        if user != account:
282
            raise NotAllowedError
283

    
284
        serial = self.get_account(account)
285
        serial = self.get_container(serial, name)
286
        r = self.hf.file_remove(serial)
287
        if r is None:
288
            raise AssertionError("This should not have happened.")
289
        elif r == 0:
290
            #XXX: if container has only trashed objects, is it empty or not?
291
            msg = "'%s:%s' not empty. will not delete." % (account, name)
292
            raise IndexError(msg)
293

    
294
    @backend_method
295
    def get_container_meta(self, user, account, name, until=None):
296
        if user != account:
297
            raise NotAllowedError
298

    
299
        serial = self.get_account(account)
300
        hf = self.hf
301
        before = float('inf') if until is None else until
302
        props = hf.file_lookup(serial, name, before=before)
303
        if props is None:
304
            raise NameError("%s:%s does not exist" % (account, name))
305
        serial = props[0]
306

    
307
        r = hf.file_get_attributes(serial)
308
        meta = dict(r)
309
        self.insert_properties(meta, props)
310
        if until:
311
            meta['until_timestamp'] = props[MTIME]
312
        return meta
313

    
314
    @backend_method
315
    def update_container_meta(self, user, account, name, meta, replace=False):
316
        if user != account:
317
            raise NotAllowedError
318

    
319
        serial = self.get_account(account)
320
        serial = self.get_container(serial, name)
321
        hf = self.hf
322
        if replace:
323
            hf.file_del_attributes(serial)
324

    
325
        hf.file_set_attributes(serial, meta.iteritems())
326

    
327
    @backend_method
328
    def get_container_policy(self, user, account, container):
329
        return {}
330

    
331
    @backend_method
332
    def update_container_policy(self, user, account,
333
                                container, policy, replace=0):
334
        return
335

    
336
    @backend_method
337
    def list_containers(self, user, account,
338
                        marker=None, limit=10000, until=None):
339
        if not marker:
340
            marker = ''
341
        if limit is None:
342
            limit = -1
343
        serial = self.get_account(account)
344
        r = self.hf.file_list(serial, '', start=marker, limit=limit)
345
        objects, prefixes = r
346
        return [(o[PATH], o[SERIAL]) for o in objects]
347

    
348
    @backend_method
349
    def list_objects(self, user, account, container, prefix='',
350
                     delimiter=None, marker=None, limit=10000,
351
                     virtual=True, keys=[], until=None):
352
        if user != account:
353
            raise NotAllowedError
354

    
355
        if not marker:
356
            #XXX: '' is a valid string, should be None instead
357
            marker = None
358
        if limit is None:
359
            #XXX: limit should be an integer
360
            limit = -1
361

    
362
        if not delimiter:
363
            delimiter = None
364

    
365
        serial = self.get_account(account)
366
        serial = self.get_container(serial, container)
367
        hf = self.hf
368
        filterq = ','.join(keys) if keys else None
369
        r = hf.file_list(serial, prefix=prefix, start=marker,
370
                         versions=0, filterq=filterq,
371
                         delimiter=delimiter, limit=limit)
372
        objects, prefixes = r
373
        # XXX: virtual is not needed since we get them anyway
374
        #if virtual:
375
        #    return [(p, None) for p in prefixes]
376

    
377
        objects = ((o[PATH], o[MTIME]) for o in objects)
378
        prefixes = ((p, None) for p in prefixes) if virtual else ()
379
        return list(merge(objects, prefixes))
380

    
381
    @backend_method
382
    def list_object_meta(self, user, account, container, until=None):
383
        serial = self.get_account(account)
384
        serial = self.get_container(serial, container)
385
        return self.hf.file_list_attributes(serial)
386

    
387
    @backend_method
388
    def get_object_meta(self, user, account, container, name, until=None):
389
        props = self.lookup_file(account, container, name,
390
                                 before=until, authorize=2)
391
        serial = props[0]
392
        meta = dict(self.hf.file_get_attributes(serial))
393
        self.insert_properties(meta, props)
394
        meta['version_timestamp'] = props[MTIME]
395
        meta['modified_by'] = 'fuck you'
396
        return meta
397

    
398
    @backend_method
399
    def update_object_meta(self, user, account, container,
400
                           name, meta, replace=False):
401
        props = self.lookup_file(account, container, name, authorize=1)
402
        serial = props[0]
403
        hf = self.hf
404
        if replace:
405
            hf.file_del_attributes(serial)
406
        hf.file_set_attributes(serial, meta.iteritems())
407

    
408
    @backend_method
409
    def get_object_permissions(self, user, account, container, name):
410
        path = "%s/%s/%s" % (account, container, name)
411
        perms = defaultdict(list)
412
        for access, ident in self.hf.access_list(path):
413
            perms[access].append(ident)
414
        perms['read'] = perms.pop(READ, [])
415
        perms['write'] = perms.pop(WRITE, [])
416
        return path, perms
417

    
418
    @backend_method
419
    def update_object_permissions(self, user, account, container, name, permissions):
420
        if user != account:
421
            return NotAllowedError
422

    
423
        hf = self.hf
424
        path = "%s/%s/%s" % (account, container, name)
425
        feature = hf.feature_create()
426
        hf.access_grant(READ, path, members=permissions['read'])
427
        hf.access_grant(WRITE, path, members=permissions['write'])
428
        r = hf.xfeature_bestow(path, feature)
429
        assert(not r)
430

    
431
    @backend_method
432
    def get_object_public(self, user, account, container, name):
433
        """Return the public URL of the object if applicable."""
434
        return None
435

    
436
    @backend_method
437
    def update_object_public(self, user, account, container, name, public):
438
        """Update the public status of the object."""
439
        return
440

    
441
    @backend_method
442
    def get_object_hashmap(self, user, account, container, name, version=None):
443
        hf = self.hf
444
        if version is None:
445
            props = self.lookup_file(account, container, name, authorize=2)
446
        else:
447
            props = hf.file_get_properties(version)
448
        serial = props[SERIAL]
449
        size = props[SIZE]
450
        hashes = [hexlify(h) for h in hf.map_retr(serial)]
451
        return size, hashes
452

    
453
    @backend_method
454
    def update_object_hashmap(self, user, account, container,
455
                              name, size, hashmap, meta=None,
456
                              replace_meta=0, permissions=None):
457
        hf = self.hf
458
        props = self.lookup_file(account, container, name,
459
                                 create=1, size=size, authorize=1)
460
        serial = props[SERIAL]
461
        if size != props[SIZE]:
462
            hf.file_increment(serial, size)
463
        hf.map_stor(serial, (unhexlify(h) for h in hashmap))
464
        if replace_meta:
465
            hf.file_del_attributes(serial)
466
        if meta:
467
            hf.file_set_attributes(serial, meta.iteritems())
468

    
469
    @backend_method
470
    def copy_object(self, user, account, src_container, src_name,
471
                    dest_container, dest_name, dest_meta={},
472
                    replace_meta=False, permissions=None, src_version=None):
473
        hf = self.hf
474
        #FIXME: authorization
475
        if src_version:
476
            props = hf.file_get_properties(src_version)
477
        else:
478
            acc = self.get_account(account)
479
            srccont = self.get_container(acc, src_container)
480
            props = hf.file_lookup(srccont, src_name)
481
        if props is None:
482
            raise NameError
483
        srcobj = props[SERIAL]
484

    
485
        dstcont = self.get_container(acc, dest_container)
486
        serial, mtime = hf.file_copy(srcobj, dstcont, dest_name)
487
        if replace_meta:
488
            hf.file_del_attributes(serial)
489
        hf.file_set_attributes(serial, dest_meta.iteritems())
490

    
491
    @backend_method
492
    def move_object(self, user, account, src_container, src_name,
493
                    dest_container, dest_name, dest_meta={},
494
                    replace_meta=False, permissions=None, src_version=None):
495
        hf = self.hf
496
        #FIXME: authorization
497
        if src_version:
498
            props = hf.file_get_properties(src_version)
499
        else:
500
            acc = self.get_account(account)
501
            srccont = self.get_container(acc, src_container)
502
            props = hf.file_lookup(srccont, src_name)
503
        if props is None:
504
            raise NameError
505
        srcobj = props[SERIAL]
506

    
507
        dstcont = self.get_container(acc, dest_container)
508
        serial, mtime = hf.file_copy(srcobj, dstcont, dest_name)
509
        if replace_meta:
510
            hf.file_del_attributes(serial)
511
        hf.file_set_attributes(serial, dest_meta.iteritems())
512
        hf.file_remove(srcobj)
513

    
514
    @backend_method
515
    def delete_object(self, user, account, container, name):
516
        if user != account:
517
            raise NotAllowedError
518

    
519
        props = self.lookup_file(account, container, name, authorize=WRITE)
520
        serial = props[SERIAL]
521
        hf = self.hf
522
        #XXX: see delete_container, for empty-trashed ambiguty
523
        #     for now, we will just remove the object
524
        hf.file_remove(serial)
525
        #hf.file_recluster(serial, TRASH_CLUSTER)
526
        #XXX: Where to unset sharing? on file trash or on trash purge?
527
        #     We'll do it on file trash, but should we trash all versions?
528
        hf.xfeature_disinherit("%s/%s/%s" % (account, container, name))
529

    
530
    @backend_method
531
    def purge_trash(self, user, acount, container):
532
        if user != account:
533
            raise NotAllowedError
534

    
535
        serial = self.get_account(account)
536
        serial = self.get_container(serial, container)
537
        self.hf.file_purge_cluster(serial, cluster=TRASH_CLUSTER)
538

    
539
    @backend_method(autocommit=0)
540
    def get_block(self, blkhash):
541
        blocks = self.hf.block_retr((unhexlify(blkhash),))
542
        if not blocks:
543
            raise NameError
544
        return blocks[0]
545

    
546
    @backend_method(autocommit=0)
547
    def put_block(self, data):
548
        hashes, absent = self.hf.block_stor((data,))
549
        #XXX: why hexlify hashes?
550
        return hexlify(hashes[0])
551

    
552
    @backend_method(autocommit=0)
553
    def update_block(self, blkhash, data, offset=0):
554
        h, e = self.hf.block_delta(unhexlify(blkhash), ((offset, data),))
555
        return hexlify(h)
556