Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 4a669c71

History | View | Annotate | Download (47.9 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import binascii
40

    
41
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
42

    
43
from pithos.lib.hashmap import HashMap
44

    
45
# Default modules and settings.
46
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
47
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
48
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
49
DEFAULT_BLOCK_PATH = 'data/'
50
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
51
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
52

    
53
QUEUE_MESSAGE_KEY = '#'
54
QUEUE_CLIENT_ID = 2 # Pithos.
55

    
56
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
57

    
58
inf = float('inf')
59

    
60
ULTIMATE_ANSWER = 42
61

    
62

    
63
logger = logging.getLogger(__name__)
64

    
65

    
66
def backend_method(func=None, autocommit=1):
67
    if func is None:
68
        def fn(func):
69
            return backend_method(func, autocommit)
70
        return fn
71

    
72
    if not autocommit:
73
        return func
74
    def fn(self, *args, **kw):
75
        self.wrapper.execute()
76
        try:
77
            ret = func(self, *args, **kw)
78
            self.wrapper.commit()
79
            return ret
80
        except:
81
            self.wrapper.rollback()
82
            raise
83
    return fn
84

    
85

    
86
class ModularBackend(BaseBackend):
87
    """A modular backend.
88
    
89
    Uses modules for SQL functions and storage.
90
    """
91
    
92
    def __init__(self, db_module=None, db_connection=None,
93
                 block_module=None, block_path=None,
94
                 queue_module=None, queue_connection=None):
95
        db_module = db_module or DEFAULT_DB_MODULE
96
        db_connection = db_connection or DEFAULT_DB_CONNECTION
97
        block_module = block_module or DEFAULT_BLOCK_MODULE
98
        block_path = block_path or DEFAULT_BLOCK_PATH
99
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
100
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
101
        
102
        self.hash_algorithm = 'sha256'
103
        self.block_size = 4 * 1024 * 1024 # 4MB
104
        
105
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
106
        
107
        def load_module(m):
108
            __import__(m)
109
            return sys.modules[m]
110
        
111
        self.db_module = load_module(db_module)
112
        self.wrapper = self.db_module.DBWrapper(db_connection)
113
        params = {'wrapper': self.wrapper}
114
        self.permissions = self.db_module.Permissions(**params)
115
        for x in ['READ', 'WRITE']:
116
            setattr(self, x, getattr(self.db_module, x))
117
        self.node = self.db_module.Node(**params)
118
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
119
            setattr(self, x, getattr(self.db_module, x))
120
        
121
        self.block_module = load_module(block_module)
122
        params = {'path': block_path,
123
                  'block_size': self.block_size,
124
                  'hash_algorithm': self.hash_algorithm}
125
        self.store = self.block_module.Store(**params)
126

    
127
        if queue_module and queue_connection:
128
            self.queue_module = load_module(queue_module)
129
            params = {'exchange': queue_connection,
130
                      'message_key': QUEUE_MESSAGE_KEY,
131
                      'client_id': QUEUE_CLIENT_ID}
132
            self.queue = self.queue_module.Queue(**params)
133
        else:
134
            class NoQueue:
135
                def send(self, *args):
136
                    pass
137
                
138
                def close(self):
139
                    pass
140
            
141
            self.queue = NoQueue()
142
    
143
    def close(self):
144
        self.wrapper.close()
145
        self.queue.close()
146
    
147
    @backend_method
148
    def list_accounts(self, user, marker=None, limit=10000):
149
        """Return a list of accounts the user can access."""
150
        
151
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
152
        allowed = self._allowed_accounts(user)
153
        start, limit = self._list_limits(allowed, marker, limit)
154
        return allowed[start:start + limit]
155
    
156
    @backend_method
157
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
158
        """Return a dictionary with the account metadata for the domain."""
159
        
160
        logger.debug("get_account_meta: %s %s %s", account, domain, until)
161
        path, node = self._lookup_account(account, user == account)
162
        if user != account:
163
            if until or node is None or account not in self._allowed_accounts(user):
164
                raise NotAllowedError
165
        try:
166
            props = self._get_properties(node, until)
167
            mtime = props[self.MTIME]
168
        except NameError:
169
            props = None
170
            mtime = until
171
        count, bytes, tstamp = self._get_statistics(node, until)
172
        tstamp = max(tstamp, mtime)
173
        if until is None:
174
            modified = tstamp
175
        else:
176
            modified = self._get_statistics(node)[2] # Overall last modification.
177
            modified = max(modified, mtime)
178
        
179
        if user != account:
180
            meta = {'name': account}
181
        else:
182
            meta = {}
183
            if props is not None and include_user_defined:
184
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
185
            if until is not None:
186
                meta.update({'until_timestamp': tstamp})
187
            meta.update({'name': account, 'count': count, 'bytes': bytes})
188
        meta.update({'modified': modified})
189
        return meta
190
    
191
    @backend_method
192
    def update_account_meta(self, user, account, domain, meta, replace=False):
193
        """Update the metadata associated with the account for the domain."""
194
        
195
        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
196
        if user != account:
197
            raise NotAllowedError
198
        path, node = self._lookup_account(account, True)
199
        self._put_metadata(user, node, domain, meta, replace)
200
    
201
    @backend_method
202
    def get_account_groups(self, user, account):
203
        """Return a dictionary with the user groups defined for this account."""
204
        
205
        logger.debug("get_account_groups: %s", account)
206
        if user != account:
207
            if account not in self._allowed_accounts(user):
208
                raise NotAllowedError
209
            return {}
210
        self._lookup_account(account, True)
211
        return self.permissions.group_dict(account)
212
    
213
    @backend_method
214
    def update_account_groups(self, user, account, groups, replace=False):
215
        """Update the groups associated with the account."""
216
        
217
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
218
        if user != account:
219
            raise NotAllowedError
220
        self._lookup_account(account, True)
221
        self._check_groups(groups)
222
        if replace:
223
            self.permissions.group_destroy(account)
224
        for k, v in groups.iteritems():
225
            if not replace: # If not already deleted.
226
                self.permissions.group_delete(account, k)
227
            if v:
228
                self.permissions.group_addmany(account, k, v)
229
    
230
    @backend_method
231
    def get_account_policy(self, user, account):
232
        """Return a dictionary with the account policy."""
233
        
234
        logger.debug("get_account_policy: %s", account)
235
        if user != account:
236
            if account not in self._allowed_accounts(user):
237
                raise NotAllowedError
238
            return {}
239
        path, node = self._lookup_account(account, True)
240
        return self._get_policy(node)
241
    
242
    @backend_method
243
    def update_account_policy(self, user, account, policy, replace=False):
244
        """Update the policy associated with the account."""
245
        
246
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
247
        if user != account:
248
            raise NotAllowedError
249
        path, node = self._lookup_account(account, True)
250
        self._check_policy(policy)
251
        self._put_policy(node, policy, replace)
252
    
253
    @backend_method
254
    def put_account(self, user, account, policy={}):
255
        """Create a new account with the given name."""
256
        
257
        logger.debug("put_account: %s %s", account, policy)
258
        if user != account:
259
            raise NotAllowedError
260
        node = self.node.node_lookup(account)
261
        if node is not None:
262
            raise NameError('Account already exists')
263
        if policy:
264
            self._check_policy(policy)
265
        node = self._put_path(user, self.ROOTNODE, account)
266
        self._put_policy(node, policy, True)
267
    
268
    @backend_method
269
    def delete_account(self, user, account):
270
        """Delete the account with the given name."""
271
        
272
        logger.debug("delete_account: %s", account)
273
        if user != account:
274
            raise NotAllowedError
275
        node = self.node.node_lookup(account)
276
        if node is None:
277
            return
278
        if not self.node.node_remove(node):
279
            raise IndexError('Account is not empty')
280
        self.permissions.group_destroy(account)
281
    
282
    @backend_method
283
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
284
        """Return a list of containers existing under an account."""
285
        
286
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
287
        if user != account:
288
            if until or account not in self._allowed_accounts(user):
289
                raise NotAllowedError
290
            allowed = self._allowed_containers(user, account)
291
            start, limit = self._list_limits(allowed, marker, limit)
292
            return allowed[start:start + limit]
293
        if shared:
294
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
295
            allowed = list(set(allowed))
296
            start, limit = self._list_limits(allowed, marker, limit)
297
            return allowed[start:start + limit]
298
        node = self.node.node_lookup(account)
299
        return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
300
    
301
    @backend_method
302
    def list_container_meta(self, user, account, container, domain, until=None):
303
        """Return a list with all the container's object meta keys for the domain."""
304
        
305
        logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
306
        allowed = []
307
        if user != account:
308
            if until:
309
                raise NotAllowedError
310
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
311
            if not allowed:
312
                raise NotAllowedError
313
        path, node = self._lookup_container(account, container)
314
        before = until if until is not None else inf
315
        allowed = self._get_formatted_paths(allowed)
316
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
317
    
318
    @backend_method
319
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
320
        """Return a dictionary with the container metadata for the domain."""
321
        
322
        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
323
        if user != account:
324
            if until or container not in self._allowed_containers(user, account):
325
                raise NotAllowedError
326
        path, node = self._lookup_container(account, container)
327
        props = self._get_properties(node, until)
328
        mtime = props[self.MTIME]
329
        count, bytes, tstamp = self._get_statistics(node, until)
330
        tstamp = max(tstamp, mtime)
331
        if until is None:
332
            modified = tstamp
333
        else:
334
            modified = self._get_statistics(node)[2] # Overall last modification.
335
            modified = max(modified, mtime)
336
        
337
        if user != account:
338
            meta = {'name': container}
339
        else:
340
            meta = {}
341
            if include_user_defined:
342
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
343
            if until is not None:
344
                meta.update({'until_timestamp': tstamp})
345
            meta.update({'name': container, 'count': count, 'bytes': bytes})
346
        meta.update({'modified': modified})
347
        return meta
348
    
349
    @backend_method
350
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
351
        """Update the metadata associated with the container for the domain."""
352
        
353
        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
354
        if user != account:
355
            raise NotAllowedError
356
        path, node = self._lookup_container(account, container)
357
        self._put_metadata(user, node, domain, meta, replace)
358
    
359
    @backend_method
360
    def get_container_policy(self, user, account, container):
361
        """Return a dictionary with the container policy."""
362
        
363
        logger.debug("get_container_policy: %s %s", account, container)
364
        if user != account:
365
            if container not in self._allowed_containers(user, account):
366
                raise NotAllowedError
367
            return {}
368
        path, node = self._lookup_container(account, container)
369
        return self._get_policy(node)
370
    
371
    @backend_method
372
    def update_container_policy(self, user, account, container, policy, replace=False):
373
        """Update the policy associated with the container."""
374
        
375
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
376
        if user != account:
377
            raise NotAllowedError
378
        path, node = self._lookup_container(account, container)
379
        self._check_policy(policy)
380
        self._put_policy(node, policy, replace)
381
    
382
    @backend_method
383
    def put_container(self, user, account, container, policy={}):
384
        """Create a new container with the given name."""
385
        
386
        logger.debug("put_container: %s %s %s", account, container, policy)
387
        if user != account:
388
            raise NotAllowedError
389
        try:
390
            path, node = self._lookup_container(account, container)
391
        except NameError:
392
            pass
393
        else:
394
            raise NameError('Container already exists')
395
        if policy:
396
            self._check_policy(policy)
397
        path = '/'.join((account, container))
398
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
399
        self._put_policy(node, policy, True)
400
    
401
    @backend_method
402
    def delete_container(self, user, account, container, until=None):
403
        """Delete/purge the container with the given name."""
404
        
405
        logger.debug("delete_container: %s %s %s", account, container, until)
406
        if user != account:
407
            raise NotAllowedError
408
        path, node = self._lookup_container(account, container)
409
        
410
        if until is not None:
411
            hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
412
            for h in hashes:
413
                self.store.map_delete(h)
414
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
415
            self._report_size_change(user, account, -size, {'action': 'container purge'})
416
            return
417
        
418
        if self._get_statistics(node)[0] > 0:
419
            raise IndexError('Container is not empty')
420
        hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
421
        for h in hashes:
422
            self.store.map_delete(h)
423
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
424
        self.node.node_remove(node)
425
        self._report_size_change(user, account, -size, {'action': 'container delete'})
426
    
427
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props):
428
        if user != account and until:
429
            raise NotAllowedError
430
        allowed = self._list_object_permissions(user, account, container, prefix, shared)
431
        if shared and not allowed:
432
            return []
433
        path, node = self._lookup_container(account, container)
434
        allowed = self._get_formatted_paths(allowed)
435
        return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
436
    
437
    def _list_object_permissions(self, user, account, container, prefix, shared):
438
        allowed = []
439
        path = '/'.join((account, container, prefix)).rstrip('/')
440
        if user != account:
441
            allowed = self.permissions.access_list_paths(user, path)
442
            if not allowed:
443
                raise NotAllowedError
444
        else:
445
            if shared:
446
                allowed = self.permissions.access_list_shared(path)
447
                if not allowed:
448
                    return []
449
        return allowed
450
    
451
    @backend_method
452
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
453
        """Return a list of object (name, version_id) tuples existing under a container."""
454
        
455
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
456
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False)
457
    
458
    @backend_method
459
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
460
        """Return a list of object metadata dicts existing under a container."""
461
        
462
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
463
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True)
464
        objects = []
465
        for p in props:
466
            if len(p) == 2:
467
                objects.append({'subdir': p[0]})
468
            else:
469
                objects.append({'name': p[0],
470
                                'bytes': p[self.SIZE + 1],
471
                                'type': p[self.TYPE + 1],
472
                                'hash': p[self.HASH + 1],
473
                                'version': p[self.SERIAL + 1],
474
                                'version_timestamp': p[self.MTIME + 1],
475
                                'modified': p[self.MTIME + 1] if until is None else None,
476
                                'modified_by': p[self.MUSER + 1],
477
                                'uuid': p[self.UUID + 1],
478
                                'checksum': p[self.CHECKSUM + 1]})
479
        return objects
480
    
481
    @backend_method
482
    def list_object_permissions(self, user, account, container, prefix=''):
483
        """Return a list of paths that enforce permissions under a container."""
484
        
485
        logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
486
        return self._list_object_permissions(user, account, container, prefix, True)
487
    
488
    @backend_method
489
    def list_object_public(self, user, account, container, prefix=''):
490
        """Return a dict mapping paths to public ids for objects that are public under a container."""
491
        
492
        logger.debug("list_object_public: %s %s %s", account, container, prefix)
493
        public = {}
494
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
495
            public[path] = p + ULTIMATE_ANSWER
496
        return public
497
    
498
    @backend_method
499
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
500
        """Return a dictionary with the object metadata for the domain."""
501
        
502
        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
503
        self._can_read(user, account, container, name)
504
        path, node = self._lookup_object(account, container, name)
505
        props = self._get_version(node, version)
506
        if version is None:
507
            modified = props[self.MTIME]
508
        else:
509
            try:
510
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
511
            except NameError: # Object may be deleted.
512
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
513
                if del_props is None:
514
                    raise NameError('Object does not exist')
515
                modified = del_props[self.MTIME]
516
        
517
        meta = {}
518
        if include_user_defined:
519
            meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
520
        meta.update({'name': name,
521
                     'bytes': props[self.SIZE],
522
                     'type': props[self.TYPE],
523
                     'hash': props[self.HASH],
524
                     'version': props[self.SERIAL],
525
                     'version_timestamp': props[self.MTIME],
526
                     'modified': modified,
527
                     'modified_by': props[self.MUSER],
528
                     'uuid': props[self.UUID],
529
                     'checksum': props[self.CHECKSUM]})
530
        return meta
531
    
532
    @backend_method
533
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
534
        """Update the metadata associated with the object for the domain and return the new version."""
535
        
536
        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
537
        self._can_write(user, account, container, name)
538
        path, node = self._lookup_object(account, container, name)
539
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
540
        self._apply_versioning(account, container, src_version_id)
541
        return dest_version_id
542
    
543
    @backend_method
544
    def get_object_permissions(self, user, account, container, name):
545
        """Return the action allowed on the object, the path
546
        from which the object gets its permissions from,
547
        along with a dictionary containing the permissions."""
548
        
549
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
550
        allowed = 'write'
551
        permissions_path = self._get_permissions_path(account, container, name)
552
        if user != account:
553
            if self.permissions.access_check(permissions_path, self.WRITE, user):
554
                allowed = 'write'
555
            elif self.permissions.access_check(permissions_path, self.READ, user):
556
                allowed = 'read'
557
            else:
558
                raise NotAllowedError
559
        self._lookup_object(account, container, name)
560
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
561
    
562
    @backend_method
563
    def update_object_permissions(self, user, account, container, name, permissions):
564
        """Update the permissions associated with the object."""
565
        
566
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
567
        if user != account:
568
            raise NotAllowedError
569
        path = self._lookup_object(account, container, name)[0]
570
        self._check_permissions(path, permissions)
571
        self.permissions.access_set(path, permissions)
572
    
573
    @backend_method
574
    def get_object_public(self, user, account, container, name):
575
        """Return the public id of the object if applicable."""
576
        
577
        logger.debug("get_object_public: %s %s %s", account, container, name)
578
        self._can_read(user, account, container, name)
579
        path = self._lookup_object(account, container, name)[0]
580
        p = self.permissions.public_get(path)
581
        if p is not None:
582
            p += ULTIMATE_ANSWER
583
        return p
584
    
585
    @backend_method
586
    def update_object_public(self, user, account, container, name, public):
587
        """Update the public status of the object."""
588
        
589
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
590
        self._can_write(user, account, container, name)
591
        path = self._lookup_object(account, container, name)[0]
592
        if not public:
593
            self.permissions.public_unset(path)
594
        else:
595
            self.permissions.public_set(path)
596
    
597
    @backend_method
598
    def get_object_hashmap(self, user, account, container, name, version=None):
599
        """Return the object's size and a list with partial hashes."""
600
        
601
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
602
        self._can_read(user, account, container, name)
603
        path, node = self._lookup_object(account, container, name)
604
        props = self._get_version(node, version)
605
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
606
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
607
    
608
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, permissions, src_node=None, is_copy=False):
609
        if permissions is not None and user != account:
610
            raise NotAllowedError
611
        self._can_write(user, account, container, name)
612
        if permissions is not None:
613
            path = '/'.join((account, container, name))
614
            self._check_permissions(path, permissions)
615
        
616
        account_path, account_node = self._lookup_account(account, True)
617
        container_path, container_node = self._lookup_container(account, container)
618
        path, node = self._put_object_node(container_path, container_node, name)
619
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
620
        
621
        # Check quota.
622
        del_size = self._apply_versioning(account, container, pre_version_id)
623
        size_delta = size - del_size
624
        if size_delta > 0:
625
            account_quota = long(self._get_policy(account_node)['quota'])
626
            container_quota = long(self._get_policy(container_node)['quota'])
627
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
628
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
629
                # This must be executed in a transaction, so the version is never created if it fails.
630
                raise QuotaError
631
        self._report_size_change(user, account, size_delta, {'action': 'object update'})
632
        
633
        if permissions is not None:
634
            self.permissions.access_set(path, permissions)
635
        return pre_version_id, dest_version_id
636
    
637
    @backend_method
638
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
639
        """Create/update an object with the specified size and partial hashes."""
640
        
641
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
642
        if size == 0: # No such thing as an empty hashmap.
643
            hashmap = [self.put_block('')]
644
        map = HashMap(self.block_size, self.hash_algorithm)
645
        map.extend([binascii.unhexlify(x) for x in hashmap])
646
        missing = self.store.block_search(map)
647
        if missing:
648
            ie = IndexError()
649
            ie.data = [binascii.hexlify(x) for x in missing]
650
            raise ie
651
        
652
        hash = map.hash()
653
        pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, permissions)
654
        self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
655
        self.store.map_put(hash, map)
656
        return dest_version_id
657
    
658
    @backend_method
659
    def update_object_checksum(self, user, account, container, name, version, checksum):
660
        """Update an object's checksum."""
661
        
662
        logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
663
        # Update objects with greater version and same hashmap and size (fix metadata updates).
664
        self._can_write(user, account, container, name)
665
        path, node = self._lookup_object(account, container, name)
666
        props = self._get_version(node, version)
667
        versions = self.node.node_get_versions(node)
668
        for x in versions:
669
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
670
                self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
671
    
672
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
673
        self._can_read(user, src_account, src_container, src_name)
674
        path, node = self._lookup_object(src_account, src_container, src_name)
675
        # TODO: Will do another fetch of the properties in duplicate version...
676
        props = self._get_version(node, src_version) # Check to see if source exists.
677
        src_version_id = props[self.SERIAL]
678
        hash = props[self.HASH]
679
        size = props[self.SIZE]
680
        
681
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
682
        pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, permissions, src_node=node, is_copy=is_copy)
683
        self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
684
        return dest_version_id
685
    
686
    @backend_method
687
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
688
        """Copy an object's data and metadata."""
689
        
690
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
691
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
692
        return dest_version_id
693
    
694
    @backend_method
695
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
696
        """Move an object's data and metadata."""
697
        
698
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
699
        if user != src_account:
700
            raise NotAllowedError
701
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
702
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
703
            self._delete_object(user, src_account, src_container, src_name)
704
        return dest_version_id
705
    
706
    def _delete_object(self, user, account, container, name, until=None):
707
        if user != account:
708
            raise NotAllowedError
709
        
710
        if until is not None:
711
            path = '/'.join((account, container, name))
712
            node = self.node.node_lookup(path)
713
            if node is None:
714
                return
715
            hashes = []
716
            size = 0
717
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
718
            hashes += h
719
            size += s
720
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
721
            hashes += h
722
            size += s
723
            for h in hashes:
724
                self.store.map_delete(h)
725
            self.node.node_purge(node, until, CLUSTER_DELETED)
726
            try:
727
                props = self._get_version(node)
728
            except NameError:
729
                self.permissions.access_clear(path)
730
            self._report_size_change(user, account, -size, {'action': 'object purge'})
731
            return
732
        
733
        path, node = self._lookup_object(account, container, name)
734
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
735
        del_size = self._apply_versioning(account, container, src_version_id)
736
        if del_size:
737
            self._report_size_change(user, account, -del_size, {'action': 'object delete'})
738
        self.permissions.access_clear(path)
739
    
740
    @backend_method
741
    def delete_object(self, user, account, container, name, until=None):
742
        """Delete/purge an object."""
743
        
744
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
745
        self._delete_object(user, account, container, name, until)
746
    
747
    @backend_method
748
    def list_versions(self, user, account, container, name):
749
        """Return a list of all (version, version_timestamp) tuples for an object."""
750
        
751
        logger.debug("list_versions: %s %s %s", account, container, name)
752
        self._can_read(user, account, container, name)
753
        path, node = self._lookup_object(account, container, name)
754
        versions = self.node.node_get_versions(node)
755
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
756
    
757
    @backend_method
758
    def get_uuid(self, user, uuid):
759
        """Return the (account, container, name) for the UUID given."""
760
        
761
        logger.debug("get_uuid: %s", uuid)
762
        info = self.node.latest_uuid(uuid)
763
        if info is None:
764
            raise NameError
765
        path, serial = info
766
        account, container, name = path.split('/', 2)
767
        self._can_read(user, account, container, name)
768
        return (account, container, name)
769
    
770
    @backend_method
771
    def get_public(self, user, public):
772
        """Return the (account, container, name) for the public id given."""
773
        
774
        logger.debug("get_public: %s", public)
775
        if public is None or public < ULTIMATE_ANSWER:
776
            raise NameError
777
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
778
        if path is None:
779
            raise NameError
780
        account, container, name = path.split('/', 2)
781
        self._can_read(user, account, container, name)
782
        return (account, container, name)
783
    
784
    @backend_method(autocommit=0)
785
    def get_block(self, hash):
786
        """Return a block's data."""
787
        
788
        logger.debug("get_block: %s", hash)
789
        block = self.store.block_get(binascii.unhexlify(hash))
790
        if not block:
791
            raise NameError('Block does not exist')
792
        return block
793
    
794
    @backend_method(autocommit=0)
795
    def put_block(self, data):
796
        """Store a block and return the hash."""
797
        
798
        logger.debug("put_block: %s", len(data))
799
        return binascii.hexlify(self.store.block_put(data))
800
    
801
    @backend_method(autocommit=0)
802
    def update_block(self, hash, data, offset=0):
803
        """Update a known block and return the hash."""
804
        
805
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
806
        if offset == 0 and len(data) == self.block_size:
807
            return self.put_block(data)
808
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
809
        return binascii.hexlify(h)
810
    
811
    # Path functions.
812
    
813
    def _generate_uuid(self):
814
        return str(uuidlib.uuid4())
815
    
816
    def _put_object_node(self, path, parent, name):
817
        path = '/'.join((path, name))
818
        node = self.node.node_lookup(path)
819
        if node is None:
820
            node = self.node.node_create(parent, path)
821
        return path, node
822
    
823
    def _put_path(self, user, parent, path):
824
        node = self.node.node_create(parent, path)
825
        self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
826
        return node
827
    
828
    def _lookup_account(self, account, create=True):
829
        node = self.node.node_lookup(account)
830
        if node is None and create:
831
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
832
        return account, node
833
    
834
    def _lookup_container(self, account, container):
835
        path = '/'.join((account, container))
836
        node = self.node.node_lookup(path)
837
        if node is None:
838
            raise NameError('Container does not exist')
839
        return path, node
840
    
841
    def _lookup_object(self, account, container, name):
842
        path = '/'.join((account, container, name))
843
        node = self.node.node_lookup(path)
844
        if node is None:
845
            raise NameError('Object does not exist')
846
        return path, node
847
    
848
    def _get_properties(self, node, until=None):
849
        """Return properties until the timestamp given."""
850
        
851
        before = until if until is not None else inf
852
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
853
        if props is None and until is not None:
854
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
855
        if props is None:
856
            raise NameError('Path does not exist')
857
        return props
858
    
859
    def _get_statistics(self, node, until=None):
860
        """Return count, sum of size and latest timestamp of everything under node."""
861
        
862
        if until is None:
863
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
864
        else:
865
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
866
        if stats is None:
867
            stats = (0, 0, 0)
868
        return stats
869
    
870
    def _get_version(self, node, version=None):
871
        if version is None:
872
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
873
            if props is None:
874
                raise NameError('Object does not exist')
875
        else:
876
            try:
877
                version = int(version)
878
            except ValueError:
879
                raise IndexError('Version does not exist')
880
            props = self.node.version_get_properties(version)
881
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
882
                raise IndexError('Version does not exist')
883
        return props
884
    
885
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
886
        """Create a new version of the node."""
887
        
888
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
889
        if props is not None:
890
            src_version_id = props[self.SERIAL]
891
            src_hash = props[self.HASH]
892
            src_size = props[self.SIZE]
893
            src_type = props[self.TYPE]
894
            src_checksum = props[self.CHECKSUM]
895
        else:
896
            src_version_id = None
897
            src_hash = None
898
            src_size = 0
899
            src_type = ''
900
            src_checksum = ''
901
        if size is None: # Set metadata.
902
            hash = src_hash # This way hash can be set to None (account or container).
903
            size = src_size
904
        if type is None:
905
            type = src_type
906
        if checksum is None:
907
            checksum = src_checksum
908
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
909
        
910
        if src_node is None:
911
            pre_version_id = src_version_id
912
        else:
913
            pre_version_id = None
914
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
915
            if props is not None:
916
                pre_version_id = props[self.SERIAL]
917
        if pre_version_id is not None:
918
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
919
        
920
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
921
        return pre_version_id, dest_version_id
922
    
923
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
924
        if src_version_id is not None:
925
            self.node.attribute_copy(src_version_id, dest_version_id)
926
        if not replace:
927
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
928
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
929
        else:
930
            self.node.attribute_del(dest_version_id, domain)
931
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
932
    
933
    def _put_metadata(self, user, node, domain, meta, replace=False):
934
        """Create a new version and store metadata."""
935
        
936
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
937
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
938
        return src_version_id, dest_version_id
939
    
940
    def _list_limits(self, listing, marker, limit):
941
        start = 0
942
        if marker:
943
            try:
944
                start = listing.index(marker) + 1
945
            except ValueError:
946
                pass
947
        if not limit or limit > 10000:
948
            limit = 10000
949
        return start, limit
950
    
951
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
952
        cont_prefix = path + '/'
953
        prefix = cont_prefix + prefix
954
        start = cont_prefix + marker if marker else None
955
        before = until if until is not None else inf
956
        filterq = keys if domain else []
957
        sizeq = size_range
958
        
959
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
960
        objects.extend([(p, None) for p in prefixes] if virtual else [])
961
        objects.sort(key=lambda x: x[0])
962
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
963
        
964
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
965
        return objects[start:start + limit]
966
    
967
    # Reporting functions.
968
    
969
    def _report_size_change(self, user, account, size, details={}):
970
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
971
        account_node = self._lookup_account(account, True)[1]
972
        total = self._get_statistics(account_node)[1]
973
        details.update({'user': user, 'total': total})
974
        self.queue.send(account, 'diskspace', size, details)
975
    
976
    # Policy functions.
977
    
978
    def _check_policy(self, policy):
979
        for k in policy.keys():
980
            if policy[k] == '':
981
                policy[k] = self.default_policy.get(k)
982
        for k, v in policy.iteritems():
983
            if k == 'quota':
984
                q = int(v) # May raise ValueError.
985
                if q < 0:
986
                    raise ValueError
987
            elif k == 'versioning':
988
                if v not in ['auto', 'none']:
989
                    raise ValueError
990
            else:
991
                raise ValueError
992
    
993
    def _put_policy(self, node, policy, replace):
994
        if replace:
995
            for k, v in self.default_policy.iteritems():
996
                if k not in policy:
997
                    policy[k] = v
998
        self.node.policy_set(node, policy)
999
    
1000
    def _get_policy(self, node):
1001
        policy = self.default_policy.copy()
1002
        policy.update(self.node.policy_get(node))
1003
        return policy
1004
    
1005
    def _apply_versioning(self, account, container, version_id):
1006
        """Delete the provided version if such is the policy.
1007
           Return size of object removed.
1008
        """
1009
        
1010
        if version_id is None:
1011
            return 0
1012
        path, node = self._lookup_container(account, container)
1013
        versioning = self._get_policy(node)['versioning']
1014
        if versioning != 'auto':
1015
            hash, size = self.node.version_remove(version_id)
1016
            self.store.map_delete(hash)
1017
            return size
1018
        return 0
1019
    
1020
    # Access control functions.
1021
    
1022
    def _check_groups(self, groups):
1023
        # raise ValueError('Bad characters in groups')
1024
        pass
1025
    
1026
    def _check_permissions(self, path, permissions):
1027
        # raise ValueError('Bad characters in permissions')
1028
        pass
1029
    
1030
    def _get_formatted_paths(self, paths):
1031
        formatted = []
1032
        for p in paths:
1033
            node = self.node.node_lookup(p)
1034
            if node is not None:
1035
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1036
            if props is not None:
1037
                if props[self.TYPE] in ('application/directory', 'application/folder'):
1038
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1039
                formatted.append((p, self.MATCH_EXACT))
1040
        return formatted
1041
    
1042
    def _get_permissions_path(self, account, container, name):
1043
        path = '/'.join((account, container, name))
1044
        permission_paths = self.permissions.access_inherit(path)
1045
        permission_paths.sort()
1046
        permission_paths.reverse()
1047
        for p in permission_paths:
1048
            if p == path:
1049
                return p
1050
            else:
1051
                if p.count('/') < 2:
1052
                    continue
1053
                node = self.node.node_lookup(p)
1054
                if node is not None:
1055
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1056
                if props is not None:
1057
                    if props[self.TYPE] in ('application/directory', 'application/folder'):
1058
                        return p
1059
        return None
1060
    
1061
    def _can_read(self, user, account, container, name):
1062
        if user == account:
1063
            return True
1064
        path = '/'.join((account, container, name))
1065
        if self.permissions.public_get(path) is not None:
1066
            return True
1067
        path = self._get_permissions_path(account, container, name)
1068
        if not path:
1069
            raise NotAllowedError
1070
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1071
            raise NotAllowedError
1072
    
1073
    def _can_write(self, user, account, container, name):
1074
        if user == account:
1075
            return True
1076
        path = '/'.join((account, container, name))
1077
        path = self._get_permissions_path(account, container, name)
1078
        if not path:
1079
            raise NotAllowedError
1080
        if not self.permissions.access_check(path, self.WRITE, user):
1081
            raise NotAllowedError
1082
    
1083
    def _allowed_accounts(self, user):
1084
        allow = set()
1085
        for path in self.permissions.access_list_paths(user):
1086
            allow.add(path.split('/', 1)[0])
1087
        return sorted(allow)
1088
    
1089
    def _allowed_containers(self, user, account):
1090
        allow = set()
1091
        for path in self.permissions.access_list_paths(user, account):
1092
            allow.add(path.split('/', 2)[1])
1093
        return sorted(allow)