Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / modular.py @ 46286f5f

History | View | Annotate | Download (40.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import binascii
40

    
41
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
42

    
43
from pithos.lib.hashmap import HashMap
44

    
45
# Default modules and settings.
46
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
47
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
48
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
49
DEFAULT_BLOCK_PATH = 'data/'
50
DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
51
DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
52

    
53
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
54

    
55
inf = float('inf')
56

    
57
ULTIMATE_ANSWER = 42
58

    
59

    
60
logger = logging.getLogger(__name__)
61

    
62

    
63
def backend_method(func=None, autocommit=1):
64
    if func is None:
65
        def fn(func):
66
            return backend_method(func, autocommit)
67
        return fn
68

    
69
    if not autocommit:
70
        return func
71
    def fn(self, *args, **kw):
72
        self.wrapper.execute()
73
        try:
74
            ret = func(self, *args, **kw)
75
            self.wrapper.commit()
76
            return ret
77
        except:
78
            self.wrapper.rollback()
79
            raise
80
    return fn
81

    
82

    
83
class ModularBackend(BaseBackend):
84
    """A modular backend.
85
    
86
    Uses modules for SQL functions and storage.
87
    """
88
    
89
    def __init__(self, db_module=None, db_connection=None,
90
                 block_module=None, block_path=None,
91
                 queue_module=None, queue_connection=None):
92
        db_module = db_module or DEFAULT_DB_MODULE
93
        db_connection = db_connection or DEFAULT_DB_CONNECTION
94
        block_module = block_module or DEFAULT_BLOCK_MODULE
95
        block_path = block_path or DEFAULT_BLOCK_PATH
96
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
97
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
98
        
99
        self.hash_algorithm = 'sha256'
100
        self.block_size = 4 * 1024 * 1024 # 4MB
101
        
102
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
103
        
104
        def load_module(m):
105
            __import__(m)
106
            return sys.modules[m]
107
        
108
        self.db_module = load_module(db_module)
109
        self.wrapper = self.db_module.DBWrapper(db_connection)
110
        params = {'wrapper': self.wrapper}
111
        self.permissions = self.db_module.Permissions(**params)
112
        for x in ['READ', 'WRITE']:
113
            setattr(self, x, getattr(self.db_module, x))
114
        self.node = self.db_module.Node(**params)
115
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
116
            setattr(self, x, getattr(self.db_module, x))
117
        
118
        self.block_module = load_module(block_module)
119
        params = {'path': block_path,
120
                  'block_size': self.block_size,
121
                  'hash_algorithm': self.hash_algorithm}
122
        self.store = self.block_module.Store(**params)
123

    
124
        if queue_module and queue_connection:
125
            self.queue_module = load_module(queue_module)
126
            params = {'exchange': queue_connection}
127
            self.queue = self.queue_module.Queue(**params)
128
        else:
129
            class NoQueue:
130
                def send(self, key, value):
131
                    pass
132
            
133
            self.queue = NoQueue()
134
    
135
    def close(self):
136
        self.wrapper.close()
137
    
138
    @backend_method
139
    def list_accounts(self, user, marker=None, limit=10000):
140
        """Return a list of accounts the user can access."""
141
        
142
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
143
        allowed = self._allowed_accounts(user)
144
        start, limit = self._list_limits(allowed, marker, limit)
145
        return allowed[start:start + limit]
146
    
147
    @backend_method
148
    def get_account_meta(self, user, account, domain, until=None):
149
        """Return a dictionary with the account metadata for the domain."""
150
        
151
        logger.debug("get_account_meta: %s %s %s", account, domain, until)
152
        path, node = self._lookup_account(account, user == account)
153
        if user != account:
154
            if until or node is None or account not in self._allowed_accounts(user):
155
                raise NotAllowedError
156
        try:
157
            props = self._get_properties(node, until)
158
            mtime = props[self.MTIME]
159
        except NameError:
160
            props = None
161
            mtime = until
162
        count, bytes, tstamp = self._get_statistics(node, until)
163
        tstamp = max(tstamp, mtime)
164
        if until is None:
165
            modified = tstamp
166
        else:
167
            modified = self._get_statistics(node)[2] # Overall last modification.
168
            modified = max(modified, mtime)
169
        
170
        if user != account:
171
            meta = {'name': account}
172
        else:
173
            meta = {}
174
            if props is not None:
175
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
176
            if until is not None:
177
                meta.update({'until_timestamp': tstamp})
178
            meta.update({'name': account, 'count': count, 'bytes': bytes})
179
        meta.update({'modified': modified})
180
        return meta
181
    
182
    @backend_method
183
    def update_account_meta(self, user, account, domain, meta, replace=False):
184
        """Update the metadata associated with the account for the domain."""
185
        
186
        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
187
        if user != account:
188
            raise NotAllowedError
189
        path, node = self._lookup_account(account, True)
190
        self._put_metadata(user, node, domain, meta, replace)
191
    
192
    @backend_method
193
    def get_account_groups(self, user, account):
194
        """Return a dictionary with the user groups defined for this account."""
195
        
196
        logger.debug("get_account_groups: %s", account)
197
        if user != account:
198
            if account not in self._allowed_accounts(user):
199
                raise NotAllowedError
200
            return {}
201
        self._lookup_account(account, True)
202
        return self.permissions.group_dict(account)
203
    
204
    @backend_method
205
    def update_account_groups(self, user, account, groups, replace=False):
206
        """Update the groups associated with the account."""
207
        
208
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
209
        if user != account:
210
            raise NotAllowedError
211
        self._lookup_account(account, True)
212
        self._check_groups(groups)
213
        if replace:
214
            self.permissions.group_destroy(account)
215
        for k, v in groups.iteritems():
216
            if not replace: # If not already deleted.
217
                self.permissions.group_delete(account, k)
218
            if v:
219
                self.permissions.group_addmany(account, k, v)
220
    
221
    @backend_method
222
    def get_account_policy(self, user, account):
223
        """Return a dictionary with the account policy."""
224
        
225
        logger.debug("get_account_policy: %s", account)
226
        if user != account:
227
            if account not in self._allowed_accounts(user):
228
                raise NotAllowedError
229
            return {}
230
        path, node = self._lookup_account(account, True)
231
        return self._get_policy(node)
232
    
233
    @backend_method
234
    def update_account_policy(self, user, account, policy, replace=False):
235
        """Update the policy associated with the account."""
236
        
237
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
238
        if user != account:
239
            raise NotAllowedError
240
        path, node = self._lookup_account(account, True)
241
        self._check_policy(policy)
242
        self._put_policy(node, policy, replace)
243
    
244
    @backend_method
245
    def put_account(self, user, account, policy={}):
246
        """Create a new account with the given name."""
247
        
248
        logger.debug("put_account: %s %s", account, policy)
249
        if user != account:
250
            raise NotAllowedError
251
        node = self.node.node_lookup(account)
252
        if node is not None:
253
            raise NameError('Account already exists')
254
        if policy:
255
            self._check_policy(policy)
256
        node = self._put_path(user, self.ROOTNODE, account)
257
        self._put_policy(node, policy, True)
258
    
259
    @backend_method
260
    def delete_account(self, user, account):
261
        """Delete the account with the given name."""
262
        
263
        logger.debug("delete_account: %s", account)
264
        if user != account:
265
            raise NotAllowedError
266
        node = self.node.node_lookup(account)
267
        if node is None:
268
            return
269
        if not self.node.node_remove(node):
270
            raise IndexError('Account is not empty')
271
        self.permissions.group_destroy(account)
272
    
273
    @backend_method
274
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
275
        """Return a list of containers existing under an account."""
276
        
277
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
278
        if user != account:
279
            if until or account not in self._allowed_accounts(user):
280
                raise NotAllowedError
281
            allowed = self._allowed_containers(user, account)
282
            start, limit = self._list_limits(allowed, marker, limit)
283
            return allowed[start:start + limit]
284
        if shared:
285
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
286
            allowed = list(set(allowed))
287
            start, limit = self._list_limits(allowed, marker, limit)
288
            return allowed[start:start + limit]
289
        node = self.node.node_lookup(account)
290
        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
291
    
292
    @backend_method
293
    def get_container_meta(self, user, account, container, domain, until=None):
294
        """Return a dictionary with the container metadata for the domain."""
295
        
296
        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
297
        if user != account:
298
            if until or container not in self._allowed_containers(user, account):
299
                raise NotAllowedError
300
        path, node = self._lookup_container(account, container)
301
        props = self._get_properties(node, until)
302
        mtime = props[self.MTIME]
303
        count, bytes, tstamp = self._get_statistics(node, until)
304
        tstamp = max(tstamp, mtime)
305
        if until is None:
306
            modified = tstamp
307
        else:
308
            modified = self._get_statistics(node)[2] # Overall last modification.
309
            modified = max(modified, mtime)
310
        
311
        if user != account:
312
            meta = {'name': container}
313
        else:
314
            meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
315
            if until is not None:
316
                meta.update({'until_timestamp': tstamp})
317
            meta.update({'name': container, 'count': count, 'bytes': bytes})
318
        meta.update({'modified': modified})
319
        return meta
320
    
321
    @backend_method
322
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
323
        """Update the metadata associated with the container for the domain."""
324
        
325
        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
326
        if user != account:
327
            raise NotAllowedError
328
        path, node = self._lookup_container(account, container)
329
        self._put_metadata(user, node, domain, meta, replace)
330
    
331
    @backend_method
332
    def get_container_policy(self, user, account, container):
333
        """Return a dictionary with the container policy."""
334
        
335
        logger.debug("get_container_policy: %s %s", account, container)
336
        if user != account:
337
            if container not in self._allowed_containers(user, account):
338
                raise NotAllowedError
339
            return {}
340
        path, node = self._lookup_container(account, container)
341
        return self._get_policy(node)
342
    
343
    @backend_method
344
    def update_container_policy(self, user, account, container, policy, replace=False):
345
        """Update the policy associated with the container."""
346
        
347
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
348
        if user != account:
349
            raise NotAllowedError
350
        path, node = self._lookup_container(account, container)
351
        self._check_policy(policy)
352
        self._put_policy(node, policy, replace)
353
    
354
    @backend_method
355
    def put_container(self, user, account, container, policy={}):
356
        """Create a new container with the given name."""
357
        
358
        logger.debug("put_container: %s %s %s", account, container, policy)
359
        if user != account:
360
            raise NotAllowedError
361
        try:
362
            path, node = self._lookup_container(account, container)
363
        except NameError:
364
            pass
365
        else:
366
            raise NameError('Container already exists')
367
        if policy:
368
            self._check_policy(policy)
369
        path = '/'.join((account, container))
370
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
371
        self._put_policy(node, policy, True)
372
    
373
    @backend_method
374
    def delete_container(self, user, account, container, until=None):
375
        """Delete/purge the container with the given name."""
376
        
377
        logger.debug("delete_container: %s %s %s", account, container, until)
378
        if user != account:
379
            raise NotAllowedError
380
        path, node = self._lookup_container(account, container)
381
        
382
        if until is not None:
383
            hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
384
            for h in hashes:
385
                self.store.map_delete(h)
386
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
387
            self.queue.send('#', {'op': 'del objects'})
388
            return
389
        
390
        if self._get_statistics(node)[0] > 0:
391
            raise IndexError('Container is not empty')
392
        hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
393
        for h in hashes:
394
            self.store.map_delete(h)
395
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
396
        self.node.node_remove(node)
397
        self.queue.send('#', {'op': 'del objects'})
398
    
399
    @backend_method
400
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
401
        """Return a list of objects existing under a container."""
402
        
403
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
404
        allowed = []
405
        if user != account:
406
            if until:
407
                raise NotAllowedError
408
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
409
            if not allowed:
410
                raise NotAllowedError
411
        else:
412
            if shared:
413
                allowed = self.permissions.access_list_shared('/'.join((account, container)))
414
                if not allowed:
415
                    return []
416
        path, node = self._lookup_container(account, container)
417
        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed)
418
    
419
    @backend_method
420
    def list_object_meta(self, user, account, container, domain, until=None):
421
        """Return a list with all the container's object meta keys for the domain."""
422
        
423
        logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
424
        allowed = []
425
        if user != account:
426
            if until:
427
                raise NotAllowedError
428
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
429
            if not allowed:
430
                raise NotAllowedError
431
        path, node = self._lookup_container(account, container)
432
        before = until if until is not None else inf
433
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
434
    
435
    @backend_method
436
    def get_object_meta(self, user, account, container, name, domain, version=None):
437
        """Return a dictionary with the object metadata for the domain."""
438
        
439
        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
440
        self._can_read(user, account, container, name)
441
        path, node = self._lookup_object(account, container, name)
442
        props = self._get_version(node, version)
443
        if version is None:
444
            modified = props[self.MTIME]
445
        else:
446
            try:
447
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
448
            except NameError: # Object may be deleted.
449
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
450
                if del_props is None:
451
                    raise NameError('Object does not exist')
452
                modified = del_props[self.MTIME]
453
        
454
        meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
455
        meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
456
        meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
457
        meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
458
        return meta
459
    
460
    @backend_method
461
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
462
        """Update the metadata associated with the object for the domain and return the new version."""
463
        
464
        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
465
        self._can_write(user, account, container, name)
466
        path, node = self._lookup_object(account, container, name)
467
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
468
        self._apply_versioning(account, container, src_version_id)
469
        return dest_version_id
470
    
471
    @backend_method
472
    def get_object_permissions(self, user, account, container, name):
473
        """Return the action allowed on the object, the path
474
        from which the object gets its permissions from,
475
        along with a dictionary containing the permissions."""
476
        
477
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
478
        allowed = 'write'
479
        if user != account:
480
            path = '/'.join((account, container, name))
481
            if self.permissions.access_check(path, self.WRITE, user):
482
                allowed = 'write'
483
            elif self.permissions.access_check(path, self.READ, user):
484
                allowed = 'read'
485
            else:
486
                raise NotAllowedError
487
        path = self._lookup_object(account, container, name)[0]
488
        return (allowed,) + self.permissions.access_inherit(path)
489
    
490
    @backend_method
491
    def update_object_permissions(self, user, account, container, name, permissions):
492
        """Update the permissions associated with the object."""
493
        
494
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
495
        if user != account:
496
            raise NotAllowedError
497
        path = self._lookup_object(account, container, name)[0]
498
        self._check_permissions(path, permissions)
499
        self.permissions.access_set(path, permissions)
500
    
501
    @backend_method
502
    def get_object_public(self, user, account, container, name):
503
        """Return the public id of the object if applicable."""
504
        
505
        logger.debug("get_object_public: %s %s %s", account, container, name)
506
        self._can_read(user, account, container, name)
507
        path = self._lookup_object(account, container, name)[0]
508
        p = self.permissions.public_get(path)
509
        if p is not None:
510
            p += ULTIMATE_ANSWER
511
        return p
512
    
513
    @backend_method
514
    def update_object_public(self, user, account, container, name, public):
515
        """Update the public status of the object."""
516
        
517
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
518
        self._can_write(user, account, container, name)
519
        path = self._lookup_object(account, container, name)[0]
520
        if not public:
521
            self.permissions.public_unset(path)
522
        else:
523
            self.permissions.public_set(path)
524
    
525
    @backend_method
526
    def get_object_hashmap(self, user, account, container, name, version=None):
527
        """Return the object's size and a list with partial hashes."""
528
        
529
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
530
        self._can_read(user, account, container, name)
531
        path, node = self._lookup_object(account, container, name)
532
        props = self._get_version(node, version)
533
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
534
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
535
    
536
    def _update_object_hash(self, user, account, container, name, size, hash, permissions, src_node=None, is_copy=False):
537
        if permissions is not None and user != account:
538
            raise NotAllowedError
539
        self._can_write(user, account, container, name)
540
        if permissions is not None:
541
            path = '/'.join((account, container, name))
542
            self._check_permissions(path, permissions)
543
        
544
        account_path, account_node = self._lookup_account(account, True)
545
        container_path, container_node = self._lookup_container(account, container)
546
        path, node = self._put_object_node(container_path, container_node, name)
547
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
548
        
549
        # Check quota.
550
        versioning = self._get_policy(container_node)['versioning']
551
        if versioning != 'auto':
552
            size_delta = size - 0 # TODO: Get previous size.
553
        else:
554
            size_delta = size
555
        if size_delta > 0:
556
            account_quota = long(self._get_policy(account_node)['quota'])
557
            container_quota = long(self._get_policy(container_node)['quota'])
558
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
559
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
560
                # This must be executed in a transaction, so the version is never created if it fails.
561
                raise QuotaError
562
        
563
        if permissions is not None:
564
            self.permissions.access_set(path, permissions)
565
        self._apply_versioning(account, container, pre_version_id)
566
        return pre_version_id, dest_version_id
567
    
568
    @backend_method
569
    def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
570
        """Create/update an object with the specified size and partial hashes."""
571
        
572
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
573
        if size == 0: # No such thing as an empty hashmap.
574
            hashmap = [self.put_block('')]
575
        map = HashMap(self.block_size, self.hash_algorithm)
576
        map.extend([binascii.unhexlify(x) for x in hashmap])
577
        missing = self.store.block_search(map)
578
        if missing:
579
            ie = IndexError()
580
            ie.data = [binascii.hexlify(x) for x in missing]
581
            raise ie
582
        
583
        hash = map.hash()
584
        pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
585
        self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
586
        self.store.map_put(hash, map)
587
        self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
588
        return dest_version_id
589
    
590
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
591
        self._can_read(user, src_account, src_container, src_name)
592
        path, node = self._lookup_object(src_account, src_container, src_name)
593
        # TODO: Will do another fetch of the properties in duplicate version...
594
        props = self._get_version(node, src_version) # Check to see if source exists.
595
        src_version_id = props[self.SERIAL]
596
        hash = props[self.HASH]
597
        size = props[self.SIZE]
598
        
599
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
600
        pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions, src_node=node, is_copy=is_copy)
601
        self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
602
        return dest_version_id
603
    
604
    @backend_method
605
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
606
        """Copy an object's data and metadata."""
607
        
608
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
609
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
610
        self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
611
        return dest_version_id
612
    
613
    @backend_method
614
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
615
        """Move an object's data and metadata."""
616
        
617
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
618
        if user != src_account:
619
            raise NotAllowedError
620
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
621
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
622
            self._delete_object(user, src_account, src_container, src_name)
623
        self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
624
        return dest_version_id
625
    
626
    def _delete_object(self, user, account, container, name, until=None):
627
        if user != account:
628
            raise NotAllowedError
629
        
630
        if until is not None:
631
            path = '/'.join((account, container, name))
632
            node = self.node.node_lookup(path)
633
            if node is None:
634
                return
635
            hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
636
            hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
637
            for h in hashes:
638
                self.store.map_delete(h)
639
            self.node.node_purge(node, until, CLUSTER_DELETED)
640
            try:
641
                props = self._get_version(node)
642
            except NameError:
643
                self.permissions.access_clear(path)
644
            self.queue.send('#', {'op': 'del objects'})
645
            return
646
        
647
        path, node = self._lookup_object(account, container, name)
648
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
649
        self._apply_versioning(account, container, src_version_id)
650
        self.permissions.access_clear(path)
651
    
652
    @backend_method
653
    def delete_object(self, user, account, container, name, until=None):
654
        """Delete/purge an object."""
655
        
656
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
657
        self._delete_object(user, account, container, name, until)
658
    
659
    @backend_method
660
    def list_versions(self, user, account, container, name):
661
        """Return a list of all (version, version_timestamp) tuples for an object."""
662
        
663
        logger.debug("list_versions: %s %s %s", account, container, name)
664
        self._can_read(user, account, container, name)
665
        path, node = self._lookup_object(account, container, name)
666
        versions = self.node.node_get_versions(node)
667
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
668
    
669
    @backend_method
670
    def get_uuid(self, user, uuid):
671
        """Return the (account, container, name) for the UUID given."""
672
        
673
        logger.debug("get_uuid: %s", uuid)
674
        info = self.node.latest_uuid(uuid)
675
        if info is None:
676
            raise NameError
677
        path, serial = info
678
        account, container, name = path.split('/', 2)
679
        self._can_read(user, account, container, name)
680
        return (account, container, name)
681
    
682
    @backend_method
683
    def get_public(self, user, public):
684
        """Return the (account, container, name) for the public id given."""
685
        
686
        logger.debug("get_public: %s", public)
687
        if public is None or public < ULTIMATE_ANSWER:
688
            raise NameError
689
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
690
        if path is None:
691
            raise NameError
692
        account, container, name = path.split('/', 2)
693
        self._can_read(user, account, container, name)
694
        return (account, container, name)
695
    
696
    @backend_method(autocommit=0)
697
    def get_block(self, hash):
698
        """Return a block's data."""
699
        
700
        logger.debug("get_block: %s", hash)
701
        block = self.store.block_get(binascii.unhexlify(hash))
702
        if not block:
703
            raise NameError('Block does not exist')
704
        return block
705
    
706
    @backend_method(autocommit=0)
707
    def put_block(self, data):
708
        """Store a block and return the hash."""
709
        
710
        logger.debug("put_block: %s", len(data))
711
        return binascii.hexlify(self.store.block_put(data))
712
    
713
    @backend_method(autocommit=0)
714
    def update_block(self, hash, data, offset=0):
715
        """Update a known block and return the hash."""
716
        
717
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
718
        if offset == 0 and len(data) == self.block_size:
719
            return self.put_block(data)
720
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
721
        return binascii.hexlify(h)
722
    
723
    # Path functions.
724
    
725
    def _generate_uuid(self):
726
        return str(uuidlib.uuid4())
727
    
728
    def _put_object_node(self, path, parent, name):
729
        path = '/'.join((path, name))
730
        node = self.node.node_lookup(path)
731
        if node is None:
732
            node = self.node.node_create(parent, path)
733
        return path, node
734
    
735
    def _put_path(self, user, parent, path):
736
        node = self.node.node_create(parent, path)
737
        self.node.version_create(node, None, 0, None, user, self._generate_uuid(), CLUSTER_NORMAL)
738
        return node
739
    
740
    def _lookup_account(self, account, create=True):
741
        node = self.node.node_lookup(account)
742
        if node is None and create:
743
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
744
        return account, node
745
    
746
    def _lookup_container(self, account, container):
747
        path = '/'.join((account, container))
748
        node = self.node.node_lookup(path)
749
        if node is None:
750
            raise NameError('Container does not exist')
751
        return path, node
752
    
753
    def _lookup_object(self, account, container, name):
754
        path = '/'.join((account, container, name))
755
        node = self.node.node_lookup(path)
756
        if node is None:
757
            raise NameError('Object does not exist')
758
        return path, node
759
    
760
    def _get_properties(self, node, until=None):
761
        """Return properties until the timestamp given."""
762
        
763
        before = until if until is not None else inf
764
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
765
        if props is None and until is not None:
766
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
767
        if props is None:
768
            raise NameError('Path does not exist')
769
        return props
770
    
771
    def _get_statistics(self, node, until=None):
772
        """Return count, sum of size and latest timestamp of everything under node."""
773
        
774
        if until is None:
775
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
776
        else:
777
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
778
        if stats is None:
779
            stats = (0, 0, 0)
780
        return stats
781
    
782
    def _get_version(self, node, version=None):
783
        if version is None:
784
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
785
            if props is None:
786
                raise NameError('Object does not exist')
787
        else:
788
            try:
789
                version = int(version)
790
            except ValueError:
791
                raise IndexError('Version does not exist')
792
            props = self.node.version_get_properties(version)
793
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
794
                raise IndexError('Version does not exist')
795
        return props
796
    
797
    def _put_version_duplicate(self, user, node, src_node=None, size=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
798
        """Create a new version of the node."""
799
        
800
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
801
        if props is not None:
802
            src_version_id = props[self.SERIAL]
803
            src_hash = props[self.HASH]
804
            src_size = props[self.SIZE]
805
        else:
806
            src_version_id = None
807
            src_hash = None
808
            src_size = 0
809
        if size is None:
810
            hash = src_hash # This way hash can be set to None.
811
            size = src_size
812
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
813
        
814
        if src_node is None:
815
            pre_version_id = src_version_id
816
        else:
817
            pre_version_id = None
818
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
819
            if props is not None:
820
                pre_version_id = props[self.SERIAL]
821
        if pre_version_id is not None:
822
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
823
        
824
        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, uuid, cluster)
825
        return pre_version_id, dest_version_id
826
    
827
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
828
        if src_version_id is not None:
829
            self.node.attribute_copy(src_version_id, dest_version_id)
830
        if not replace:
831
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
832
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
833
        else:
834
            self.node.attribute_del(dest_version_id, domain)
835
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
836
    
837
    def _put_metadata(self, user, node, domain, meta, replace=False):
838
        """Create a new version and store metadata."""
839
        
840
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
841
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
842
        return src_version_id, dest_version_id
843
    
844
    def _list_limits(self, listing, marker, limit):
845
        start = 0
846
        if marker:
847
            try:
848
                start = listing.index(marker) + 1
849
            except ValueError:
850
                pass
851
        if not limit or limit > 10000:
852
            limit = 10000
853
        return start, limit
854
    
855
    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[]):
856
        cont_prefix = path + '/'
857
        prefix = cont_prefix + prefix
858
        start = cont_prefix + marker if marker else None
859
        before = until if until is not None else inf
860
        filterq = keys if domain else []
861
        sizeq = size_range
862
        
863
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq)
864
        objects.extend([(p, None) for p in prefixes] if virtual else [])
865
        objects.sort(key=lambda x: x[0])
866
        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
867
        
868
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
869
        return objects[start:start + limit]
870
    
871
    # Policy functions.
872
    
873
    def _check_policy(self, policy):
874
        for k in policy.keys():
875
            if policy[k] == '':
876
                policy[k] = self.default_policy.get(k)
877
        for k, v in policy.iteritems():
878
            if k == 'quota':
879
                q = int(v) # May raise ValueError.
880
                if q < 0:
881
                    raise ValueError
882
            elif k == 'versioning':
883
                if v not in ['auto', 'none']:
884
                    raise ValueError
885
            else:
886
                raise ValueError
887
    
888
    def _put_policy(self, node, policy, replace):
889
        if replace:
890
            for k, v in self.default_policy.iteritems():
891
                if k not in policy:
892
                    policy[k] = v
893
        self.node.policy_set(node, policy)
894
    
895
    def _get_policy(self, node):
896
        policy = self.default_policy.copy()
897
        policy.update(self.node.policy_get(node))
898
        return policy
899
    
900
    def _apply_versioning(self, account, container, version_id):
901
        if version_id is None:
902
            return
903
        path, node = self._lookup_container(account, container)
904
        versioning = self._get_policy(node)['versioning']
905
        if versioning != 'auto':
906
            hash = self.node.version_remove(version_id)
907
            self.store.map_delete(hash)
908
    
909
    # Access control functions.
910
    
911
    def _check_groups(self, groups):
912
        # raise ValueError('Bad characters in groups')
913
        pass
914
    
915
    def _check_permissions(self, path, permissions):
916
        # raise ValueError('Bad characters in permissions')
917
        
918
        # Check for existing permissions.
919
        paths = self.permissions.access_list(path)
920
        if paths:
921
            ae = AttributeError()
922
            ae.data = paths
923
            raise ae
924
    
925
    def _can_read(self, user, account, container, name):
926
        if user == account:
927
            return True
928
        path = '/'.join((account, container, name))
929
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
930
            raise NotAllowedError
931
    
932
    def _can_write(self, user, account, container, name):
933
        if user == account:
934
            return True
935
        path = '/'.join((account, container, name))
936
        if not self.permissions.access_check(path, self.WRITE, user):
937
            raise NotAllowedError
938
    
939
    def _allowed_accounts(self, user):
940
        allow = set()
941
        for path in self.permissions.access_list_paths(user):
942
            allow.add(path.split('/', 1)[0])
943
        return sorted(allow)
944
    
945
    def _allowed_containers(self, user, account):
946
        allow = set()
947
        for path in self.permissions.access_list_paths(user, account):
948
            allow.add(path.split('/', 2)[1])
949
        return sorted(allow)