Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ d20081ab

History | View | Annotate | Download (77.3 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from collections import defaultdict
41
from functools import wraps, partial
42
from traceback import format_exc
43

    
44
try:
45
    from astakosclient import AstakosClient
46
except ImportError:
47
    AstakosClient = None
48

    
49
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
50
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
51
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
52
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
53
                  InvalidHash)
54

    
55

    
56
class DisabledAstakosClient(object):
    """Placeholder standing in for AstakosClient when it is disabled.

    The constructor arguments are recorded for introspection; any
    other attribute access fails loudly with an AssertionError so a
    misconfigured deployment cannot silently talk to Astakos.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        # Only reached for attributes that are not set on the instance.
        raise AssertionError("AstakosClient has been disabled, "
                             "yet an attempt to access it was made")
65

    
66

    
67
# Stripped-down version of the HashMap class found in tools.
68

    
69
class HashMap(list):
    """A list of block hashes that can compute its own top hash.

    The entries are padded with zero blocks up to the next power of
    two and folded pairwise with the configured hash algorithm until
    a single digest remains (a Merkle-style reduction).
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize  # block size in bytes
        self.blockhash = blockhash  # hashlib algorithm name

    def _hash_raw(self, v):
        # One application of the configured hash function.
        digest = hashlib.new(self.blockhash)
        digest.update(v)
        return digest.digest()

    def hash(self):
        """Return the top hash of the map."""
        if not self:
            return self._hash_raw('')
        if len(self) == 1:
            return self[0]

        level = list(self)
        # Pad to the next power of two with all-zero entries.
        size = 2
        while size < len(level):
            size *= 2
        level += [('\x00' * len(level[0]))] * (size - len(level))
        # Fold adjacent pairs until a single digest remains.
        while len(level) > 1:
            level = [self._hash_raw(level[i] + level[i + 1])
                     for i in range(0, len(level), 2)]
        return level[0]
95

    
96
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
# Extra keyword arguments forwarded to the block store constructor.
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
# Characters used when generating public URL tokens.
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
# NOTE(review): presumably the length of generated public URL tokens
# in characters -- confirm against the code that consumes it.
DEFAULT_PUBLIC_URL_SECURITY = 16

# Identifiers used when publishing to the message queue.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# The three version clusters a version can live in (current versions,
# superseded ones, deleted ones).
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

# Sentinel meaning "no upper bound" for timestamp comparisons.
inf = float('inf')

# NOTE(review): not referenced anywhere in this module chunk.
ULTIMATE_ANSWER = 42

# Quota source/resource names used with the external quotaholder.
DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'

logger = logging.getLogger(__name__)
127

    
128

    
129
def backend_method(func):
    """Wrap a backend method in transaction management.

    If a transaction is already active the call simply runs inside
    it.  Otherwise a new one is opened with pre_exec() and always
    finished by post_exec(), which commits on success and rolls back
    on failure; the original exception (if any) propagates.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        if self.in_transaction:
            # Nested call: reuse the transaction already in progress.
            return func(self, *args, **kw)

        ok = False
        try:
            self.pre_exec()
            result = func(self, *args, **kw)
            ok = True
            return result
        finally:
            # Runs on both the return and the exception path.
            self.post_exec(ok)
    return wrapper
149

    
150

    
151
def debug_method(func):
    """Decorator logging every call as ``name(args) <<< result``.

    The logged result is the method's return value, or the formatted
    traceback when the method raises (the exception is re-raised
    unchanged).

    Fixes: the original built the argument list with
    ``map(all_args.append, ...)`` -- a map-for-side-effects
    anti-pattern that allocates a throwaway list on Python 2 and
    silently appends nothing under a lazy ``map`` (Python 3) -- and
    used the Python-2-only ``dict.iteritems()``.  Using ``extend`` and
    ``items()`` keeps the logged output identical while remaining
    portable.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            # No return value to show; log the traceback instead and
            # let the exception propagate.
            result = format_exc()
            raise
        finally:
            all_args = [repr(a) for a in args]
            all_args.extend('%s=%s' % (k, v) for k, v in kw.items())
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper
166

    
167

    
168
def check_allowed_paths(action):
    """Decorator for backend methods checking path access granted to user.

    The decorated method is expected to take the backend instance as
    its 1st argument and the requesting user as the 2nd; the remaining
    positional arguments joined with '/' form the requested path.

    If the path is already in the user's cached allowed paths the
    wrapper returns immediately, skipping the database check.
    Otherwise the decorated method runs (raising NotAllowedError on
    denial) and, on success, the path is cached for the user.

    :param action: (int) 0 for reads / 1 for writes
    :raises NotAllowedError: the user does not have access to the path
    """
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args):
            user, path_parts = args[0], args[1:]
            cache = (self.read_allowed_paths if action == self.READ
                     else self.write_allowed_paths)
            path = '/'.join(path_parts)
            if path in cache.get(user, []):
                return  # already granted; skip the expensive check
            func(self, *args)  # performs the real access check
            cache[user].add(path)  # remember the successful grant
        return wrapper
    return decorator
202

    
203

    
204
def list_method(func):
    """Apply marker/limit pagination to a list-returning method.

    The wrapped method's full result is sliced according to the
    'marker' and 'limit' keyword arguments via self._list_limits().
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        full = func(self, *args, **kw)
        start, limit = self._list_limits(
            full, kw.get('marker'), kw.get('limit'))
        return full[start:start + limit]
    return wrapper
213

    
214

    
215
class ModularBackend(BaseBackend):
216
    """A modular backend.
217

218
    Uses modules for SQL functions and storage.
219
    """
220

    
221
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        """Initialize the backend.

        Loads the database and block-store modules by dotted name,
        wires up the message queue (or a no-op stand-in when no queue
        is configured) and the Astakos client (or its disabled
        placeholder when no auth URL / client library is available).

        Every argument falls back to the corresponding module-level
        DEFAULT_* constant when None/falsy.
        """
        # Resolve defaults for anything the caller left unset.
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        # Policies applied to newly created accounts/containers.
        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        # When True, superseded versions are not charged against quota.
        self.free_versioning = free_versioning

        # Import a module by dotted name and return the module object.
        def load_module(m):
            __import__(m)
            return sys.modules[m]

        # --- Database layer -------------------------------------------
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        # Mirror the db module's access-mode constants on the backend.
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        # Mirror the db module's node/version property indices as well.
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        # Permission kinds recognized by this backend.
        self.ALLOWED = ['read', 'write']

        # --- Block store ----------------------------------------------
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        # --- Message queue (optional) ---------------------------------
        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            # No queue configured: swallow sends/closes silently.
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        # --- Astakos client (optional) --------------------------------
        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        if not astakos_auth_url or not AstakosClient:
            # Placeholder that raises on any use; see
            # using_external_quotaholder.
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        # Per-transaction state: pending commission serials and queue
        # messages, flushed/resolved by post_exec().
        self.serials = []
        self.messages = []

        # A move is a copy that also removes the source.
        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False

        self._reset_allowed_paths()
334

    
335
    def pre_exec(self, lock_container_path=False):
336
        self.lock_container_path = lock_container_path
337
        self.wrapper.execute()
338
        self.serials = []
339
        self._reset_allowed_paths()
340
        self.in_transaction = True
341

    
342
    def post_exec(self, success_status=True):
        """Finish the transaction opened by pre_exec().

        On success: flush the queued messages, persist any pending
        commission serials (committing first so they survive a failed
        resolve call), ask Astakos to accept them, then commit.
        On failure: ask Astakos to reject the pending serials and roll
        the transaction back.
        """
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    accept_serials=self.serials,
                    reject_serials=[])
                # Forget the serials Astakos confirmed as accepted.
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            if self.serials:
                # Undo the commissions of the failed request.
                r = self.astakosclient.resolve_commissions(
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(
                    r['rejected'])
            self.wrapper.rollback()
        self.in_transaction = False
376

    
377
    def close(self):
        """Release the backend's external resources (database wrapper,
        then the queue client)."""
        self.wrapper.close()
        self.queue.close()
380

    
381
    @property
    def using_external_quotaholder(self):
        """Whether quota accounting is delegated to a live Astakos
        client rather than the disabled placeholder."""
        disabled = isinstance(self.astakosclient, DisabledAstakosClient)
        return not disabled
384

    
385
    @debug_method
    @backend_method
    @list_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access.

        The marker/limit pagination is applied by @list_method.
        """

        return self._allowed_accounts(user)
392

    
393
    def _get_account_quotas(self, account):
        """Get account usage from astakos.

        Returns the 'pithos.diskspace' entry of the account's 'system'
        source, or an empty dict when either level is absent.
        """
        all_quotas = self.astakosclient.service_get_quotas(account)
        source_quotas = all_quotas[account].get(DEFAULT_SOURCE, {})
        return source_quotas.get(DEFAULT_DISKSPACE_RESOURCE, {})
399

    
400
    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain.

        The owner gets counts, sizes and the domain's user-defined
        attributes; other users only get the account name and the
        modification time.

        :param until: timestamp; report the state at that point in time
        :raises NotAllowedError: a non-owner asked for history, or the
            account node does not exist
        """

        self._can_read_account(user, account)
        path, node = self._lookup_account(account, user == account)
        if user != account:
            if until or (node is None):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # NOTE(review): the lookup helpers in this backend appear
            # to signal "not found" by raising NameError -- confirm.
            props = None
            mtime = until
        # NOTE: 'bytes' shadows the builtin; kept as-is.
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                # Prefer the usage reported by the external quotaholder.
                external_quota = self._get_account_quotas(account)
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta
441

    
442
    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain.

        :param replace: drop existing metadata instead of merging
        """
        self._can_write_account(user, account)
        node = self._lookup_account(account, True)[1]
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)
451

    
452
    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account.

        Only the owner sees the groups; any other (permitted) user
        gets an empty mapping.
        """
        self._can_read_account(user, account)
        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        return {}
462

    
463
    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account.

        :param groups: mapping of group name to member list
        :param replace: destroy every existing group first instead of
            replacing only the ones named in ``groups``
        """
        self._can_write_account(user, account)
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            # Wipe all groups at once; per-name deletes are redundant.
            self.permissions.group_destroy(account)
        for name, members in groups.items():
            if not replace:  # Not already wiped above.
                self.permissions.group_delete(account, name)
            if members:
                self.permissions.group_addmany(account, name, members)
478

    
479
    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy.

        Non-owners get an empty mapping.  With an external
        quotaholder, the 'quota' entry is overridden by the limit
        Astakos reports.
        """
        self._can_read_account(user, account)
        if user != account:
            return {}
        node = self._lookup_account(account, True)[1]
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            policy['quota'] = self._get_account_quotas(account).get('limit', 0)
        return policy
493

    
494
    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account.

        :param replace: substitute the whole policy instead of merging
        """
        self._can_write_account(user, account)
        node = self._lookup_account(account, True)[1]
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)
503

    
504
    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name.

        :raises AccountExists: an account node already exists
        """
        policy = policy or {}
        self._can_write_account(user, account)
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        new_node = self._put_path(user, self.ROOTNODE, account,
                                  update_statistics_ancestors_depth=-1)
        self._put_policy(new_node, policy, True, is_account_policy=True)
519

    
520
    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name.

        Deleting a missing account is a no-op.

        :raises AccountNotEmpty: the account still has containers
        """
        self._can_write_account(user, account)
        node = self.node.node_lookup(account)
        if node is None:
            return  # nothing to delete
        removed = self.node.node_remove(
            node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)
        # Drop the whole allowed-paths cache; evicting only this
        # account's entries would be more expensive.
        self._reset_allowed_paths()
537

    
538
    @debug_method
    @backend_method
    @list_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account.

        Non-owners only see containers they were granted access to,
        and may not browse past state (``until``).
        """
        self._can_read_account(user, account)
        if user != account:
            if until:
                raise NotAllowedError
            return self._allowed_containers(user, account)
        if shared or public:
            # Container names are the 2nd path segment of each
            # shared/public object path.
            names = set()
            if shared:
                names.update(p.split('/', 2)[1] for p in
                             self.permissions.access_list_shared(account))
            if public:
                names.update(p[0].split('/', 2)[1] for p in
                             self.permissions.public_list(account))
            return sorted(names)
        node = self.node.node_lookup(account)
        props = self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)
        return [p[0] for p in props]
562

    
563
    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""
        self._can_read_container(user, account, container)
        if user != account and until:
            raise NotAllowedError
        node = self._lookup_container(account, container)[1]
        before = inf if until is None else until
        # NOTE(review): 'allowed' is always built from an empty list
        # here, so no per-path restriction is actually applied --
        # confirm this is intended.
        allowed = self._get_formatted_paths([])
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)
579

    
580
    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        The owner gets counts, sizes and the domain's user-defined
        attributes; other users only get the container name and the
        modification time.

        :param until: timestamp; report the state at that point in time
        :raises NotAllowedError: a non-owner asked for history
        """

        self._can_read_container(user, account, container)
        if user != account:
            if until:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        # NOTE: 'bytes' shadows the builtin; kept as-is.
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta
614

    
615
    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain.

        Unless the container's versioning policy is 'auto', the
        superseded version is removed.
        """
        self._can_write_container(user, account, container)
        node = self._lookup_container(account, container)[1]
        src_version_id, _ = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is None:
            return
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            self.node.version_remove(src_version_id,
                                     update_statistics_ancestors_depth=0)
632

    
633
    @debug_method
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy.

        Non-owners get an empty mapping.
        """
        self._can_read_container(user, account, container)
        if user != account:
            return {}
        node = self._lookup_container(account, container)[1]
        return self._get_policy(node, is_account_policy=False)
643

    
644
    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container.

        :param replace: substitute the whole policy instead of merging
        """
        self._can_write_container(user, account, container)
        node = self._lookup_container(account, container)[1]
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)
654

    
655
    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name.

        :raises ContainerExists: the container already exists
        """
        policy = policy or {}
        self._can_write_container(user, account, container)
        # _lookup_container raising NameError means the container is
        # missing -- which is exactly what we want here.
        try:
            self._lookup_container(account, container)
        except NameError:
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        account_node = self._lookup_account(account, True)[1]
        node = self._put_path(user, account_node, path,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)
675

    
676
    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name.

        Three modes:
          * ``until`` given: purge versions up to that timestamp and
            keep the container;
          * no ``delimiter``: remove the (empty) container entirely;
          * ``delimiter`` given: delete only the container's contents.

        :raises ContainerNotEmpty: removal requested while objects
            still exist in the container
        """

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)

        if until is not None:
            # Purge historic versions up to 'until'; the container
            # itself survives.
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                # Purged history frees space that was charged to quota.
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            # Full container removal; only allowed when it is empty.
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            # NOTE(review): the caller-supplied 'prefix' is not
            # forwarded here (an empty prefix is hard-coded), so ALL
            # objects are listed for deletion -- confirm intended.
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                # 'path' and 'node' are rebound per object from here on.
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                # Mark the object deleted by adding a version in the
                # DELETED cluster.
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
754

    
755
    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        """Collect the object listing under a container and apply
        marker/limit windowing.

        Combines shared and/or public filtering as requested; raises
        NotAllowedError when a foreign user asks for a past (`until`)
        listing.
        """
        if user != account and until:
            raise NotAllowedError

        objects = []
        if shared and public:
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            # Start from a set so the public results below can always be
            # merged in: with a plain [] here, `objects |= set(...)` raised
            # TypeError whenever there were no shared paths.
            objects = set()
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects = set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            objects.sort(key=lambda x: x[0])
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
        else:
            allowed = self._list_object_permissions(
                user, account, container, prefix, shared, public=False)
            if shared and not allowed:
                return []
            path, node = self._lookup_container(account, container)
            allowed = self._get_formatted_paths(allowed)
            objects = self._list_object_properties(
                node, path, prefix, delimiter, marker, limit, virtual, domain,
                keys, until, size_range, allowed, all_props)

        # apply limits
        start, limit = self._list_limits(objects, marker, limit)
        return objects[start:start + limit]
796

    
797
    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        """Return (name,) + properties tuples for the public objects
        under the given container prefix."""
        public_paths = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        paths, nodes = self._lookup_objects(public_paths)
        # Strip the leading "<account>/<container>/" to get bare names.
        cont_prefix = '/'.join((account, container)) + '/'
        names = [p[len(cont_prefix):] for p in paths]
        props = self.node.version_lookup_bulk(nodes, all_props=all_props,
                                              order_by_path=True)
        return [(name,) + prop for name, prop in zip(names, props)]
809

    
810
    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        """Page through _list_objects until exhausted and return all
        matching entries."""
        page_size = 10000
        collected = []
        while True:
            # Resume after the last entry fetched so far (None = first page).
            marker = collected[-1] if collected else None
            page = self._list_objects(
                user, account, container, prefix, delimiter, marker,
                page_size, virtual, domain, keys, shared, until, size_range,
                all_props, public)
            collected.extend(page)
            # A short (or empty) page means the listing is exhausted.
            if len(page) < page_size:
                break
        return collected
825

    
826
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        """Return the paths under the prefix that `user` may list.

        A foreign user gets only the paths explicitly granted to them
        (NotAllowedError when there are none).  The owner gets the
        shared and/or public paths, as requested, sorted.
        """
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            granted = self.permissions.access_list_paths(user, path)
            if not granted:
                raise NotAllowedError
            return granted
        collected = set()
        if shared:
            collected.update(self.permissions.access_list_shared(path))
        if public:
            collected.update(p[0] for p in self.permissions.public_list(path))
        return sorted(collected)
845

    
846
    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range,
            False, public)
858

    
859
    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, True,
            public)
        listing = []
        for p in props:
            if len(p) == 2:
                # Two-element entries are virtual directory markers.
                listing.append({'subdir': p[0]})
                continue
            # Property tuples are (name,) + version row; shift the
            # column indices by one to skip the name.
            listing.append({
                'name': p[0],
                'bytes': p[self.SIZE + 1],
                'type': p[self.TYPE + 1],
                'hash': p[self.HASH + 1],
                'version': p[self.SERIAL + 1],
                'version_timestamp': p[self.MTIME + 1],
                'modified': p[self.MTIME + 1] if until is None else None,
                'modified_by': p[self.MUSER + 1],
                'uuid': p[self.UUID + 1],
                'checksum': p[self.CHECKSUM + 1]})
        return listing
888

    
889
    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths that enforce permissions under a
        container."""

        return self._list_object_permissions(
            user, account, container, prefix, shared=True, public=False)
896

    
897
    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        lookup = '/'.join((account, container, prefix))
        # public_list yields (path, public_id) pairs.
        return dict(self.permissions.public_list(lookup))
908

    
909
    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        When a specific version is requested, 'modified' still reflects
        the object's overall last modification; if the object has since
        been deleted, the deletion timestamp is used instead.
        Raises ItemNotExists when the object cannot be found at all.
        """

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                # Fall back to the latest deleted version's timestamp.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            # User-defined attributes first, so the fixed keys below
            # always win on collision.
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta
946

    
947
    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write_object(user, account, container, name)

        # Lock the container path while mutating object metadata.
        node = self._lookup_object(account, container, name,
                                   lock_container=True)[1]
        old_version, new_version = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        # The superseded version may be charged or dropped per policy.
        self._apply_versioning(account, container, old_version,
                               update_statistics_ancestors_depth=1)
        return new_version
963

    
964
    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        #group_parents = access_objects['group_parents']
        nobject_permissions = {}
        # Object names are the paths with "<account>/<container>/" stripped.
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            # NOTE(review): this initial value is overwritten below before
            # it is ever read; kept as-is.
            allowed = 1
            name = path[cpath_idx:]
            if user != account:
                # A foreign user must have an entry for every path;
                # a missing one means access is denied outright.
                try:
                    allowed = access_objects[path]
                except KeyError:
                    raise NotAllowedError
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        # Ensure all the objects actually exist (raises otherwise).
        self._lookup_objects(permissions_path)
        return nobject_permissions
993

    
994
    @debug_method
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        perm_path = self._get_permissions_path(account, container, name)
        if user == account:
            # The owner always has write access.
            allowed = 'write'
        elif self.permissions.access_check(perm_path, self.WRITE, user):
            allowed = 'write'
        elif self.permissions.access_check(perm_path, self.READ, user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # Ensure the object actually exists (raises otherwise).
        self._lookup_object(account, container, name)
        return (allowed, perm_path, self.permissions.access_get(perm_path))
1016

    
1017
    @debug_method
    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object.

        Only the account owner may change permissions (NotAllowedError
        otherwise).  Raises ValueError if the permissions could not be
        stored.
        """

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        try:
            self.permissions.access_set(path, permissions)
        except Exception:
            # Keep the ValueError contract for callers, but no longer use a
            # bare except, which also converted SystemExit/KeyboardInterrupt
            # into ValueError.
            raise ValueError
        else:
            self._report_sharing_change(user, account, path, {'members':
                                        self.permissions.access_members(path)})

        # remove all the cached allowed paths
        # filtering out only those affected could be more expensive
        self._reset_allowed_paths()
1039

    
1040
    @debug_method
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read_object(user, account, container, name)
        obj_path = self._lookup_object(account, container, name)[0]
        return self.permissions.public_get(obj_path)
1049

    
1050
    @debug_method
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write_object(user, account, container, name)
        obj_path = self._lookup_object(account, container, name,
                                       lock_container=True)[0]
        if public:
            self.permissions.public_set(
                obj_path, self.public_url_security, self.public_url_alphabet)
        else:
            self.permissions.public_unset(obj_path)
1063

    
1064
    @debug_method
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read_object(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        props = self._get_version(node, version)
        if props[self.HASH] is None:
            # No content hash means no hashmap at all.
            return 0, ()
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
        return props[self.SIZE], [binascii.hexlify(h) for h in hashmap]
1076

    
1077
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        """Create a new object version with the given hash and metadata.

        Checks quotas, applies versioning policy, reports size/sharing/
        object changes, and returns the new version id.  Raises
        NotAllowedError on permission problems and QuotaError when an
        account or container quota would be exceeded (expected to run
        inside a transaction so the version is rolled back on failure).
        """
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write_object(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            # Carry metadata over from the version being superseded.
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        # Versioning may free the superseded version's size.
        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id
1148

    
1149
    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version.

        Returns (dest_version_id, hexlified_map_hash).  Raises IndexError
        (carrying a .data list of hex block hashes) when some blocks are
        missing from the store.
        """

        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        # Locals renamed from `map`/`hash` so the builtins are not shadowed.
        hmap = HashMap(self.block_size, self.hash_algorithm)
        hmap.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(hmap)
        if missing:
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        map_hash = hmap.hash()
        hexlified = binascii.hexlify(map_hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        self.store.map_put(map_hash, hmap)
        return dest_version_id, hexlified
1174

    
1175
    @debug_method
    @backend_method
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum."""

        # Update objects with greater version and same hashmap
        # and size (fix metadata updates).
        self._can_write_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        props = self._get_version(node, version)
        versions = self.node.node_get_versions(node)
        for x in versions:
            # Later versions created by metadata-only updates share the
            # same hash and size; keep their checksum in sync too.
            if (x[self.SERIAL] >= int(version) and
                x[self.HASH] == props[self.HASH] and
                    x[self.SIZE] == props[self.SIZE]):
                self.node.version_put_property(
                    x[self.SERIAL], 'checksum', checksum)
1194

    
1195
    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):
        """Copy (or move, when is_move) an object and, with a delimiter,
        everything under its pseudo-directory.

        Returns the new version id, or a list of ids when a delimiter
        produced multiple copies.
        """

        # Moves keep overall usage constant, so size changes go unreported.
        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read_object(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        # A copy to a different location gets a new uuid; a rename/move or
        # same-place copy keeps it.
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            # Also copy/move every object under "<src_name><delimiter>".
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                # Rebase the child's name onto the destination prefix.
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)
1270

    
1271
    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {},
            replace_meta, permissions, src_version, False, delimiter)
1285

    
1286
    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        if user != src_account:
            # Only the owner of the source may move it.
            raise NotAllowedError
        return self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {},
            replace_meta, permissions, None, delimiter=delimiter)
1302

    
1303
    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        """Delete an object, or purge its history up to `until`.

        With `until`, old versions (and their blocks) are physically
        removed.  Otherwise a new deleted-cluster version is recorded;
        with a delimiter, everything under the pseudo-directory is
        deleted too.  Only the owner may delete (NotAllowedError).
        """
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            # Purge mode: physically remove versions up to `until`.
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            # History versions are only charged when versioning is not free.
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except NameError:
                # No live version remains; drop the permissions too.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        # Regular delete: record a new version in the deleted cluster.
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            # Also delete every object under "<name><delimiter>".
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
1397

    
1398
    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object.

        Delegates to the internal _delete_object helper.
        NOTE(review): the *prefix* argument is accepted but not forwarded
        to _delete_object — presumably kept for API compatibility; confirm
        against callers before removing.
        """

        self._delete_object(user, account, container, name, until, delimiter)
1405

    
1406
    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples.

        Versions residing in the deleted cluster are filtered out.
        May raise NotAllowedError (permission check) or ItemNotExists
        (unknown object).
        """

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        # Keep only live/history versions; CLUSTER_DELETED ones are purged.
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
                x[self.CLUSTER] != CLUSTER_DELETED]
1416

    
1417
    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given.

        Raises NameError when no live object carries the UUID, and a
        permission error from _can_read_object unless *check_permissions*
        is False.
        """

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            # Include the offending UUID so callers/logs can identify it
            # (previously raised a bare NameError with no message).
            raise NameError('There is no object with UUID %s' % (uuid,))
        path, serial = info
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read_object(user, account, container, name)
        return (account, container, name)
1430

    
1431
    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given.

        Raises NameError when the public id is unknown; the read
        permission check may raise as well.
        """

        path = self.permissions.public_path(public)
        if path is None:
            # Include the offending id so callers/logs can identify it
            # (previously raised a bare NameError with no message).
            raise NameError('There is no public object with id %s' % (public,))
        account, container, name = path.split('/', 2)
        self._can_read_object(user, account, container, name)
        return (account, container, name)
1442

    
1443
    def get_block(self, hash):
        """Return the data stored under the given block hash."""

        logger.debug("get_block: %s", hash)
        data = self.store.block_get(self._unhexlify_hash(hash))
        if data:
            return data
        raise ItemNotExists('Block does not exist')
1451

    
1452
    def put_block(self, data):
        """Store *data* as a new block and return its hex-encoded hash."""

        logger.debug("put_block: %s", len(data))
        digest = self.store.block_put(data)
        return binascii.hexlify(digest)
1457

    
1458
    def update_block(self, hash, data, offset=0):
        """Update a known block in place and return the resulting hex hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        # Overwriting a whole block from offset 0 is just a fresh block.
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        new_hash = self.store.block_update(self._unhexlify_hash(hash), offset,
                                           data)
        return binascii.hexlify(new_hash)
1466

    
1467
    # Path functions.
1468

    
1469
    def _generate_uuid(self):
1470
        return str(uuidlib.uuid4())
1471

    
1472
    def _put_object_node(self, path, parent, name):
        """Return (path, node) for *name* under *path*, creating the node
        under *parent* when it does not exist yet."""
        full_path = '/'.join((path, name))
        existing = self.node.node_lookup(full_path)
        if existing is not None:
            return full_path, existing
        return full_path, self.node.node_create(parent, full_path)
1478

    
1479
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        """Create a node at *path* with an initial empty version owned by
        *user* and return the new node."""
        node = self.node.node_create(parent, path)
        self.node.version_create(
            node, None, 0, '', None, user, self._generate_uuid(), '',
            CLUSTER_NORMAL, update_statistics_ancestors_depth)
        return node
1486

    
1487
    def _lookup_account(self, account, create=True):
        """Return (account, node), optionally creating the account node."""
        node = self.node.node_lookup(account)
        if create and node is None:
            # Account nodes hang directly off ROOTNODE; the user is the
            # account itself, hence ancestors depth -1.
            node = self._put_path(account, self.ROOTNODE, account,
                                  update_statistics_ancestors_depth=-1)
        return account, node
1494

    
1495
    def _lookup_container(self, account, container):
        """Return (path, node) for the container or raise ItemNotExists.

        When container-path locking is enabled the lookup is performed
        for update.
        """
        for_update = bool(self.lock_container_path)
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node
1502

    
1503
    def _lookup_object(self, account, container, name, lock_container=False):
        """Return (path, node) for the object or raise ItemNotExists.

        With *lock_container* set, the enclosing container is looked up
        first (possibly acquiring its lock) before the object lookup.
        """
        if lock_container:
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node
1512

    
1513
    def _lookup_objects(self, paths):
        """Return (paths, nodes) for the given object paths in bulk."""
        return paths, self.node.node_lookup_bulk(paths)
1516

    
1517
    def _get_properties(self, node, until=None):
        """Return the node's version properties as of *until* (now if None)."""

        before = inf if until is None else until
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            # A historic snapshot may only exist in the history cluster.
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props
1527

    
1528
    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node.

        With *until*, statistics are taken as of that timestamp; with
        *compute*, they are recomputed instead of read from the cache.
        """

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(
                node, except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        return (0, 0, 0) if stats is None else stats
1541

    
1542
    def _get_version(self, node, version=None):
        """Return properties of *version* (latest live one when None).

        Raises ItemNotExists when the node has no live version and
        VersionNotExists for unknown/deleted/non-numeric versions.
        """
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
            return props

        try:
            version = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(version, node=node)
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props
1556

    
1557
    def _get_versions(self, nodes):
        """Return the latest live (normal-cluster) versions for *nodes*."""
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1559

    
1560
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node.

        Properties default to those of the source version (taken from
        *src_node* when given, otherwise from *node* itself).  The
        previous live version of *node*, if any, is moved to the history
        cluster.  Returns (pre_version_id, dest_version_id).
        """

        # Source of the inherited properties: src_node for copies/moves,
        # the node itself otherwise.
        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        # Copies and brand-new objects get a fresh UUID; plain new
        # versions keep the object's existing UUID.
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        # pre_version_id: the current live version of *node* that must be
        # moved out of the normal cluster before the new version lands.
        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        # Only the new version may carry the is-latest flag.
        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
1609

    
1610
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        """Attach metadata to *dest_version_id* within *domain*.

        Existing attributes are copied from *src_version_id* first (when
        given).  Without *replace*, keys mapped to '' are deleted and the
        rest are upserted; with *replace*, all domain attributes are
        wiped and *meta* is set verbatim.
        """
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            # Empty-string values act as per-key delete markers.
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))
1623

    
1624
    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version of *node* and store metadata on it.

        Returns (src_version_id, dest_version_id).
        """

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=
            update_statistics_ancestors_depth)
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain,
                                     node, meta, replace)
        return src_version_id, dest_version_id
1635

    
1636
    def _list_limits(self, listing, marker, limit):
1637
        start = 0
1638
        if marker:
1639
            try:
1640
                start = listing.index(marker) + 1
1641
            except ValueError:
1642
                pass
1643
        if not limit or limit > 10000:
1644
            limit = 10000
1645
        return start, limit
1646

    
1647
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """List object properties under container *path*.

        Returns a sorted list of tuples whose first element is the object
        name relative to the container.  With *virtual*, delimiter-derived
        prefixes are included as (prefix, None) entries.  Metadata key
        filtering (*keys*) only applies when *domain* is given.
        """
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        # Marker is container-relative; make it absolute for the query.
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        # Strip the 'account/container/' prefix from each returned path.
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects
1668

    
1669
    # Reporting functions.
1670

    
1671
    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        """Queue a diskspace-change message and commit it to the quota
        service.

        No-op for zero-size changes.  When an external quotaholder is in
        use, a commission is issued and its serial recorded for later
        acceptance; quota failures surface as QuotaError.
        """
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        # Recompute the account total so the message carries fresh usage.
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            # Any quota-service failure is wrapped so callers see one type.
            raise QuotaError(e)
        else:
            self.serials.append(serial)
1700

    
1701
    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        """Queue an object-change notification message for *path*."""
        details = details or {}
        details['user'] = user
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('object',), account,
             QUEUE_INSTANCE_ID, 'object', path, details))
1709

    
1710
    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a sharing-change notification message for *path*."""
        details = details or {}
        details['user'] = user
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account,
             QUEUE_INSTANCE_ID, 'sharing', path, details))
1718

    
1719
    # Policy functions.
1720

    
1721
    def _check_policy(self, policy, is_account_policy=True):
        """Validate *policy* in place, filling '' values from defaults.

        Only 'quota' (non-negative integer) and 'versioning'
        ('auto'/'none') keys are accepted; anything else raises
        ValueError.
        """
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        # Empty-string values mean "reset to the default".
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError
1737

    
1738
    def _put_policy(self, node, policy, replace, is_account_policy=True):
        """Store *policy* on *node*.

        With *replace*, missing keys are first filled from the matching
        defaults so the stored policy is complete.
        """
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)
1746

    
1747
    def _get_policy(self, node, is_account_policy=True):
        """Return the node's policy merged over the matching defaults."""
        if is_account_policy:
            defaults = self.default_account_policy
        else:
            defaults = self.default_container_policy
        policy = defaults.copy()
        policy.update(self.node.policy_get(node))
        return policy
1753

    
1754
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            # Versioning disabled: physically remove the version and its
            # block map, and report the space reclaimed.
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            # Version is kept, but old versions do not count against the
            # quota, so report its size as freed anyway.
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
1774

    
1775
    # Access control functions.
1776

    
1777
    def _check_groups(self, groups):
        """Validate group names; currently a no-op placeholder."""
        # raise ValueError('Bad characters in groups')
        pass
1780

    
1781
    def _check_permissions(self, path, permissions):
        """Validate permission entries; currently a no-op placeholder."""
        # raise ValueError('Bad characters in permissions')
        pass
1784

    
1785
    def _get_formatted_paths(self, paths):
        """Return (path, match-kind) pairs for the given paths.

        Every path yields an exact match; directory-like objects
        additionally yield a prefix match on 'path/'.
        """
        if not paths:
            return []
        formatted = []
        props = self.node.get_props(paths)
        for prop in props or []:
            mimetype = prop[1].split(';', 1)[0].strip()
            if mimetype in ('application/directory', 'application/folder'):
                formatted.append(
                    (prop[0].rstrip('/') + '/', self.MATCH_PREFIX))
            formatted.append((prop[0], self.MATCH_EXACT))
        return formatted
1798

    
1799
    def _get_permissions_path(self, account, container, name):
        """Return the path whose permissions govern the object, or None.

        Inherited permission paths are examined from most to least
        specific; a non-exact candidate only applies when it names a
        directory-like object.
        """
        path = '/'.join((account, container, name))
        candidates = self.permissions.access_inherit(path)
        candidates.sort(reverse=True)
        for candidate in candidates:
            if candidate == path:
                return candidate
            if candidate.count('/') < 2:
                # Account- or container-level entry: not an object path.
                continue
            node = self.node.node_lookup(candidate)
            if node is None:
                continue
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                continue
            mimetype = props[self.TYPE].split(';', 1)[0].strip()
            if mimetype in ('application/directory', 'application/folder'):
                return candidate
        return None
1819

    
1820
    def _get_permissions_path_bulk(self, account, container, names):
        """Bulk variant of _get_permissions_path.

        Return the list of paths whose permissions govern the given
        object names, or None when no governing path exists.  Exact
        matches are accepted directly; inherited (ancestor) candidates
        only count when they name a directory-like object.
        """
        formatted_paths = []
        for name in names:
            path = '/'.join((account, container, name))
            formatted_paths.append(path)
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        # Most specific candidates first.
        permission_paths.sort()
        permission_paths.reverse()
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                permission_paths_list.append(p)
            else:
                if p.count('/') < 2:
                    # Account/container level: never an object path.
                    continue
                lookup_list.append(p)

        if len(lookup_list) > 0:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    # Only directory-like ancestors propagate permissions.
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        if len(permission_paths_list) > 0:
            return permission_paths_list

        return None
1851

    
1852
    def _reset_allowed_paths(self):
        """Drop the cached read/write permission lookup results."""
        self.read_allowed_paths = defaultdict(set)
        self.write_allowed_paths = defaultdict(set)
1855

    
1856
    @check_allowed_paths(action=0)
    def _can_read_account(self, user, account):
        """Raise NotAllowedError unless *user* may read *account*."""
        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
1861

    
1862
    @check_allowed_paths(action=1)
    def _can_write_account(self, user, account):
        """Raise NotAllowedError unless *user* owns *account*."""
        if user != account:
            raise NotAllowedError
1866

    
1867
    @check_allowed_paths(action=0)
    def _can_read_container(self, user, account, container):
        """Raise NotAllowedError unless *user* may read the container."""
        if user != account:
            if container not in self._allowed_containers(user, account):
                raise NotAllowedError
1872

    
1873
    @check_allowed_paths(action=1)
    def _can_write_container(self, user, account, container):
        """Raise NotAllowedError unless *user* owns the container."""
        if user != account:
            raise NotAllowedError
1877

    
1878
    @check_allowed_paths(action=0)
    def _can_read_object(self, user, account, container, name):
        """Raise NotAllowedError unless *user* may read the object.

        Owners and public objects are always readable; otherwise either a
        READ or a WRITE grant on the governing permissions path suffices.
        """
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if (not self.permissions.access_check(path, self.READ, user) and not
                self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError
1891

    
1892
    @check_allowed_paths(action=1)
    def _can_write_object(self, user, account, container, name):
        """Raise NotAllowedError unless *user* may write the object.

        Owners always may; other users need an explicit WRITE grant on
        the governing permissions path.
        """
        if user == account:
            return True
        # The previous code first joined account/container/name into
        # `path` and immediately overwrote it; the dead assignment has
        # been removed — only the permissions path matters here.
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError
1902

    
1903
    def _allowed_accounts(self, user):
        """Return the sorted accounts sharing anything with *user*.

        The account names are also cached in read_allowed_paths.
        """
        allow = set(path.split('/', 1)[0]
                    for path in self.permissions.access_list_paths(user))
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
1910

    
1911
    def _allowed_containers(self, user, account):
        """Return the sorted containers of *account* shared with *user*.

        The container names are also cached in read_allowed_paths.
        """
        allow = set(path.split('/', 2)[1]
                    for path in self.permissions.access_list_paths(user,
                                                                   account))
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
1918

    
1919
    # Domain functions
1920

    
1921
    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        """Return (path, metadata, permissions) for live objects carrying
        metadata in *domain* that are accessible to *user*.

        With user=None only publicly accessible paths are considered
        (owned paths are excluded).
        """
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]
1934

    
1935
    # util functions
1936

    
1937
    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        """Build an object metadata dict from version properties.

        User-defined metadata, when supplied and requested, is merged on
        top of the system keys.
        """
        fields = (('bytes', self.SIZE),
                  ('type', self.TYPE),
                  ('hash', self.HASH),
                  ('version', self.SERIAL),
                  ('version_timestamp', self.MTIME),
                  ('modified_by', self.MUSER),
                  ('uuid', self.UUID),
                  ('checksum', self.CHECKSUM))
        meta = dict((key, props[idx]) for key, idx in fields)
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta
1950

    
1951
    def _exists(self, node):
        """Return True if *node* has a live (non-deleted) version."""
        try:
            self._get_version(node)
            return True
        except ItemNotExists:
            return False
1958

    
1959
    def _unhexlify_hash(self, hash):
        """Decode a hex hash string to raw bytes, or raise InvalidHash.

        NOTE(review): on Python 3 binascii.unhexlify raises
        binascii.Error (not TypeError) for malformed input — confirm
        before porting.
        """
        try:
            return binascii.unhexlify(hash)
        except TypeError:
            raise InvalidHash(hash)