Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 6e147ecc

History | View | Annotate | Download (48.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43

    
44
# Stripped-down version of the HashMap class found in tools.
45
class HashMap(list):
    """List of raw (binary) block digests that reduces to one top-level hash.

    ``hash()`` folds the entries pairwise into a single digest using the
    hashlib algorithm named by ``blockhash``.
    """

    def __init__(self, blocksize, blockhash):
        """Remember the block size and the hashlib algorithm name."""
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        """Return the binary digest of the byte string ``v``."""
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        """Return the top-level hash of the stored digests.

        * empty map: the digest of the empty byte string;
        * one entry: that entry itself (it is not hashed again);
        * otherwise: the entries are padded with all-zero digests up to the
          next power of two and hashed pairwise until one digest is left.
        """
        # Byte literals keep this working where hashlib rejects text strings.
        if len(self) == 0:
            return self._hash_raw(b'')
        if len(self) == 1:
            return self[0]

        level = list(self)
        width = 2
        while width < len(level):
            width *= 2
        level += [b'\x00' * len(level[0])] * (width - len(level))
        while len(level) > 1:
            level = [self._hash_raw(level[i] + level[i + 1])
                     for i in range(0, len(level), 2)]
        return level[0]
71

    
72
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
# Queue defaults are disabled; a queue is only used when configured explicitly.
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'

QUEUE_MESSAGE_KEY = '#'
QUEUE_CLIENT_ID = 2  # Pithos.

# Cluster ids handed to the node (DB) layer.
CLUSTER_NORMAL = 0
CLUSTER_HISTORY = 1
CLUSTER_DELETED = 2

inf = float('inf')

# Offset added to stored public ids before they are returned to callers.
ULTIMATE_ANSWER = 42


logger = logging.getLogger(__name__)
91

    
92

    
93
def backend_method(func=None, autocommit=1):
    """Decorator that runs a backend method inside a wrapper transaction.

    Usable bare (``@backend_method``) or with arguments
    (``@backend_method(autocommit=0)``).  With a false ``autocommit`` the
    function is returned untouched.  Otherwise every call invokes
    ``self.wrapper.execute()``, then the method, then
    ``self.wrapper.commit()``; if the method or the commit raises,
    ``self.wrapper.rollback()`` is called and the exception is re-raised.
    """
    import functools  # Local import: only this decorator needs it.

    if func is None:
        # Called with arguments: return a decorator that awaits the function.
        def fn(func):
            return backend_method(func, autocommit)
        return fn

    if not autocommit:
        return func

    @functools.wraps(func)  # Keep the method's name and docstring.
    def fn(self, *args, **kw):
        self.wrapper.execute()
        try:
            ret = func(self, *args, **kw)
            self.wrapper.commit()
            return ret
        except:
            # Deliberately bare: roll back on any failure, then re-raise.
            self.wrapper.rollback()
            raise
    return fn
111

    
112

    
113
class ModularBackend(BaseBackend):
114
    """A modular backend.
115
    
116
    Uses modules for SQL functions and storage.
117
    """
118
    
119
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None,
                 queue_module=None, queue_connection=None):
        """Load the DB, block store and (optional) queue modules.

        Falsy DB/block arguments fall back to the module-level defaults.  A
        queue is set up only when both ``queue_module`` and
        ``queue_connection`` are given; otherwise a do-nothing stand-in with
        ``send``/``close`` methods is installed as ``self.queue``.
        """
        def import_by_name(name):
            # Import a dotted module path and return the module object.
            __import__(name)
            return sys.modules[name]

        self.hash_algorithm = 'sha256'
        self.block_size = 4 * 1024 * 1024 # 4MB

        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}

        # Database layer; selected constants are mirrored onto the backend.
        self.db_module = import_by_name(db_module or DEFAULT_DB_MODULE)
        self.wrapper = self.db_module.DBWrapper(db_connection or DEFAULT_DB_CONNECTION)
        self.permissions = self.db_module.Permissions(wrapper=self.wrapper)
        for const in ('READ', 'WRITE'):
            setattr(self, const, getattr(self.db_module, const))
        self.node = self.db_module.Node(wrapper=self.wrapper)
        for const in ('ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME',
                      'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                      'MATCH_PREFIX', 'MATCH_EXACT'):
            setattr(self, const, getattr(self.db_module, const))

        # Block store.
        self.block_module = import_by_name(block_module or DEFAULT_BLOCK_MODULE)
        self.store = self.block_module.Store(path=block_path or DEFAULT_BLOCK_PATH,
                                             block_size=self.block_size,
                                             hash_algorithm=self.hash_algorithm)

        # Queue, or a no-op replacement when none is configured.
        if not (queue_module and queue_connection):
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()
        else:
            self.queue_module = import_by_name(queue_module)
            self.queue = self.queue_module.Queue(exchange=queue_connection,
                                                 message_key=QUEUE_MESSAGE_KEY,
                                                 client_id=QUEUE_CLIENT_ID)
169
    
170
    def close(self):
        """Close the DB wrapper and the queue.

        The queue is closed even when closing the wrapper raises, so a
        failing DB shutdown does not leave the queue open.
        """
        try:
            self.wrapper.close()
        finally:
            self.queue.close()
173
    
174
    @backend_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return the accounts the user can access, cut to the marker/limit window."""

        logger.debug("list_accounts: %s %s %s", user, marker, limit)
        accounts = self._allowed_accounts(user)
        first, count = self._list_limits(accounts, marker, limit)
        last = first + count
        return accounts[first:last]
182
    
183
    @backend_method
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
        """Return a dictionary with the account metadata for the domain.

        The owner gets user-defined attributes (when requested), 'count' and
        'bytes'; any other permitted user only gets 'name' and 'modified'.
        Raises NotAllowedError for a non-owner who passes ``until``, whose
        target account lookup yields no node, or who has no access.
        """

        logger.debug("get_account_meta: %s %s %s", account, domain, until)
        is_owner = user == account
        path, node = self._lookup_account(account, is_owner)
        if not is_owner and (until or node is None or account not in self._allowed_accounts(user)):
            raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # No properties available: fall back to the requested timestamp.
            props = None
            mtime = until
        count, size, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            # Overall last modification.
            modified = max(self._get_statistics(node)[2], mtime)

        if is_owner:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta['until_timestamp'] = tstamp
            # Fixed keys go last so they override same-named attributes.
            meta['name'] = account
            meta['count'] = count
            meta['bytes'] = size
        else:
            meta = {'name': account}
        meta['modified'] = modified
        return meta
217
    
218
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Store metadata for the account in the given domain (owner only)."""

        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
        if not user == account:
            raise NotAllowedError
        _path, account_node = self._lookup_account(account, True)
        self._put_metadata(user, account_node, domain, meta, replace)
227
    
228
    @backend_method
    def get_account_groups(self, user, account):
        """Return the groups defined for the account.

        Only the owner sees the groups; another user with access to the
        account gets an empty dict, and one without access gets
        NotAllowedError.
        """

        logger.debug("get_account_groups: %s", account)
        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
        return {}
239
    
240
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account (owner only).

        With ``replace`` all existing groups are destroyed first; otherwise
        only the groups named in ``groups`` are reset.  A group mapped to a
        falsy value ends up deleted.
        """

        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        keep_others = not replace
        if replace:
            self.permissions.group_destroy(account)
        for group, members in groups.iteritems():
            if keep_others:
                # Not wiped wholesale above, so clear this group individually.
                self.permissions.group_delete(account, group)
            if members:
                self.permissions.group_addmany(account, group, members)
256
    
257
    @backend_method
    def get_account_policy(self, user, account):
        """Return the account policy.

        Non-owners with access get an empty dict; without access they get
        NotAllowedError.
        """

        logger.debug("get_account_policy: %s", account)
        if user == account:
            _path, account_node = self._lookup_account(account, True)
            return self._get_policy(account_node)
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
        return {}
268
    
269
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Validate and store the policy of the account (owner only)."""

        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
        if not user == account:
            raise NotAllowedError
        _path, account_node = self._lookup_account(account, True)
        self._check_policy(policy)
        self._put_policy(account_node, policy, replace)
279
    
280
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name.

        ``policy`` defaults to a fresh empty dict on every call, so the
        default is never shared between calls.

        Raises NotAllowedError when ``user`` is not the account itself and
        NameError when the account already exists.
        """

        if policy is None:
            policy = {}
        logger.debug("put_account: %s %s", account, policy)
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is not None:
            raise NameError('Account already exists')
        if policy:
            self._check_policy(policy)
        node = self._put_path(user, self.ROOTNODE, account)
        self._put_policy(node, policy, True)
294
    
295
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name (owner only).

        A missing account is ignored.  Raises IndexError when the node
        cannot be removed.
        """

        logger.debug("delete_account: %s", account)
        if user != account:
            raise NotAllowedError
        account_node = self.node.node_lookup(account)
        if account_node is not None:
            removed = self.node.node_remove(account_node)
            if not removed:
                raise IndexError('Account is not empty')
            self.permissions.group_destroy(account)
308
    
309
    @backend_method
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
        """Return a list of containers existing under an account.

        Non-owners only see the containers they are allowed to access and may
        not pass ``until``.  With ``shared`` the owner gets the containers
        that appear in the account's shared paths, in sorted order so that
        marker/limit paging is deterministic.
        """

        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            allowed = self._allowed_containers(user, account)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        if shared:
            shared_paths = self.permissions.access_list_shared(account)
            # Second path component is the container name; de-duplicate and
            # sort, because plain set order is arbitrary.
            allowed = sorted(set(p.split('/', 2)[1] for p in shared_paths))
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        node = self.node.node_lookup(account)
        return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
327
    
328
    @backend_method
    def list_container_meta(self, user, account, container, domain, until=None):
        """Return the object meta keys found in the container for the domain.

        Non-owners may not pass ``until`` and must have access to at least
        one path under the container, else NotAllowedError is raised.
        """

        logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
        if user == account:
            allowed = []
        else:
            if until:
                raise NotAllowedError
            container_path = '/'.join((account, container))
            allowed = self.permissions.access_list_paths(user, container_path)
            if not allowed:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        if until is None:
            before = inf
        else:
            before = until
        formatted = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, formatted)
344
    
345
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        The owner gets user-defined attributes (when requested), 'count' and
        'bytes'; any other permitted user only gets 'name' and 'modified'.
        """

        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
        is_owner = user == account
        if not is_owner and (until or container not in self._allowed_containers(user, account)):
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, size, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            # Overall last modification.
            modified = max(self._get_statistics(node)[2], mtime)

        if is_owner:
            meta = {}
            if include_user_defined:
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta['until_timestamp'] = tstamp
            # Fixed keys go last so they override same-named attributes.
            meta['name'] = container
            meta['count'] = count
            meta['bytes'] = size
        else:
            meta = {'name': container}
        meta['modified'] = modified
        return meta
375
    
376
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
        """Store metadata for the container in the given domain (owner only)."""

        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
        if not user == account:
            raise NotAllowedError
        _path, container_node = self._lookup_container(account, container)
        self._put_metadata(user, container_node, domain, meta, replace)
385
    
386
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return the container policy.

        Non-owners allowed on the container get an empty dict; otherwise
        NotAllowedError is raised.
        """

        logger.debug("get_container_policy: %s %s", account, container)
        if user == account:
            _path, container_node = self._lookup_container(account, container)
            return self._get_policy(container_node)
        if container not in self._allowed_containers(user, account):
            raise NotAllowedError
        return {}
397
    
398
    @backend_method
    def update_container_policy(self, user, account, container, policy, replace=False):
        """Validate and store the policy of the container (owner only)."""

        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
        if not user == account:
            raise NotAllowedError
        _path, container_node = self._lookup_container(account, container)
        self._check_policy(policy)
        self._put_policy(container_node, policy, replace)
408
    
409
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name.

        ``policy`` defaults to a fresh empty dict on every call, so the
        default is never shared between calls.

        Raises NotAllowedError when ``user`` is not the account owner and
        NameError when the container already exists.
        """

        if policy is None:
            policy = {}
        logger.debug("put_container: %s %s %s", account, container, policy)
        if user != account:
            raise NotAllowedError
        try:
            path, node = self._lookup_container(account, container)
        except NameError:
            # Lookup failed: the container does not exist yet, as required.
            pass
        else:
            raise NameError('Container already exists')
        if policy:
            self._check_policy(policy)
        path = '/'.join((account, container))
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
        self._put_policy(node, policy, True)
427
    
428
    @backend_method
    def delete_container(self, user, account, container, until=None):
        """Delete/purge the container with the given name (owner only).

        With ``until`` only the history and deleted clusters up to that
        timestamp are purged and the container is kept.  Without it the
        container must hold no objects (else IndexError); everything is
        purged and the container node is removed.
        """

        logger.debug("delete_container: %s %s %s", account, container, until)
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        purge_only = until is not None
        if not purge_only and self._get_statistics(node)[0] > 0:
            raise IndexError('Container is not empty')

        before = until if purge_only else inf
        hashes, size = self.node.node_purge_children(node, before, CLUSTER_HISTORY)
        for h in hashes:
            self.store.map_delete(h)
        self.node.node_purge_children(node, before, CLUSTER_DELETED)
        if purge_only:
            action = 'container purge'
        else:
            self.node.node_remove(node)
            action = 'container delete'
        self._report_size_change(user, account, -size, {'action': action})
453
    
454
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props):
455
        if user != account and until:
456
            raise NotAllowedError
457
        allowed = self._list_object_permissions(user, account, container, prefix, shared)
458
        if shared and not allowed:
459
            return []
460
        path, node = self._lookup_container(account, container)
461
        allowed = self._get_formatted_paths(allowed)
462
        return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
463
    
464
    def _list_object_permissions(self, user, account, container, prefix, shared):
465
        allowed = []
466
        path = '/'.join((account, container, prefix)).rstrip('/')
467
        if user != account:
468
            allowed = self.permissions.access_list_paths(user, path)
469
            if not allowed:
470
                raise NotAllowedError
471
        else:
472
            if shared:
473
                allowed = self.permissions.access_list_shared(path)
474
                if not allowed:
475
                    return []
476
        return allowed
477
    
478
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None):
        """Return a list of object (name, version_id) tuples existing under a container.

        ``keys`` defaults to a fresh empty list on every call, so the
        default is never shared between calls.
        """

        if keys is None:
            keys = []
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False)
484
    
485
    @backend_method
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None):
        """Return a list of object metadata dicts existing under a container.

        Two-element listing entries become ``{'subdir': name}``; all others
        are expanded into a full metadata dict.  ``keys`` defaults to a fresh
        empty list on every call, so the default is never shared.
        """

        if keys is None:
            keys = []
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True)
        objects = []
        for p in props:
            if len(p) == 2:
                objects.append({'subdir': p[0]})
            else:
                # p[0] is the name; the property columns follow, shifted by one.
                objects.append({'name': p[0],
                                'bytes': p[self.SIZE + 1],
                                'type': p[self.TYPE + 1],
                                'hash': p[self.HASH + 1],
                                'version': p[self.SERIAL + 1],
                                'version_timestamp': p[self.MTIME + 1],
                                'modified': p[self.MTIME + 1] if until is None else None,
                                'modified_by': p[self.MUSER + 1],
                                'uuid': p[self.UUID + 1],
                                'checksum': p[self.CHECKSUM + 1]})
        return objects
507
    
508
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return the paths under a container that enforce permissions."""

        logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
        # Always asks for the shared view of the container.
        paths = self._list_object_permissions(user, account, container, prefix, True)
        return paths
514
    
515
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a dict mapping public object paths under a container to their public ids."""

        logger.debug("list_object_public: %s %s %s", account, container, prefix)
        base_path = '/'.join((account, container, prefix))
        # Stored ids are shifted by ULTIMATE_ANSWER before being returned.
        return dict((path, p + ULTIMATE_ANSWER)
                    for path, p in self.permissions.public_list(base_path))
524
    
525
    @backend_method
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        'modified' is the timestamp of the latest version; when a specific
        ``version`` is requested and no current version can be found, the
        deleted cluster is consulted instead (NameError if that fails too).
        """

        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is not None:
            try:
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
            except NameError: # Object may be deleted.
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise NameError('Object does not exist')
                modified = del_props[self.MTIME]
        else:
            modified = props[self.MTIME]

        meta = {}
        if include_user_defined:
            meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
        # Fixed keys are applied last so they override same-named attributes.
        meta.update((
            ('name', name),
            ('bytes', props[self.SIZE]),
            ('type', props[self.TYPE]),
            ('hash', props[self.HASH]),
            ('version', props[self.SERIAL]),
            ('version_timestamp', props[self.MTIME]),
            ('modified', modified),
            ('modified_by', props[self.MUSER]),
            ('uuid', props[self.UUID]),
            ('checksum', props[self.CHECKSUM]),
        ))
        return meta
558
    
559
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
        """Store object metadata for the domain and return the new version id."""

        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
        self._can_write(user, account, container, name)
        _path, object_node = self._lookup_object(account, container, name)
        old_version, new_version = self._put_metadata(user, object_node, domain, meta, replace)
        # Versioning is applied to the version that was just superseded.
        self._apply_versioning(account, container, old_version)
        return new_version
569
    
570
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return a tuple (allowed action, permissions path, permissions dict).

        The owner is always allowed 'write'.  Other users get 'write' or
        'read' according to the access checks, or NotAllowedError.
        """

        logger.debug("get_object_permissions: %s %s %s", account, container, name)
        perm_path = self._get_permissions_path(account, container, name)
        if user == account or self.permissions.access_check(perm_path, self.WRITE, user):
            allowed = 'write'
        elif self.permissions.access_check(perm_path, self.READ, user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # Result unused: called for its lookup of the object.
        self._lookup_object(account, container, name)
        return (allowed, perm_path, self.permissions.access_get(perm_path))
588
    
589
    @backend_method
    def update_object_permissions(self, user, account, container, name, permissions):
        """Validate and set the permissions of the object (owner only)."""

        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
        if not user == account:
            raise NotAllowedError
        object_path = self._lookup_object(account, container, name)[0]
        self._check_permissions(object_path, permissions)
        self.permissions.access_set(object_path, permissions)
599
    
600
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the object's public id, or None when it has none."""

        logger.debug("get_object_public: %s %s %s", account, container, name)
        self._can_read(user, account, container, name)
        object_path = self._lookup_object(account, container, name)[0]
        stored = self.permissions.public_get(object_path)
        if stored is None:
            return None
        # Stored ids are shifted by ULTIMATE_ANSWER before being returned.
        return stored + ULTIMATE_ANSWER
611
    
612
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Set or unset the public status of the object, per the truthiness of ``public``."""

        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
        self._can_write(user, account, container, name)
        object_path = self._lookup_object(account, container, name)[0]
        if public:
            self.permissions.public_set(object_path)
        else:
            self.permissions.public_unset(object_path)
623
    
624
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and the list of its hex-encoded block hashes."""

        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
        self._can_read(user, account, container, name)
        _path, object_node = self._lookup_object(account, container, name)
        props = self._get_version(object_node, version)
        # The version stores the map hash as hex; the store is keyed by raw bytes.
        map_key = binascii.unhexlify(props[self.HASH])
        block_hashes = self.store.map_get(map_key)
        hex_hashes = [binascii.hexlify(block) for block in block_hashes]
        return props[self.SIZE], hex_hashes
634
    
635
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, permissions, src_node=None, is_copy=False):
636
        if permissions is not None and user != account:
637
            raise NotAllowedError
638
        self._can_write(user, account, container, name)
639
        if permissions is not None:
640
            path = '/'.join((account, container, name))
641
            self._check_permissions(path, permissions)
642
        
643
        account_path, account_node = self._lookup_account(account, True)
644
        container_path, container_node = self._lookup_container(account, container)
645
        path, node = self._put_object_node(container_path, container_node, name)
646
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
647
        
648
        # Check quota.
649
        del_size = self._apply_versioning(account, container, pre_version_id)
650
        size_delta = size - del_size
651
        if size_delta > 0:
652
            account_quota = long(self._get_policy(account_node)['quota'])
653
            container_quota = long(self._get_policy(container_node)['quota'])
654
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
655
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
656
                # This must be executed in a transaction, so the version is never created if it fails.
657
                raise QuotaError
658
        self._report_size_change(user, account, size_delta, {'action': 'object update'})
659
        
660
        if permissions is not None:
661
            self.permissions.access_set(path, permissions)
662
        return pre_version_id, dest_version_id
663
    
664
    @backend_method
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta=None, replace_meta=False, permissions=None):
        """Create/update an object with the specified size and partial hashes.

        ``hashmap`` is a list of hex-encoded block hashes.  Returns the new
        version id.  Raises IndexError, with the missing hex hashes in its
        ``data`` attribute, when the store reports blocks as missing.
        ``meta`` defaults to a fresh empty dict on every call, so the default
        is never shared between calls.
        """

        if meta is None:
            meta = {}
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
        if size == 0: # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        block_map = HashMap(self.block_size, self.hash_algorithm)
        block_map.extend([binascii.unhexlify(x) for x in hashmap])
        missing = self.store.block_search(block_map)
        if missing:
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        map_hash = block_map.hash()
        pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(map_hash), checksum, permissions)
        self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
        self.store.map_put(map_hash, block_map)
        return dest_version_id
684
    
685
    @backend_method
    def update_object_checksum(self, user, account, container, name, version, checksum):
        """Update an object's checksum.

        The checksum is stored on every version of the object whose serial
        is not lower than the given version and whose hash and size match it.
        """

        logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
        # Update objects with greater version and same hashmap and size (fix metadata updates).
        self._can_write(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        reference = self._get_version(node, version)
        for candidate in self.node.node_get_versions(node):
            if candidate[self.SERIAL] < int(version):
                continue
            if candidate[self.HASH] != reference[self.HASH]:
                continue
            if candidate[self.SIZE] != reference[self.SIZE]:
                continue
            self.node.version_put_property(candidate[self.SERIAL], 'checksum', checksum)
698
    
699
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
        """Create a destination version from a source object's hash and size.

        Read access on the source is checked first. The source metadata is
        duplicated onto the new version and then updated with dest_meta.
        Returns the id of the destination version.
        """

        self._can_read(user, src_account, src_container, src_name)
        src_node = self._lookup_object(src_account, src_container, src_name)[1]
        # TODO: Will do another fetch of the properties in duplicate version...
        src_props = self._get_version(src_node, src_version)  # Check to see if source exists.
        src_version_id = src_props[self.SERIAL]
        src_hash = src_props[self.HASH]
        src_size = src_props[self.SIZE]

        source = (src_account, src_container, src_name)
        destination = (dest_account, dest_container, dest_name)
        # A genuine copy to a different path gets a new uuid.
        is_copy = not is_move and source != destination
        dest_version_id = self._update_object_hash(
            user, dest_account, dest_container, dest_name, src_size, type,
            src_hash, None, permissions, src_node=src_node, is_copy=is_copy)[1]
        self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
        return dest_version_id
712
    
713
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
        """Copy an object's data and metadata.

        Returns the id of the version created at the destination.
        """

        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
        return self._copy_object(
            user, src_account, src_container, src_name,
            dest_account, dest_container, dest_name, type,
            domain, meta, replace_meta, permissions, src_version, False)
720
    
721
    @backend_method
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
        """Move an object's data and metadata.

        Only the owner of the source account may move an object; anyone
        else gets NotAllowedError. The source is deleted unless source and
        destination are the same path. Returns the destination version id.
        """

        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
        if user != src_account:
            raise NotAllowedError
        source = (src_account, src_container, src_name)
        destination = (dest_account, dest_container, dest_name)
        dest_version_id = self._copy_object(
            user, src_account, src_container, src_name,
            dest_account, dest_container, dest_name, type,
            domain, meta, replace_meta, permissions, None, True)
        if source != destination:
            self._delete_object(user, src_account, src_container, src_name)
        return dest_version_id
732
    
733
    def _delete_object(self, user, account, container, name, until=None):
        """Delete an object, or purge its versions up to a timestamp.

        Only the account owner may do either; other users get
        NotAllowedError. Without ``until`` a new version is created in the
        deleted cluster and the object's permissions are cleared. With
        ``until`` the versions up to that point are purged, and permissions
        are cleared only if no current version is left.
        """

        if user != account:
            raise NotAllowedError

        if until is None:
            path, node = self._lookup_object(account, container, name)
            src_version_id, dest_version_id = self._put_version_duplicate(
                user, node, size=0, type='', hash=None, checksum='',
                cluster=CLUSTER_DELETED)
            del_size = self._apply_versioning(account, container, src_version_id)
            if del_size:
                self._report_size_change(user, account, -del_size, {'action': 'object delete'})
            self.permissions.access_clear(path)
            return

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            return

        purged_hashes = []
        purged_size = 0
        for cluster in (CLUSTER_NORMAL, CLUSTER_HISTORY):
            cluster_hashes, cluster_size = self.node.node_purge(node, until, cluster)
            purged_hashes += cluster_hashes
            purged_size += cluster_size
        for purged_hash in purged_hashes:
            self.store.map_delete(purged_hash)
        self.node.node_purge(node, until, CLUSTER_DELETED)
        try:
            self._get_version(node)
        except NameError:
            # No current version is left, so the permissions go as well.
            self.permissions.access_clear(path)
        self._report_size_change(user, account, -purged_size, {'action': 'object purge'})
        return
766
    
767
    @backend_method
    def delete_object(self, user, account, container, name, until=None):
        """Delete/purge an object.

        Logs the request and hands it over to _delete_object.
        """

        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
        self._delete_object(user, account, container, name, until=until)
773
    
774
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all (version, version_timestamp) tuples for an object.

        Versions that belong to the deleted cluster are left out.
        """

        logger.debug("list_versions: %s %s %s", account, container, name)
        self._can_read(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        listing = []
        for version in self.node.node_get_versions(node):
            if version[self.CLUSTER] == CLUSTER_DELETED:
                continue
            listing.append([version[self.SERIAL], version[self.MTIME]])
        return listing
783
    
784
    @backend_method
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given.

        Raises NameError when the UUID is unknown; read access on the
        resolved path is checked before returning.
        """

        logger.debug("get_uuid: %s", uuid)
        latest = self.node.latest_uuid(uuid)
        if latest is None:
            raise NameError
        path, _serial = latest
        location = tuple(path.split('/', 2))
        account, container, name = location
        self._can_read(user, account, container, name)
        return location
796
    
797
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given.

        Raises NameError when the id is missing, lower than
        ULTIMATE_ANSWER, or does not map to a path; read access on the
        resolved path is checked before returning.
        """

        logger.debug("get_public: %s", public)
        if public is None or public < ULTIMATE_ANSWER:
            raise NameError
        # Public ids are offset by ULTIMATE_ANSWER.
        internal_id = public - ULTIMATE_ANSWER
        path = self.permissions.public_path(internal_id)
        if path is None:
            raise NameError
        location = tuple(path.split('/', 2))
        account, container, name = location
        self._can_read(user, account, container, name)
        return location
810
    
811
    @backend_method(autocommit=0)
    def get_block(self, hash):
        """Return a block's data.

        ``hash`` is the hex-encoded block hash. NameError is raised when
        the store returns nothing for it.
        """

        logger.debug("get_block: %s", hash)
        raw_hash = binascii.unhexlify(hash)
        data = self.store.block_get(raw_hash)
        if data:
            return data
        raise NameError('Block does not exist')
820
    
821
    @backend_method(autocommit=0)
    def put_block(self, data):
        """Store a block and return the hash.

        The hash is returned hex-encoded.
        """

        logger.debug("put_block: %s", len(data))
        raw_hash = self.store.block_put(data)
        return binascii.hexlify(raw_hash)
827
    
828
    @backend_method(autocommit=0)
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash.

        Data that covers a whole block from offset 0 is stored as a new
        block; anything else updates the existing block in the store.
        """

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        replaces_whole_block = offset == 0 and len(data) == self.block_size
        if replaces_whole_block:
            return self.put_block(data)
        raw_hash = binascii.unhexlify(hash)
        updated_hash = self.store.block_update(raw_hash, offset, data)
        return binascii.hexlify(updated_hash)
837
    
838
    # Path functions.
839
    
840
    def _generate_uuid(self):
841
        return str(uuidlib.uuid4())
842
    
843
    def _put_object_node(self, path, parent, name):
        """Return (path, node) for an object, creating the node if needed.

        The object path is the given container path joined with the name.
        """
        object_path = '/'.join((path, name))
        existing = self.node.node_lookup(object_path)
        if existing is not None:
            return object_path, existing
        return object_path, self.node.node_create(parent, object_path)
849
    
850
    def _put_path(self, user, parent, path):
        """Create a node for path with an initial empty version and return it."""
        created = self.node.node_create(parent, path)
        initial_uuid = self._generate_uuid()
        self.node.version_create(created, None, 0, '', None, user, initial_uuid, '', CLUSTER_NORMAL)
        return created
854
    
855
    def _lookup_account(self, account, create=True):
        """Return (account, node), creating the account node when asked to.

        The node is None when the account does not exist and ``create`` is
        false.
        """
        node = self.node.node_lookup(account)
        if node is None:
            if create:
                # User is account.
                node = self._put_path(account, self.ROOTNODE, account)
        return account, node
860
    
861
    def _lookup_container(self, account, container):
        """Return (path, node) for a container; raise NameError if missing."""
        container_path = '/'.join((account, container))
        found = self.node.node_lookup(container_path)
        if found is not None:
            return container_path, found
        raise NameError('Container does not exist')
867
    
868
    def _lookup_object(self, account, container, name):
        """Return (path, node) for an object; raise NameError if missing."""
        object_path = '/'.join((account, container, name))
        found = self.node.node_lookup(object_path)
        if found is not None:
            return object_path, found
        raise NameError('Object does not exist')
874
    
875
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given.

        Without a timestamp only the normal cluster is consulted; with one,
        the history cluster is tried when the normal cluster has no match.
        Raises NameError when nothing is found.
        """

        if until is None:
            found = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
        else:
            found = self.node.version_lookup(node, until, CLUSTER_NORMAL)
            if found is None:
                found = self.node.version_lookup(node, until, CLUSTER_HISTORY)
        if found is None:
            raise NameError('Path does not exist')
        return found
885
    
886
    def _get_statistics(self, node, until=None):
        """Return count, sum of size and latest timestamp of everything under node.

        (0, 0, 0) is returned when the node layer has no statistics.
        """

        if until is not None:
            found = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        else:
            found = self.node.statistics_get(node, CLUSTER_NORMAL)
        return (0, 0, 0) if found is None else found
896
    
897
    def _get_version(self, node, version=None):
        """Return the properties of the current or of a specific version.

        Without a version, the latest version of node in the normal cluster
        is returned, or NameError is raised if there is none. With a
        version, IndexError is raised when it is not an integer, is
        unknown, or belongs to the deleted cluster.
        """
        if version is None:
            current = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if current is None:
                raise NameError('Object does not exist')
            return current

        try:
            serial = int(version)
        except ValueError:
            raise IndexError('Version does not exist')
        # NOTE(review): the lookup by serial does not involve node - confirm
        # that callers guarantee the version belongs to this object.
        requested = self.node.version_get_properties(serial)
        if requested is None or requested[self.CLUSTER] == CLUSTER_DELETED:
            raise IndexError('Version does not exist')
        return requested
911
    
912
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
        """Create a new version of the node.

        Values not supplied are inherited from the latest version of the
        source (src_node if given, otherwise node itself). The version
        previously current on node, if any, is moved to the history cluster.
        Returns (pre_version_id, dest_version_id).
        """

        lookup_node = src_node if src_node is not None else node
        props = self.node.version_lookup(lookup_node, inf, CLUSTER_NORMAL)
        if props is None:
            src_version_id, src_hash, src_size = None, None, 0
            src_type, src_checksum = '', ''
        else:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]

        if size is None:
            # Set metadata. This way hash can be set to None (account or container).
            hash, size = src_hash, src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum

        if is_copy or src_version_id is None:
            uuid = self._generate_uuid()
        else:
            uuid = props[self.UUID]

        pre_version_id = src_version_id
        if src_node is not None:
            # The version being replaced lives on the destination node.
            dest_props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            pre_version_id = None if dest_props is None else dest_props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)

        dest_version_id, _mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
        return pre_version_id, dest_version_id
949
    
950
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
        """Copy attributes from a source version, then apply meta to domain.

        When replacing, the domain's attributes on the destination are
        deleted and every item of meta is set. Otherwise keys whose value
        is the empty string are deleted and the remaining items are set.
        """
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if replace:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, ((key, meta[key]) for key in meta))
            return
        # Generator expressions are handed over unevaluated.
        cleared_keys = (key for key in meta if meta[key] == '')
        self.node.attribute_del(dest_version_id, domain, cleared_keys)
        updated_items = ((key, meta[key]) for key in meta if meta[key] != '')
        self.node.attribute_set(dest_version_id, domain, updated_items)
959
    
960
    def _put_metadata(self, user, node, domain, meta, replace=False):
        """Create a new version and store metadata.

        Returns (src_version_id, dest_version_id).
        """

        version_ids = self._put_version_duplicate(user, node)
        src_version_id, dest_version_id = version_ids
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
        return src_version_id, dest_version_id
966
    
967
    def _list_limits(self, listing, marker, limit):
968
        start = 0
969
        if marker:
970
            try:
971
                start = listing.index(marker) + 1
972
            except ValueError:
973
                pass
974
        if not limit or limit > 10000:
975
            limit = 10000
976
        return start, limit
977
    
978
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
        """List objects under a container path, sorted by name.

        Entries come back with the container prefix stripped from their
        name. When ``virtual`` is true the prefixes reported by the node
        layer are included as (name, None) entries. The result is sliced
        according to marker and limit.
        """
        container_prefix = path + '/'
        strip = len(container_prefix)
        full_prefix = container_prefix + prefix
        first = container_prefix + marker if marker else None
        before = inf if until is None else until
        # Key filters only apply when a domain is given.
        key_filter = keys if domain else []

        entries, prefixes = self.node.latest_version_list(
            parent, full_prefix, delimiter, first, limit, before,
            CLUSTER_DELETED, allowed, domain, key_filter, size_range, all_props)
        if virtual:
            entries.extend([(p, None) for p in prefixes])
        entries.sort(key=lambda entry: entry[0])
        entries = [(entry[0][strip:],) + entry[1:] for entry in entries]

        names = [entry[0] for entry in entries]
        begin, count = self._list_limits(names, marker, limit)
        return entries[begin:begin + count]
993
    
994
    # Reporting functions.
995
    
996
    def _report_size_change(self, user, account, size, details=None):
        """Send a 'diskspace' message about a size change to the queue.

        ``details`` is an optional mapping of extra information. It is
        copied before the 'user' and 'total' entries are added, so neither
        the caller's dictionary nor a shared default is modified. 'total'
        is the second element of the account's statistics.
        """
        # Work on a private copy: the former mutable default ({}) was
        # updated in place and kept its contents between calls.
        details = dict(details) if details else {}
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node)[1]
        details.update({'user': user, 'total': total})
        self.queue.send(account, 'diskspace', size, details)
1002
    
1003
    # Policy functions.
1004
    
1005
    def _check_policy(self, policy):
1006
        for k in policy.keys():
1007
            if policy[k] == '':
1008
                policy[k] = self.default_policy.get(k)
1009
        for k, v in policy.iteritems():
1010
            if k == 'quota':
1011
                q = int(v) # May raise ValueError.
1012
                if q < 0:
1013
                    raise ValueError
1014
            elif k == 'versioning':
1015
                if v not in ['auto', 'none']:
1016
                    raise ValueError
1017
            else:
1018
                raise ValueError
1019
    
1020
    def _put_policy(self, node, policy, replace):
        """Store a policy on node.

        When replacing, keys missing from policy are first filled in (in
        place) from the default policy.
        """
        if replace:
            for key in self.default_policy:
                policy.setdefault(key, self.default_policy[key])
        self.node.policy_set(node, policy)
1026
    
1027
    def _get_policy(self, node):
        """Return the default policy overlaid with the policy stored on node."""
        effective = self.default_policy.copy()
        stored = self.node.policy_get(node)
        effective.update(stored)
        return effective
1031
    
1032
    def _apply_versioning(self, account, container, version_id):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        container_node = self._lookup_container(account, container)[1]
        if self._get_policy(container_node)['versioning'] == 'auto':
            # Versions are kept under the 'auto' policy.
            return 0
        removed_hash, removed_size = self.node.version_remove(version_id)
        self.store.map_delete(removed_hash)
        return removed_size
1046
    
1047
    # Access control functions.
1048
    
1049
    def _check_groups(self, groups):
1050
        # raise ValueError('Bad characters in groups')
1051
        pass
1052
    
1053
    def _check_permissions(self, path, permissions):
1054
        # raise ValueError('Bad characters in permissions')
1055
        pass
1056
    
1057
    def _get_formatted_paths(self, paths):
        """Return (path, match type) pairs for the paths that currently exist.

        Every path with a current version gets a MATCH_EXACT entry. Paths
        whose type is a directory/folder additionally get a MATCH_PREFIX
        entry for the path followed by '/'. Paths without a node or
        without a current version are skipped.
        """
        directory_types = ('application/directory', 'application/folder')
        formatted = []
        for p in paths:
            node = self.node.node_lookup(p)
            if node is None:
                # Previously this fell through and read `props`, which was
                # either unbound or left over from the preceding path.
                continue
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                continue
            if props[self.TYPE] in directory_types:
                formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
            formatted.append((p, self.MATCH_EXACT))
        return formatted
1068
    
1069
    def _get_permissions_path(self, account, container, name):
        """Return the path whose permissions apply to an object, or None.

        Candidate paths come from permissions.access_inherit and are tried
        in reverse sorted order. The object's own path matches directly.
        Another path matches only if it has at least two '/' separators
        and its current version is a directory/folder.
        """
        directory_types = ('application/directory', 'application/folder')
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            if p.count('/') < 2:
                continue
            node = self.node.node_lookup(p)
            if node is None:
                # Previously this fell through and read `props`, which was
                # either unbound or left over from the preceding candidate.
                continue
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None and props[self.TYPE] in directory_types:
                return p
        return None
1087
    
1088
    def _can_read(self, user, account, container, name):
1089
        if user == account:
1090
            return True
1091
        path = '/'.join((account, container, name))
1092
        if self.permissions.public_get(path) is not None:
1093
            return True
1094
        path = self._get_permissions_path(account, container, name)
1095
        if not path:
1096
            raise NotAllowedError
1097
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1098
            raise NotAllowedError
1099
    
1100
    def _can_write(self, user, account, container, name):
1101
        if user == account:
1102
            return True
1103
        path = '/'.join((account, container, name))
1104
        path = self._get_permissions_path(account, container, name)
1105
        if not path:
1106
            raise NotAllowedError
1107
        if not self.permissions.access_check(path, self.WRITE, user):
1108
            raise NotAllowedError
1109
    
1110
    def _allowed_accounts(self, user):
        """Return the sorted, distinct accounts of the paths listed for user.

        The account is the first '/'-separated component of each path
        returned by permissions.access_list_paths.
        """
        shared_paths = self.permissions.access_list_paths(user)
        return sorted(set(shared.split('/', 1)[0] for shared in shared_paths))
1115
    
1116
    def _allowed_containers(self, user, account):
        """Return the sorted, distinct containers of the paths listed for user.

        The container is the second '/'-separated component of each path
        returned by permissions.access_list_paths for the account.
        """
        shared_paths = self.permissions.access_list_paths(user, account)
        return sorted(set(shared.split('/', 2)[1] for shared in shared_paths))