Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / hashfilerback.py @ 41ddb860

History | View | Annotate | Download (18.4 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from pithos.lib.hashfiler import (Filer, ROOTNODE,
35
                                  SERIAL, PARENT, PATH, SIZE, POPULATION,
36
                                  POPSIZE, SOURCE, MTIME, CLUSTER)
37
from base import NotAllowedError
38
from binascii import hexlify, unhexlify
39
from os import makedirs, mkdir
40
from os.path import exists, isdir, split
41
from itertools import chain
42
from collections import defaultdict
43
from heapq import merge
44

    
45
debug = 0
46

    
47
READ  = 0
48
WRITE = 1
49

    
50
MAIN_CLUSTER  = 0
51
TRASH_CLUSTER = 1
52

    
53
def backend_method(func=None, name=None, autocommit=1):
54
    if func is None:
55
        def fn(func):
56
            return backend_method(func, name=name, autocommit=autocommit)
57
        return fn
58

    
59
    name = func.__name__ if name is None else name
60
    thefunc = None
61
    if debug:
62
        def fn(self, *args, **kw):
63
            print "%s %s %s" % (name, args, kw)
64
            return func(self, *args, **kw)
65
        thefunc = fn
66
    else:
67
        thefunc = func
68
    #XXX: Can't set __doc__ in methods?!
69
    #
70
    #doc = func.__doc__
71
    #if doc is None:
72
    #    doc = ''
73
    #func.__doc__ = getattr(BaseBackend, name).__doc__ + '\n***\n' + doc
74
    if not autocommit:
75
        return func
76

    
77
    def method(self, *args, **kw):
78
        with self.hf:
79
            return thefunc(self, *args, **kw)
80

    
81
    return method
82

    
83

    
84
class HashFilerBack(object):
85

    
86
    def __init__(self, db):
87
        head, tail = split(db)
88
        if not exists(head):
89
            makedirs(head)
90
        if not isdir(head):
91
            raise RuntimeError("Cannot open database at '%s'" % (db,))
92

    
93
        blocksize = 4*1024*1024
94
        hashtype = 'sha256'
95
        maps = head + '/maps'
96
        blocks = head + '/blocks'
97
        if not exists(root):
98
            mkdir(root)
99
        db = head + '/db'
100
        params = {  'filepath': db,
101
                    'mappath' : maps,
102
                    'blockpath': blocks,
103
                    'hashtype': hashtype,
104
                    'blocksize': blocksize  }
105

    
106
        hf = Filer(**params)
107
        self.hf = hf
108
        self.block_size = blocksize
109
        self.hash_algorithm = hashtype
110

    
111
    def quit(self):
112
        self.hf.quit()
113

    
114
    def get_account(self, account, root=ROOTNODE):
115
        #XXX: how are accounts created?
116
        r = self.hf.file_lookup(root, account, create=1)
117
        if r is None:
118
            raise NameError("account '%s' not found" % (account,))
119
        return r[0]
120

    
121
    def get_container(self, serial, name):
122
        r = self.hf.file_lookup(serial, name, create=0)
123
        if r is None:
124
            raise NameError("container '%s' not found" % (name,))
125
        return r[0]
126

    
127
    @staticmethod
128
    def insert_properties(meta, props):
129
        (serial, parent, path, size,
130
         population, popsize, source, mtime, cluster) = props
131
        meta['version'] = serial
132
        meta['name'] = path
133
        meta['bytes'] = size + popsize
134
        meta['count'] = population
135
        meta['created'] = mtime
136
        meta['modified'] = mtime
137
        return meta
138

    
139
    def lookup_file(self, account, container, name,
140
                    create=0, size=0, authorize=0, before=None):
141
        # XXX: account/container lookup cache?
142
        serial = self.get_account(account)
143
        serial = self.get_container(serial, container)
144
        if before is None:
145
            before = float('inf')
146
        props = self.hf.file_lookup(serial, name,
147
                                    create=create, size=size, before=before)
148
        if props is None:
149
            raise NameError("File '%s/%s/%s' does not exist" %
150
                            (account, container, name))
151
        return props
152

    
153
    def can_read(self, user, account, container, path):
154
        path = "%s:%s:%s" % (account, container, path)
155
        check = self.hf.access_check
156
        return check(READ, path, user) or check(WRITE, path, user)
157

    
158
    def can_write(self, user, account, container, path):
159
        path = "%s:%s:%s" % (account, container, path)
160
        check = self.hf.access_check
161
        return check(WRITE, path, user)
162

    
163
    @backend_method
164
    def get_account_meta(self, user, account, until=None):
165
        if user != account:
166
            raise NotAllowedError
167

    
168
        hf = self.hf
169
        before = float("inf") if until is None else until
170
        props = hf.file_lookup(ROOTNODE, account, create=0, before=before)
171
        if props is None:
172
            #XXX: why not return error?
173
            meta = {'name': account,
174
                    'count': 0,
175
                    'bytes': 0}
176
            if until is not None:
177
                meta['until_timestamp'] = 0
178
            return meta
179

    
180
        serial = props[SERIAL]
181
        meta = dict(hf.file_get_attributes(serial))
182
        meta['name'] = account
183
        self.insert_properties(meta, props)
184
        if until is not None:
185
            meta['until_timestamp'] = props[MTIME]
186
        return meta
187

    
188
    @backend_method
189
    def update_account_meta(self, user, account, meta, replace=False):
190
        if user != account:
191
            raise NotAllowedError
192

    
193
        hf = self.hf
194
        serial = self.get_account(account)
195
        if replace:
196
            hf.file_del_attributes(serial)
197

    
198
        hf.file_set_attributes(serial, meta.iteritems())
199

    
200
    @backend_method
201
    def get_account_groups(self, user, account):
202
        if user != account:
203
            raise NotAllowedError
204

    
205
        groups = defaultdict(list)
206
        grouplist = self.hf.group_list("%s:" % user)
207
        for name, member in grouplist:
208
            groups[name].append(member)
209

    
210
        return groups
211

    
212
    @backend_method
213
    def update_account_groups(self, user, account, groups, replace=False):
214
        if user != account:
215
            raise NotAllowedError
216

    
217
        hf = self.hf
218
        group_addmany = hf.group_addmany
219
        for name, members in groups:
220
            name = "%s:%s" % (user, name)
221
            if not members:
222
                members = (name,)
223
            group_addmany(name, members)
224

    
225
        if not replace:
226
            return
227

    
228
        groupnames = hf.group_list("%s:" % user)
229
        group_destroy = hf.group_destroy
230
        for name in groupnames:
231
            if name not in groups:
232
                group_destroy(name)
233

    
234
    @backend_method
235
    def delete_account(self, user, account):
236
        if user != account:
237
            raise NotAllowedError
238

    
239
        hf = self.hf
240
        r = hf.file_lookup(ROOTNODE, account)
241
        if r is None or r[SERIAL] is None:
242
            raise NameError
243

    
244
        if r[POPULATION]:
245
            raise IndexError
246

    
247
        hf.file_remove(r[SERIAL])
248

    
249
    @backend_method
250
    def put_container(self, user, account, name, policy=None):
251
        if user != account:
252
            raise NotAllowedError
253

    
254
        parent = self.get_account(account)
255
        hf = self.hf
256
        r = hf.file_lookup(parent, name)
257
        if r is not None:
258
            raise NameError("%s:%s already exists" % (account, name))
259

    
260
        serial, mtime = hf.file_create(parent, name, 0)
261
        return serial, mtime
262

    
263
    @backend_method
264
    def delete_container(self, user, account, name):
265
        if user != account:
266
            raise NotAllowedError
267

    
268
        serial = self.get_account(account)
269
        serial = self.get_container(serial, name)
270
        r = self.hf.file_remove(serial)
271
        if r is None:
272
            raise AssertionError("This should not have happened.")
273
        elif r == 0:
274
            #XXX: if container has only trashed objects, is it empty or not?
275
            msg = "'%s:%s' not empty. will not delete." % (account, name)
276
            raise IndexError(msg)
277

    
278
    @backend_method
279
    def get_container_meta(self, user, account, name, until=None):
280
        if user != account:
281
            raise NotAllowedError
282

    
283
        serial = self.get_account(account)
284
        hf = self.hf
285
        before = float('inf') if until is None else until
286
        props = hf.file_lookup(serial, name, before=before)
287
        if props is None:
288
            raise NameError("%s:%s does not exist" % (account, name))
289
        serial = props[0]
290

    
291
        r = hf.file_get_attributes(serial)
292
        meta = dict(r)
293
        self.insert_properties(meta, props)
294
        if until:
295
            meta['until_timestamp'] = props[MTIME]
296
        return meta
297

    
298
    @backend_method
299
    def update_container_meta(self, user, account, name, meta, replace=False):
300
        if user != account:
301
            raise NotAllowedError
302

    
303
        serial = self.get_account(account)
304
        serial = self.get_container(serial, name)
305
        hf = self.hf
306
        if replace:
307
            hf.file_del_attributes(serial)
308

    
309
        hf.file_set_attributes(serial, meta.iteritems())
310

    
311
    @backend_method
312
    def get_container_policy(self, user, account, container):
313
        return {}
314

    
315
    @backend_method
316
    def update_container_policy(self, user, account,
317
                                container, policy, replace=0):
318
        return
319

    
320
    @backend_method
321
    def list_containers(self, user, account,
322
                        marker=None, limit=10000, until=None):
323
        if not marker:
324
            marker = ''
325
        if limit is None:
326
            limit = -1
327
        serial = self.get_account(account)
328
        r = self.hf.file_list(serial, '', start=marker, limit=limit)
329
        objects, prefixes = r
330
        return [(o[PATH], o[SERIAL]) for o in objects]
331

    
332
    @backend_method
333
    def list_objects(self, user, account, container, prefix='',
334
                     delimiter=None, marker=None, limit=10000,
335
                     virtual=True, keys=[], until=None):
336
        if user != account:
337
            raise NotAllowedError
338

    
339
        if not marker:
340
            #XXX: '' is a valid string, should be None instead
341
            marker = None
342
        if limit is None:
343
            #XXX: limit should be an integer
344
            limit = -1
345

    
346
        if not delimiter:
347
            delimiter = None
348

    
349
        serial = self.get_account(account)
350
        serial = self.get_container(serial, container)
351
        hf = self.hf
352
        filterq = ','.join(keys) if keys else None
353
        r = hf.file_list(serial, prefix=prefix, start=marker,
354
                         versions=0, filterq=filterq,
355
                         delimiter=delimiter, limit=limit)
356
        objects, prefixes = r
357
        # XXX: virtual is not needed since we get them anyway
358
        #if virtual:
359
        #    return [(p, None) for p in prefixes]
360

    
361
        objects = ((o[PATH], o[MTIME]) for o in objects)
362
        prefixes = ((p, None) for p in prefixes) if virtual else ()
363
        return list(merge(objects, prefixes))
364

    
365
    @backend_method
366
    def list_object_meta(self, user, account, container, until=None):
367
        serial = self.get_account(account)
368
        serial = self.get_container(serial, container)
369
        return self.hf.file_list_attributes(serial)
370

    
371
    @backend_method
372
    def get_object_meta(self, user, account, container, name, until=None):
373
        props = self.lookup_file(account, container, name,
374
                                 before=until, authorize=2)
375
        serial = props[0]
376
        meta = dict(self.hf.file_get_attributes(serial))
377
        self.insert_properties(meta, props)
378
        meta['version_timestamp'] = props[MTIME]
379
        meta['modified_by'] = 'fuck you'
380
        return meta
381

    
382
    @backend_method
383
    def update_object_meta(self, user, account, container,
384
                           name, meta, replace=False):
385
        props = self.lookup_file(account, container, name, authorize=1)
386
        serial = props[0]
387
        hf = self.hf
388
        if replace:
389
            hf.file_del_attributes(serial)
390
        hf.file_set_attributes(serial, meta.iteritems())
391

    
392
    @backend_method
393
    def get_object_permissions(self, user, account, container, name):
394
        path = "%s/%s/%s" % (account, container, name)
395
        perms = defaultdict(list)
396
        for access, ident in self.hf.access_list(path):
397
            if access == READ and ident:
398
                perms['read'].append(ident)
399
            if access == WRITE and ident:
400
                perms['write'].append(ident)
401
        return path, perms
402

    
403
    @backend_method
404
    def update_object_permissions(self, user, account, container, name, permissions):
405
        if user != account:
406
            return NotAllowedError
407

    
408
        hf = self.hf
409
        path = "%s/%s/%s" % (account, container, name)
410
        feature = hf.feature_create()
411
        hf.access_grant(READ, path, members=permissions['read'])
412
        hf.access_grant(WRITE, path, members=permissions['write'])
413
        r = hf.xfeature_bestow(path, feature)
414
        assert(not r)
415

    
416
    @backend_method
417
    def get_object_public(self, user, account, container, name):
418
        """Return the public URL of the object if applicable."""
419
        return None
420

    
421
    @backend_method
422
    def update_object_public(self, user, account, container, name, public):
423
        """Update the public status of the object."""
424
        return
425

    
426
    @backend_method
427
    def get_object_hashmap(self, user, account, container, name, version=None):
428
        hf = self.hf
429
        if version is None:
430
            props = self.lookup_file(account, container, name, authorize=2)
431
        else:
432
            props = hf.file_get_properties(version)
433
        serial = props[SERIAL]
434
        size = props[SIZE]
435
        hashes = [hexlify(h) for h in hf.map_retr(serial)]
436
        return size, hashes
437

    
438
    @backend_method
439
    def update_object_hashmap(self, user, account, container,
440
                              name, size, hashmap, meta=None,
441
                              replace_meta=0, permissions=None):
442
        hf = self.hf
443
        props = self.lookup_file(account, container, name,
444
                                 create=1, size=size, authorize=1)
445
        serial = props[SERIAL]
446
        if size != props[SIZE]:
447
            hf.file_increment(serial, size)
448
        hf.map_stor(serial, (unhexlify(h) for h in hashmap))
449
        if replace_meta:
450
            hf.file_del_attributes(serial)
451
        if meta:
452
            hf.file_set_attributes(serial, meta.iteritems())
453

    
454
    @backend_method
455
    def copy_object(self, user, account, src_container, src_name,
456
                    dest_container, dest_name, dest_meta={},
457
                    replace_meta=False, permissions=None, src_version=None):
458
        hf = self.hf
459
        #FIXME: authorization
460
        if src_version:
461
            props = hf.file_get_properties(src_version)
462
        else:
463
            acc = self.get_account(account)
464
            srccont = self.get_container(acc, src_container)
465
            props = hf.file_lookup(srccont, src_name)
466
        if props is None:
467
            raise NameError
468
        srcobj = props[SERIAL]
469

    
470
        dstcont = self.get_container(acc, dest_container)
471
        serial, mtime = hf.file_copy(srcobj, dstcont, dest_name)
472
        if replace_meta:
473
            hf.file_del_attributes(serial)
474
        hf.file_set_attributes(serial, dest_meta.iteritems())
475

    
476
    @backend_method
477
    def move_object(self, user, account, src_container, src_name,
478
                    dest_container, dest_name, dest_meta={},
479
                    replace_meta=False, permissions=None, src_version=None):
480
        hf = self.hf
481
        #FIXME: authorization
482
        if src_version:
483
            props = hf.file_get_properties(src_version)
484
        else:
485
            acc = self.get_account(account)
486
            srccont = self.get_container(acc, src_container)
487
            props = hf.file_lookup(srccont, src_name)
488
        if props is None:
489
            raise NameError
490
        srcobj = props[SERIAL]
491

    
492
        dstcont = self.get_container(acc, dest_container)
493
        serial, mtime = hf.file_copy(srcobj, dstcont, dest_name)
494
        if replace_meta:
495
            hf.file_del_attributes(serial)
496
        hf.file_set_attributes(serial, dest_meta.iteritems())
497
        hf.file_remove(srcobj)
498

    
499
    @backend_method
500
    def delete_object(self, user, account, container, name):
501
        if user != account:
502
            raise NotAllowedError
503

    
504
        props = self.lookup_file(account, container, name, authorize=WRITE)
505
        serial = props[SERIAL]
506
        hf = self.hf
507
        #XXX: see delete_container, for empty-trashed ambiguty
508
        #     for now, we will just remove the object
509
        hf.file_remove(serial)
510
        #hf.file_recluster(serial, TRASH_CLUSTER)
511
        #XXX: Where to unset sharing? on file trash or on trash purge?
512
        #     We'll do it on file trash, but should we trash all versions?
513
        hf.xfeature_disinherit("%s/%s/%s" % (account, container, name))
514

    
515
#     @backend_method
516
#     def purge_trash(self, user, acount, container):
517
#         if user != account:
518
#             raise NotAllowedError
519
# 
520
#         serial = self.get_account(account)
521
#         serial = self.get_container(serial, container)
522
#         self.hf.file_purge_cluster(serial, cluster=TRASH_CLUSTER)
523

    
524
    @backend_method(autocommit=0)
525
    def get_block(self, blkhash):
526
        blocks = self.hf.block_retr((unhexlify(blkhash),))
527
        if not blocks:
528
            raise NameError
529
        return blocks[0]
530

    
531
    @backend_method(autocommit=0)
532
    def put_block(self, data):
533
        hashes, absent = self.hf.block_stor((data,))
534
        return hexlify(hashes[0])
535

    
536
    @backend_method(autocommit=0)
537
    def update_block(self, blkhash, data, offset=0):
538
        if offset == 0 and len(data) == self.block_size:
539
            return self.put_block(data)
540
        h, e = self.hf.block_delta(unhexlify(blkhash), ((offset, data),))
541
        return hexlify(h)