Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / modular.py @ e809b989

History | View | Annotate | Download (41.9 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import binascii
40

    
41
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
42

    
43
from pithos.lib.hashmap import HashMap
44

    
45
# Default modules and settings.
46
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
47
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
48
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
49
DEFAULT_BLOCK_PATH = 'data/'
50
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
51
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
52

    
53
QUEUE_MESSAGE_KEY = '#'
54
QUEUE_CLIENT_ID = 2 # Pithos.
55

    
56
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
57

    
58
inf = float('inf')
59

    
60
ULTIMATE_ANSWER = 42
61

    
62

    
63
logger = logging.getLogger(__name__)
64

    
65

    
66
def backend_method(func=None, autocommit=1):
67
    if func is None:
68
        def fn(func):
69
            return backend_method(func, autocommit)
70
        return fn
71

    
72
    if not autocommit:
73
        return func
74
    def fn(self, *args, **kw):
75
        self.wrapper.execute()
76
        try:
77
            ret = func(self, *args, **kw)
78
            self.wrapper.commit()
79
            return ret
80
        except:
81
            self.wrapper.rollback()
82
            raise
83
    return fn
84

    
85

    
86
class ModularBackend(BaseBackend):
87
    """A modular backend.
88
    
89
    Uses modules for SQL functions and storage.
90
    """
91
    
92
    def __init__(self, db_module=None, db_connection=None,
93
                 block_module=None, block_path=None,
94
                 queue_module=None, queue_connection=None):
95
        db_module = db_module or DEFAULT_DB_MODULE
96
        db_connection = db_connection or DEFAULT_DB_CONNECTION
97
        block_module = block_module or DEFAULT_BLOCK_MODULE
98
        block_path = block_path or DEFAULT_BLOCK_PATH
99
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
100
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
101
        
102
        self.hash_algorithm = 'sha256'
103
        self.block_size = 4 * 1024 * 1024 # 4MB
104
        
105
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
106
        
107
        def load_module(m):
108
            __import__(m)
109
            return sys.modules[m]
110
        
111
        self.db_module = load_module(db_module)
112
        self.wrapper = self.db_module.DBWrapper(db_connection)
113
        params = {'wrapper': self.wrapper}
114
        self.permissions = self.db_module.Permissions(**params)
115
        for x in ['READ', 'WRITE']:
116
            setattr(self, x, getattr(self.db_module, x))
117
        self.node = self.db_module.Node(**params)
118
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
119
            setattr(self, x, getattr(self.db_module, x))
120
        
121
        self.block_module = load_module(block_module)
122
        params = {'path': block_path,
123
                  'block_size': self.block_size,
124
                  'hash_algorithm': self.hash_algorithm}
125
        self.store = self.block_module.Store(**params)
126

    
127
        if queue_module and queue_connection:
128
            self.queue_module = load_module(queue_module)
129
            params = {'exchange': queue_connection,
130
                      'message_key': QUEUE_MESSAGE_KEY,
131
                      'client_id': QUEUE_CLIENT_ID}
132
            self.queue = self.queue_module.Queue(**params)
133
        else:
134
            class NoQueue:
135
                def send(self, *args):
136
                    pass
137
            
138
            self.queue = NoQueue()
139
    
140
    def close(self):
141
        self.wrapper.close()
142
    
143
    @backend_method
144
    def list_accounts(self, user, marker=None, limit=10000):
145
        """Return a list of accounts the user can access."""
146
        
147
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
148
        allowed = self._allowed_accounts(user)
149
        start, limit = self._list_limits(allowed, marker, limit)
150
        return allowed[start:start + limit]
151
    
152
    @backend_method
153
    def get_account_meta(self, user, account, domain, until=None):
154
        """Return a dictionary with the account metadata for the domain."""
155
        
156
        logger.debug("get_account_meta: %s %s %s", account, domain, until)
157
        path, node = self._lookup_account(account, user == account)
158
        if user != account:
159
            if until or node is None or account not in self._allowed_accounts(user):
160
                raise NotAllowedError
161
        try:
162
            props = self._get_properties(node, until)
163
            mtime = props[self.MTIME]
164
        except NameError:
165
            props = None
166
            mtime = until
167
        count, bytes, tstamp = self._get_statistics(node, until)
168
        tstamp = max(tstamp, mtime)
169
        if until is None:
170
            modified = tstamp
171
        else:
172
            modified = self._get_statistics(node)[2] # Overall last modification.
173
            modified = max(modified, mtime)
174
        
175
        if user != account:
176
            meta = {'name': account}
177
        else:
178
            meta = {}
179
            if props is not None:
180
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
181
            if until is not None:
182
                meta.update({'until_timestamp': tstamp})
183
            meta.update({'name': account, 'count': count, 'bytes': bytes})
184
        meta.update({'modified': modified})
185
        return meta
186
    
187
    @backend_method
188
    def update_account_meta(self, user, account, domain, meta, replace=False):
189
        """Update the metadata associated with the account for the domain."""
190
        
191
        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
192
        if user != account:
193
            raise NotAllowedError
194
        path, node = self._lookup_account(account, True)
195
        self._put_metadata(user, node, domain, meta, replace)
196
    
197
    @backend_method
198
    def get_account_groups(self, user, account):
199
        """Return a dictionary with the user groups defined for this account."""
200
        
201
        logger.debug("get_account_groups: %s", account)
202
        if user != account:
203
            if account not in self._allowed_accounts(user):
204
                raise NotAllowedError
205
            return {}
206
        self._lookup_account(account, True)
207
        return self.permissions.group_dict(account)
208
    
209
    @backend_method
210
    def update_account_groups(self, user, account, groups, replace=False):
211
        """Update the groups associated with the account."""
212
        
213
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
214
        if user != account:
215
            raise NotAllowedError
216
        self._lookup_account(account, True)
217
        self._check_groups(groups)
218
        if replace:
219
            self.permissions.group_destroy(account)
220
        for k, v in groups.iteritems():
221
            if not replace: # If not already deleted.
222
                self.permissions.group_delete(account, k)
223
            if v:
224
                self.permissions.group_addmany(account, k, v)
225
    
226
    @backend_method
227
    def get_account_policy(self, user, account):
228
        """Return a dictionary with the account policy."""
229
        
230
        logger.debug("get_account_policy: %s", account)
231
        if user != account:
232
            if account not in self._allowed_accounts(user):
233
                raise NotAllowedError
234
            return {}
235
        path, node = self._lookup_account(account, True)
236
        return self._get_policy(node)
237
    
238
    @backend_method
239
    def update_account_policy(self, user, account, policy, replace=False):
240
        """Update the policy associated with the account."""
241
        
242
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
243
        if user != account:
244
            raise NotAllowedError
245
        path, node = self._lookup_account(account, True)
246
        self._check_policy(policy)
247
        self._put_policy(node, policy, replace)
248
    
249
    @backend_method
250
    def put_account(self, user, account, policy={}):
251
        """Create a new account with the given name."""
252
        
253
        logger.debug("put_account: %s %s", account, policy)
254
        if user != account:
255
            raise NotAllowedError
256
        node = self.node.node_lookup(account)
257
        if node is not None:
258
            raise NameError('Account already exists')
259
        if policy:
260
            self._check_policy(policy)
261
        node = self._put_path(user, self.ROOTNODE, account)
262
        self._put_policy(node, policy, True)
263
    
264
    @backend_method
265
    def delete_account(self, user, account):
266
        """Delete the account with the given name."""
267
        
268
        logger.debug("delete_account: %s", account)
269
        if user != account:
270
            raise NotAllowedError
271
        node = self.node.node_lookup(account)
272
        if node is None:
273
            return
274
        if not self.node.node_remove(node):
275
            raise IndexError('Account is not empty')
276
        self.permissions.group_destroy(account)
277
    
278
    @backend_method
279
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
280
        """Return a list of containers existing under an account."""
281
        
282
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
283
        if user != account:
284
            if until or account not in self._allowed_accounts(user):
285
                raise NotAllowedError
286
            allowed = self._allowed_containers(user, account)
287
            start, limit = self._list_limits(allowed, marker, limit)
288
            return allowed[start:start + limit]
289
        if shared:
290
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
291
            allowed = list(set(allowed))
292
            start, limit = self._list_limits(allowed, marker, limit)
293
            return allowed[start:start + limit]
294
        node = self.node.node_lookup(account)
295
        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
296
    
297
    @backend_method
298
    def get_container_meta(self, user, account, container, domain, until=None):
299
        """Return a dictionary with the container metadata for the domain."""
300
        
301
        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
302
        if user != account:
303
            if until or container not in self._allowed_containers(user, account):
304
                raise NotAllowedError
305
        path, node = self._lookup_container(account, container)
306
        props = self._get_properties(node, until)
307
        mtime = props[self.MTIME]
308
        count, bytes, tstamp = self._get_statistics(node, until)
309
        tstamp = max(tstamp, mtime)
310
        if until is None:
311
            modified = tstamp
312
        else:
313
            modified = self._get_statistics(node)[2] # Overall last modification.
314
            modified = max(modified, mtime)
315
        
316
        if user != account:
317
            meta = {'name': container}
318
        else:
319
            meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
320
            if until is not None:
321
                meta.update({'until_timestamp': tstamp})
322
            meta.update({'name': container, 'count': count, 'bytes': bytes})
323
        meta.update({'modified': modified})
324
        return meta
325
    
326
    @backend_method
327
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
328
        """Update the metadata associated with the container for the domain."""
329
        
330
        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
331
        if user != account:
332
            raise NotAllowedError
333
        path, node = self._lookup_container(account, container)
334
        self._put_metadata(user, node, domain, meta, replace)
335
    
336
    @backend_method
337
    def get_container_policy(self, user, account, container):
338
        """Return a dictionary with the container policy."""
339
        
340
        logger.debug("get_container_policy: %s %s", account, container)
341
        if user != account:
342
            if container not in self._allowed_containers(user, account):
343
                raise NotAllowedError
344
            return {}
345
        path, node = self._lookup_container(account, container)
346
        return self._get_policy(node)
347
    
348
    @backend_method
349
    def update_container_policy(self, user, account, container, policy, replace=False):
350
        """Update the policy associated with the container."""
351
        
352
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
353
        if user != account:
354
            raise NotAllowedError
355
        path, node = self._lookup_container(account, container)
356
        self._check_policy(policy)
357
        self._put_policy(node, policy, replace)
358
    
359
    @backend_method
360
    def put_container(self, user, account, container, policy={}):
361
        """Create a new container with the given name."""
362
        
363
        logger.debug("put_container: %s %s %s", account, container, policy)
364
        if user != account:
365
            raise NotAllowedError
366
        try:
367
            path, node = self._lookup_container(account, container)
368
        except NameError:
369
            pass
370
        else:
371
            raise NameError('Container already exists')
372
        if policy:
373
            self._check_policy(policy)
374
        path = '/'.join((account, container))
375
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
376
        self._put_policy(node, policy, True)
377
    
378
    @backend_method
379
    def delete_container(self, user, account, container, until=None):
380
        """Delete/purge the container with the given name."""
381
        
382
        logger.debug("delete_container: %s %s %s", account, container, until)
383
        if user != account:
384
            raise NotAllowedError
385
        path, node = self._lookup_container(account, container)
386
        
387
        if until is not None:
388
            hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
389
            for h in hashes:
390
                self.store.map_delete(h)
391
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
392
            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
393
            return
394
        
395
        if self._get_statistics(node)[0] > 0:
396
            raise IndexError('Container is not empty')
397
        hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
398
        for h in hashes:
399
            self.store.map_delete(h)
400
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
401
        self.node.node_remove(node)
402
        self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
403
    
404
    # XXX: Up to here...
405
    
406
    @backend_method
407
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
408
        """Return a list of objects existing under a container."""
409
        
410
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
411
        allowed = []
412
        if user != account:
413
            if until:
414
                raise NotAllowedError
415
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
416
            if not allowed:
417
                raise NotAllowedError
418
        else:
419
            if shared:
420
                allowed = self.permissions.access_list_shared('/'.join((account, container)))
421
                if not allowed:
422
                    return []
423
        path, node = self._lookup_container(account, container)
424
        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed)
425
    
426
    @backend_method
427
    def list_object_meta(self, user, account, container, domain, until=None):
428
        """Return a list with all the container's object meta keys for the domain."""
429
        
430
        logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
431
        allowed = []
432
        if user != account:
433
            if until:
434
                raise NotAllowedError
435
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
436
            if not allowed:
437
                raise NotAllowedError
438
        path, node = self._lookup_container(account, container)
439
        before = until if until is not None else inf
440
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
441
    
442
    @backend_method
443
    def get_object_meta(self, user, account, container, name, domain, version=None):
444
        """Return a dictionary with the object metadata for the domain."""
445
        
446
        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
447
        self._can_read(user, account, container, name)
448
        path, node = self._lookup_object(account, container, name)
449
        props = self._get_version(node, version)
450
        if version is None:
451
            modified = props[self.MTIME]
452
        else:
453
            try:
454
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
455
            except NameError: # Object may be deleted.
456
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
457
                if del_props is None:
458
                    raise NameError('Object does not exist')
459
                modified = del_props[self.MTIME]
460
        
461
        meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
462
        meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
463
        meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
464
        meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
465
        return meta
466
    
467
    @backend_method
468
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
469
        """Update the metadata associated with the object for the domain and return the new version."""
470
        
471
        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
472
        self._can_write(user, account, container, name)
473
        path, node = self._lookup_object(account, container, name)
474
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
475
        self._apply_versioning(account, container, src_version_id)
476
        return dest_version_id
477
    
478
    @backend_method
479
    def get_object_permissions(self, user, account, container, name):
480
        """Return the action allowed on the object, the path
481
        from which the object gets its permissions from,
482
        along with a dictionary containing the permissions."""
483
        
484
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
485
        allowed = 'write'
486
        if user != account:
487
            path = '/'.join((account, container, name))
488
            if self.permissions.access_check(path, self.WRITE, user):
489
                allowed = 'write'
490
            elif self.permissions.access_check(path, self.READ, user):
491
                allowed = 'read'
492
            else:
493
                raise NotAllowedError
494
        path = self._lookup_object(account, container, name)[0]
495
        return (allowed,) + self.permissions.access_inherit(path)
496
    
497
    @backend_method
498
    def update_object_permissions(self, user, account, container, name, permissions):
499
        """Update the permissions associated with the object."""
500
        
501
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
502
        if user != account:
503
            raise NotAllowedError
504
        path = self._lookup_object(account, container, name)[0]
505
        self._check_permissions(path, permissions)
506
        self.permissions.access_set(path, permissions)
507
    
508
    @backend_method
509
    def get_object_public(self, user, account, container, name):
510
        """Return the public id of the object if applicable."""
511
        
512
        logger.debug("get_object_public: %s %s %s", account, container, name)
513
        self._can_read(user, account, container, name)
514
        path = self._lookup_object(account, container, name)[0]
515
        p = self.permissions.public_get(path)
516
        if p is not None:
517
            p += ULTIMATE_ANSWER
518
        return p
519
    
520
    @backend_method
521
    def update_object_public(self, user, account, container, name, public):
522
        """Update the public status of the object."""
523
        
524
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
525
        self._can_write(user, account, container, name)
526
        path = self._lookup_object(account, container, name)[0]
527
        if not public:
528
            self.permissions.public_unset(path)
529
        else:
530
            self.permissions.public_set(path)
531
    
532
    @backend_method
533
    def get_object_hashmap(self, user, account, container, name, version=None):
534
        """Return the object's size and a list with partial hashes."""
535
        
536
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
537
        self._can_read(user, account, container, name)
538
        path, node = self._lookup_object(account, container, name)
539
        props = self._get_version(node, version)
540
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
541
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
542
    
543
    def _update_object_hash(self, user, account, container, name, size, hash, permissions, src_node=None, is_copy=False):
544
        if permissions is not None and user != account:
545
            raise NotAllowedError
546
        self._can_write(user, account, container, name)
547
        if permissions is not None:
548
            path = '/'.join((account, container, name))
549
            self._check_permissions(path, permissions)
550
        
551
        account_path, account_node = self._lookup_account(account, True)
552
        container_path, container_node = self._lookup_container(account, container)
553
        path, node = self._put_object_node(container_path, container_node, name)
554
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
555
        
556
        # Check quota.
557
        versioning = self._get_policy(container_node)['versioning']
558
        if versioning != 'auto':
559
            size_delta = size - 0 # TODO: Get previous size.
560
        else:
561
            size_delta = size
562
        if size_delta > 0:
563
            account_quota = long(self._get_policy(account_node)['quota'])
564
            container_quota = long(self._get_policy(container_node)['quota'])
565
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
566
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
567
                # This must be executed in a transaction, so the version is never created if it fails.
568
                raise QuotaError
569
        
570
        if permissions is not None:
571
            self.permissions.access_set(path, permissions)
572
        self._apply_versioning(account, container, pre_version_id)
573
        return pre_version_id, dest_version_id
574
    
575
    @backend_method
576
    def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
577
        """Create/update an object with the specified size and partial hashes."""
578
        
579
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
580
        if size == 0: # No such thing as an empty hashmap.
581
            hashmap = [self.put_block('')]
582
        map = HashMap(self.block_size, self.hash_algorithm)
583
        map.extend([binascii.unhexlify(x) for x in hashmap])
584
        missing = self.store.block_search(map)
585
        if missing:
586
            ie = IndexError()
587
            ie.data = [binascii.hexlify(x) for x in missing]
588
            raise ie
589
        
590
        hash = map.hash()
591
        pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
592
        self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
593
        self.store.map_put(hash, map)
594
        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
595
        return dest_version_id
596
    
597
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
598
        self._can_read(user, src_account, src_container, src_name)
599
        path, node = self._lookup_object(src_account, src_container, src_name)
600
        # TODO: Will do another fetch of the properties in duplicate version...
601
        props = self._get_version(node, src_version) # Check to see if source exists.
602
        src_version_id = props[self.SERIAL]
603
        hash = props[self.HASH]
604
        size = props[self.SIZE]
605
        
606
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
607
        pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions, src_node=node, is_copy=is_copy)
608
        self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
609
        return dest_version_id
610
    
611
    @backend_method
612
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
613
        """Copy an object's data and metadata."""
614
        
615
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
616
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
617
        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
618
        return dest_version_id
619
    
620
    @backend_method
621
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
622
        """Move an object's data and metadata."""
623
        
624
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
625
        if user != src_account:
626
            raise NotAllowedError
627
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
628
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
629
            self._delete_object(user, src_account, src_container, src_name)
630
        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
631
        return dest_version_id
632
    
633
    def _delete_object(self, user, account, container, name, until=None):
634
        if user != account:
635
            raise NotAllowedError
636
        
637
        if until is not None:
638
            path = '/'.join((account, container, name))
639
            node = self.node.node_lookup(path)
640
            if node is None:
641
                return
642
            hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
643
            hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
644
            for h in hashes:
645
                self.store.map_delete(h)
646
            self.node.node_purge(node, until, CLUSTER_DELETED)
647
            try:
648
                props = self._get_version(node)
649
            except NameError:
650
                self.permissions.access_clear(path)
651
            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
652
            return
653
        
654
        path, node = self._lookup_object(account, container, name)
655
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
656
        self._apply_versioning(account, container, src_version_id)
657
        self.permissions.access_clear(path)
658
    
659
    @backend_method
660
    def delete_object(self, user, account, container, name, until=None):
661
        """Delete/purge an object."""
662
        
663
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
664
        self._delete_object(user, account, container, name, until)
665
    
666
    @backend_method
667
    def list_versions(self, user, account, container, name):
668
        """Return a list of all (version, version_timestamp) tuples for an object."""
669
        
670
        logger.debug("list_versions: %s %s %s", account, container, name)
671
        self._can_read(user, account, container, name)
672
        path, node = self._lookup_object(account, container, name)
673
        versions = self.node.node_get_versions(node)
674
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
675
    
676
    @backend_method
677
    def get_uuid(self, user, uuid):
678
        """Return the (account, container, name) for the UUID given."""
679
        
680
        logger.debug("get_uuid: %s", uuid)
681
        info = self.node.latest_uuid(uuid)
682
        if info is None:
683
            raise NameError
684
        path, serial = info
685
        account, container, name = path.split('/', 2)
686
        self._can_read(user, account, container, name)
687
        return (account, container, name)
688
    
689
    @backend_method
690
    def get_public(self, user, public):
691
        """Return the (account, container, name) for the public id given."""
692
        
693
        logger.debug("get_public: %s", public)
694
        if public is None or public < ULTIMATE_ANSWER:
695
            raise NameError
696
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
697
        if path is None:
698
            raise NameError
699
        account, container, name = path.split('/', 2)
700
        self._can_read(user, account, container, name)
701
        return (account, container, name)
702
    
703
    @backend_method(autocommit=0)
704
    def get_block(self, hash):
705
        """Return a block's data."""
706
        
707
        logger.debug("get_block: %s", hash)
708
        block = self.store.block_get(binascii.unhexlify(hash))
709
        if not block:
710
            raise NameError('Block does not exist')
711
        return block
712
    
713
    @backend_method(autocommit=0)
714
    def put_block(self, data):
715
        """Store a block and return the hash."""
716
        
717
        logger.debug("put_block: %s", len(data))
718
        return binascii.hexlify(self.store.block_put(data))
719
    
720
    @backend_method(autocommit=0)
721
    def update_block(self, hash, data, offset=0):
722
        """Update a known block and return the hash."""
723
        
724
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
725
        if offset == 0 and len(data) == self.block_size:
726
            return self.put_block(data)
727
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
728
        return binascii.hexlify(h)
729
    
730
    # Path functions.
731
    
732
    def _generate_uuid(self):
733
        return str(uuidlib.uuid4())
734
    
735
    def _put_object_node(self, path, parent, name):
736
        path = '/'.join((path, name))
737
        node = self.node.node_lookup(path)
738
        if node is None:
739
            node = self.node.node_create(parent, path)
740
        return path, node
741
    
742
    def _put_path(self, user, parent, path):
743
        node = self.node.node_create(parent, path)
744
        self.node.version_create(node, None, 0, None, user, self._generate_uuid(), CLUSTER_NORMAL)
745
        return node
746
    
747
    def _lookup_account(self, account, create=True):
748
        node = self.node.node_lookup(account)
749
        if node is None and create:
750
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
751
        return account, node
752
    
753
    def _lookup_container(self, account, container):
754
        path = '/'.join((account, container))
755
        node = self.node.node_lookup(path)
756
        if node is None:
757
            raise NameError('Container does not exist')
758
        return path, node
759
    
760
    def _lookup_object(self, account, container, name):
761
        path = '/'.join((account, container, name))
762
        node = self.node.node_lookup(path)
763
        if node is None:
764
            raise NameError('Object does not exist')
765
        return path, node
766
    
767
    def _get_properties(self, node, until=None):
768
        """Return properties until the timestamp given."""
769
        
770
        before = until if until is not None else inf
771
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
772
        if props is None and until is not None:
773
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
774
        if props is None:
775
            raise NameError('Path does not exist')
776
        return props
777
    
778
    def _get_statistics(self, node, until=None):
779
        """Return count, sum of size and latest timestamp of everything under node."""
780
        
781
        if until is None:
782
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
783
        else:
784
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
785
        if stats is None:
786
            stats = (0, 0, 0)
787
        return stats
788
    
789
    def _get_version(self, node, version=None):
790
        if version is None:
791
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
792
            if props is None:
793
                raise NameError('Object does not exist')
794
        else:
795
            try:
796
                version = int(version)
797
            except ValueError:
798
                raise IndexError('Version does not exist')
799
            props = self.node.version_get_properties(version)
800
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
801
                raise IndexError('Version does not exist')
802
        return props
803
    
804
    def _put_version_duplicate(self, user, node, src_node=None, size=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
805
        """Create a new version of the node."""
806
        
807
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
808
        if props is not None:
809
            src_version_id = props[self.SERIAL]
810
            src_hash = props[self.HASH]
811
            src_size = props[self.SIZE]
812
        else:
813
            src_version_id = None
814
            src_hash = None
815
            src_size = 0
816
        if size is None:
817
            hash = src_hash # This way hash can be set to None.
818
            size = src_size
819
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
820
        
821
        if src_node is None:
822
            pre_version_id = src_version_id
823
        else:
824
            pre_version_id = None
825
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
826
            if props is not None:
827
                pre_version_id = props[self.SERIAL]
828
        if pre_version_id is not None:
829
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
830
        
831
        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, uuid, cluster)
832
        return pre_version_id, dest_version_id
833
    
834
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
835
        if src_version_id is not None:
836
            self.node.attribute_copy(src_version_id, dest_version_id)
837
        if not replace:
838
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
839
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
840
        else:
841
            self.node.attribute_del(dest_version_id, domain)
842
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
843
    
844
    def _put_metadata(self, user, node, domain, meta, replace=False):
845
        """Create a new version and store metadata."""
846
        
847
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
848
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
849
        return src_version_id, dest_version_id
850
    
851
    def _list_limits(self, listing, marker, limit):
852
        start = 0
853
        if marker:
854
            try:
855
                start = listing.index(marker) + 1
856
            except ValueError:
857
                pass
858
        if not limit or limit > 10000:
859
            limit = 10000
860
        return start, limit
861
    
862
    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[]):
863
        cont_prefix = path + '/'
864
        prefix = cont_prefix + prefix
865
        start = cont_prefix + marker if marker else None
866
        before = until if until is not None else inf
867
        filterq = keys if domain else []
868
        sizeq = size_range
869
        
870
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq)
871
        objects.extend([(p, None) for p in prefixes] if virtual else [])
872
        objects.sort(key=lambda x: x[0])
873
        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
874
        
875
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
876
        return objects[start:start + limit]
877
    
878
    # Policy functions.
879
    
880
    def _check_policy(self, policy):
881
        for k in policy.keys():
882
            if policy[k] == '':
883
                policy[k] = self.default_policy.get(k)
884
        for k, v in policy.iteritems():
885
            if k == 'quota':
886
                q = int(v) # May raise ValueError.
887
                if q < 0:
888
                    raise ValueError
889
            elif k == 'versioning':
890
                if v not in ['auto', 'none']:
891
                    raise ValueError
892
            else:
893
                raise ValueError
894
    
895
    def _put_policy(self, node, policy, replace):
896
        if replace:
897
            for k, v in self.default_policy.iteritems():
898
                if k not in policy:
899
                    policy[k] = v
900
        self.node.policy_set(node, policy)
901
    
902
    def _get_policy(self, node):
903
        policy = self.default_policy.copy()
904
        policy.update(self.node.policy_get(node))
905
        return policy
906
    
907
    def _apply_versioning(self, account, container, version_id):
908
        if version_id is None:
909
            return
910
        path, node = self._lookup_container(account, container)
911
        versioning = self._get_policy(node)['versioning']
912
        if versioning != 'auto':
913
            hash = self.node.version_remove(version_id)
914
            self.store.map_delete(hash)
915
            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
916
    
917
    # Access control functions.
918
    
919
    def _check_groups(self, groups):
920
        # raise ValueError('Bad characters in groups')
921
        pass
922
    
923
    def _check_permissions(self, path, permissions):
924
        # raise ValueError('Bad characters in permissions')
925
        pass
926
    
927
    def _get_permissions_path(self, account, container, name):
928
        path = '/'.join((account, container, name))
929
        permission_paths = self.permissions.access_inherit(path)
930
        permission_paths.sort()
931
        permission_paths.reverse()
932
        for p in permission_paths:
933
            if p == path:
934
                return p
935
            else:
936
                try:
937
                    parts = p.split('/', 2)
938
                    if len(parts) != 3:
939
                        return None
940
                    path, node = self._lookup_object(*p.split('/', 2))
941
                    props = self._get_version(node)
942
                    # XXX: Put type in properties...
943
                    meta = dict(self.node.attribute_get(props[self.SERIAL], 'pithos'))
944
                    if meta['Content-Type'] == 'application/directory':
945
                        return p
946
                except NameError:
947
                    pass
948
        return None
949
    
950
    def _can_read(self, user, account, container, name):
951
        if user == account:
952
            return True
953
        path = self._get_permissions_path(account, container, name)
954
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
955
            raise NotAllowedError
956
    
957
    def _can_write(self, user, account, container, name):
958
        if user == account:
959
            return True
960
        path = '/'.join((account, container, name))
961
        if not self.permissions.access_check(path, self.WRITE, user):
962
            raise NotAllowedError
963
    
964
    def _allowed_accounts(self, user):
965
        allow = set()
966
        for path in self.permissions.access_list_paths(user):
967
            allow.add(path.split('/', 1)[0])
968
        return sorted(allow)
969
    
970
    def _allowed_containers(self, user, account):
971
        allow = set()
972
        for path in self.permissions.access_list_paths(user, account):
973
            allow.add(path.split('/', 2)[1])
974
        return sorted(allow)