Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ ebdbac7a

History | View | Annotate | Download (77.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from collections import defaultdict
41
from functools import wraps, partial
42
from traceback import format_exc
43

    
44
try:
45
    from astakosclient import AstakosClient
46
except ImportError:
47
    AstakosClient = None
48

    
49
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
50
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
51
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
52
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
53
                  InvalidHash)
54

    
55

    
56
class DisabledAstakosClient(object):
    """Stand-in used when the real AstakosClient is disabled or missing.

    The construction arguments are recorded for inspection, but touching
    any other attribute is treated as a programming error.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        # Called only for attributes not found normally; any such access
        # means code tried to use the disabled client.
        raise AssertionError("AstakosClient has been disabled, "
                             "yet an attempt to access it was made")
65

    
66

    
67
# Stripped-down version of the HashMap class found in tools.

class HashMap(list):
    """A list of block hashes that can compute its Merkle-style top hash."""

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        # Digest `v` with the configured hash algorithm.
        digester = hashlib.new(self.blockhash)
        digester.update(v)
        return digester.digest()

    def hash(self):
        """Return the top hash of a zero-padded binary hash tree."""
        if not self:
            return self._hash_raw('')
        if len(self) == 1:
            return self[0]

        # Pad with zero-filled pseudo-hashes up to the next power of two.
        level = list(self)
        size = 2
        while size < len(level):
            size *= 2
        level.extend([('\x00' * len(level[0]))] * (size - len(level)))
        # Reduce pairwise until a single digest remains.
        while len(level) > 1:
            level = [self._hash_raw(level[i] + level[i + 1])
                     for i in range(0, len(level), 2)]
        return level[0]
95

    
96
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
# Alphabet and length (in characters) used when generating public URLs.
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16

# Queue message routing-key prefix ('%s' is the message type) and the
# identity this backend uses when publishing.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version clusters: current data, superseded history, deleted versions.
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

inf = float('inf')

# NOTE(review): appears unused in this file — presumably a sanity-check
# constant; confirm before removing.
ULTIMATE_ANSWER = 42

# Default quota source and resource name used with the quotaholder.
DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'

logger = logging.getLogger(__name__)
127

    
128

    
129
def backend_method(func):
    """Decorator wrapping a backend method in a database transaction.

    If the backend is already inside a transaction, the method runs
    directly (the outermost caller manages commit/rollback). Otherwise a
    new transaction is started via pre_exec() and finished via
    post_exec(), which commits on success and rolls back on failure.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        # if we are inside a database transaction
        # just proceed with the method execution
        # otherwise manage a new transaction
        if self.in_transaction:
            return func(self, *args, **kw)

        # Pre-initialize the flag so the bare except/re-raise dance of the
        # original is unnecessary: it stays False unless func() returns.
        success_status = False
        try:
            self.pre_exec()
            result = func(self, *args, **kw)
            success_status = True
            return result
        finally:
            # Commit on success, roll back on any exception (including
            # one raised by pre_exec itself).
            self.post_exec(success_status)
    return wrapper
149

    
150

    
151
def debug_method(func):
    """Decorator logging every call's arguments and result.

    On exception the formatted traceback is logged in place of the result
    and the exception is re-raised unchanged.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            # Intentionally catch everything: capture the traceback for
            # the log line, then re-raise unmodified.
            result = format_exc()
            raise
        finally:
            all_args = [repr(a) for a in args]
            # The original used map(all_args.append, ...) for its side
            # effect, which only works on Python 2's eager map; extend()
            # is behavior-identical and also correct on Python 3.
            all_args.extend('%s=%s' % (k, v) for k, v in kw.items())
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper
166

    
167

    
168
def check_allowed_paths(action):
    """Decorator for backend methods checking path access granted to user.

    The 1st argument of the decorated method is expected to be a
    ModularBackend instance, the 2nd the user performing the request and
    the path join of the rest arguments is supposed to be the requested path.

    The decorator checks whether the requested path is among the user's allowed
    cached paths.
    If this is the case, the decorator returns immediately to reduce the
    interactions with the database.
    Otherwise, it proceeds with the execution of the decorated method and if
    the method returns successfully (no exceptions are raised), the requested
    path is added to the user's cached allowed paths.

    :param action: (int) 0 for reads / 1 for writes
    :raises NotAllowedError: the user does not have access to the path
    """
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args):
            user = args[0]
            cache = (self.read_allowed_paths if action == self.READ
                     else self.write_allowed_paths)
            path = '/'.join(args[1:])
            if path in cache.get(user, []):
                return  # access is already checked
            func(self, *args)   # proceed with access check
            cache[user].add(path)  # cache the now-verified path
        return wrapper
    return decorator
202

    
203

    
204
def list_method(func):
    """Decorator applying marker/limit windowing to a listing result."""
    @wraps(func)
    def wrapper(self, *args, **kw):
        # Read the window parameters before invoking the wrapped method.
        marker_val = kw.get('marker')
        limit_val = kw.get('limit')
        full_result = func(self, *args, **kw)
        start, limit_val = self._list_limits(full_result, marker_val,
                                             limit_val)
        return full_result[start:start + limit_val]
    return wrapper
213

    
214

    
215
class ModularBackend(BaseBackend):
    """A modular backend.

    Uses modules for SQL functions and storage.
    """

    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        # Fall back to the module-level defaults for every unset argument.
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        # Import a module by dotted name and return the module object.
        def load_module(m):
            __import__(m)
            return sys.modules[m]

        # Database layer: DB wrapper plus the abstractions it exposes.
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        # Mirror the db module's access-mode constants on the backend.
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        # Mirror the db module's node/version property indices as well.
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        # Block storage layer.
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        # Message queue; fall back to a no-op stand-in when unconfigured.
        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        # Without an auth URL (or with the astakosclient package missing)
        # quota resolution is disabled; any later use of the client is a
        # programming error and raises via DisabledAstakosClient.
        if not astakos_auth_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        # Per-transaction state: commission serials and queue messages.
        self.serials = []
        self.messages = []

        # Moving is implemented as a copy that removes the source.
        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False

        self._reset_allowed_paths()
334

    
335
    def pre_exec(self, lock_container_path=False):
        """Start a new database transaction.

        :param lock_container_path: whether container path rows should be
            locked for the duration of this transaction.
        """
        self.lock_container_path = lock_container_path
        self.wrapper.execute()  # begin a new transaction
        self.serials = []  # commission serials issued in this transaction
        self._reset_allowed_paths()
        self.in_transaction = True
341

    
342
    def post_exec(self, success_status=True):
        """Finish the current transaction.

        On success: send the queued messages, register and resolve any
        pending quotaholder commission serials, then commit. On failure:
        reject the commissions and roll back.
        """
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    accept_serials=self.serials,
                    reject_serials=[])
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            # Reject any commissions issued during the failed transaction
            # before rolling the database back.
            if self.serials:
                r = self.astakosclient.resolve_commissions(
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(
                    r['rejected'])
            self.wrapper.rollback()
        self.in_transaction = False
376

    
377
    def close(self):
        """Release the database connection and the message queue."""
        self.wrapper.close()
        self.queue.close()
380

    
381
    @property
    def using_external_quotaholder(self):
        """True when quota are resolved by a real (enabled) astakos client."""
        return not isinstance(self.astakosclient, DisabledAstakosClient)
384

    
385
    @debug_method
    @backend_method
    @list_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        # marker/limit windowing is applied by the list_method decorator.
        return self._allowed_accounts(user)
392

    
393
    def _get_account_quotas(self, account):
        """Get account usage from astakos.

        :returns: the quota dict (e.g. 'usage'/'limit') for the diskspace
            resource of the account under the default source, or {} when
            the source/resource entries are missing.
        """
        # NOTE: this method was previously defined twice verbatim; the
        # redundant duplicate definition has been removed.
        quotas = self.astakosclient.service_get_quotas(account)[account]
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
                                                  {})
406

    
407
    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain."""

        self._can_read_account(user, account)
        path, node = self._lookup_account(account, user == account)
        if user != account:
            # Non-owners may not inspect past versions or missing accounts.
            if until or (node is None):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # The account node has no matching version; fall back to
            # `until` as the modification time.
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Non-owners only get the account name plus 'modified' below.
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                # Prefer the usage accounted by the external quotaholder.
                external_quota = self._get_account_quotas(account)
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta
448

    
449
    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        self._can_write_account(user, account)
        # Only the node is needed; the path component is discarded.
        node = self._lookup_account(account, True)[1]
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)
458

    
459
    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        self._can_read_account(user, account)
        if user == account:
            # Ensure the account exists, then fetch its group mapping.
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        # Groups are private to the owner.
        return {}
469

    
470
    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        self._can_write_account(user, account)
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            # Wipe every existing group in one go.
            self.permissions.group_destroy(account)
        for name, members in groups.iteritems():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, name)
            if members:
                self.permissions.group_addmany(account, name, members)
485

    
486
    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy."""

        self._can_read_account(user, account)
        if user != account:
            return {}  # policy is private to the owner
        node = self._lookup_account(account, True)[1]
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            # The quota limit is owned by the external quotaholder.
            external_quota = self._get_account_quotas(account)
            policy['quota'] = external_quota.get('limit', 0)
        return policy
500

    
501
    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        self._can_write_account(user, account)
        node = self._lookup_account(account, True)[1]
        # Validate before persisting.
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)
510

    
511
    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name.

        :raises AccountExists: if the account is already present
        """

        policy = policy or {}
        self._can_write_account(user, account)
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        # Create the account node under the root and attach its policy.
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True)
526

    
527
    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name.

        :raises AccountNotEmpty: if the account still holds containers
        """

        self._can_write_account(user, account)
        node = self.node.node_lookup(account)
        if node is None:
            return  # already absent; nothing to do
        if not self.node.node_remove(node,
                                     update_statistics_ancestors_depth=-1):
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
544

    
545
    @debug_method
    @backend_method
    @list_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account."""

        self._can_read_account(user, account)
        if user != account:
            if until:
                raise NotAllowedError
            return self._allowed_containers(user, account)
        if shared or public:
            # Extract the container component ('account/container/...')
            # from the shared/public object paths.
            allowed = set()
            if shared:
                allowed.update([x.split('/', 2)[1] for x in
                               self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in
                               self.permissions.public_list(account)])
            return sorted(allowed)
        node = self.node.node_lookup(account)
        # List direct children of the account node (delimiter '/').
        return [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
569

    
570
    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""

        self._can_read_container(user, account, container)
        allowed = []
        if user != account:
            if until:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = until if until is not None else inf
        # NOTE(review): `allowed` is always [] at this point, so no path
        # filtering is actually applied for non-owners — confirm intent.
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)
586

    
587
    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain."""

        self._can_read_container(user, account, container)
        if user != account:
            # Non-owners may not inspect past versions.
            if until:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Non-owners only get the container name plus 'modified' below.
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta
621

    
622
    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is not None:
            # Drop the superseded version unless the container's
            # versioning policy keeps automatic history.
            versioning = self._get_policy(
                node, is_account_policy=False)['versioning']
            if versioning != 'auto':
                self.node.version_remove(src_version_id,
                                         update_statistics_ancestors_depth=0)
639

    
640
    @debug_method
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        self._can_read_container(user, account, container)
        if user == account:
            node = self._lookup_container(account, container)[1]
            return self._get_policy(node, is_account_policy=False)
        # Policy is private to the owner.
        return {}
650

    
651
    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""

        self._can_write_container(user, account, container)
        node = self._lookup_container(account, container)[1]
        # Validate before persisting.
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)
661

    
662
    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name.

        :raises ContainerExists: if the container is already present
        """

        policy = policy or {}
        self._can_write_container(user, account, container)
        try:
            path, node = self._lookup_container(account, container)
        except NameError:
            # Lookup failed: the container does not exist yet, as expected.
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        # Create the container node under the account node.
        node = self._put_path(
            user, self._lookup_account(account, True)[1], path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)
682

    
683
    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name.

        With `until` set, only purge versions up to that timestamp and
        keep the container. With `delimiter` set, delete the contents but
        keep the container itself.
        """

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)

        if until is not None:
            # Purge historic and deleted versions up to `until`.
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                # Purged history frees accounted space.
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            # Delete the container itself; it must hold no live objects.
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            # NOTE(review): the given `prefix` is not forwarded here
            # (prefix='' lists everything) — confirm this is intended.
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                # Mark each object deleted by writing a new zero-sized
                # version into the CLUSTER_DELETED cluster.
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
761

    
762
    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        """List object properties under a container.

        Combines shared and/or public listings according to the flags,
        then applies marker/limit windowing to the merged result.

        Raises NotAllowedError when a foreign user requests a history
        (`until`) listing.
        """
        if user != account and until:
            # History listings of foreign accounts are never allowed.
            raise NotAllowedError

        objects = []
        if shared and public:
            # Get shared first.  BUGFIX: start from a set so the public
            # results can be merged in with |= below; previously, when
            # there were no shared paths, `objects` remained a list and
            # `objects |= set(...)` raised TypeError.
            objects = set()
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects = set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # Merge in the public objects (the set deduplicates overlaps).
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            objects.sort(key=lambda x: x[0])
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
        else:
            allowed = self._list_object_permissions(
                user, account, container, prefix, shared, public=False)
            if shared and not allowed:
                return []
            path, node = self._lookup_container(account, container)
            allowed = self._get_formatted_paths(allowed)
            objects = self._list_object_properties(
                node, path, prefix, delimiter, marker, limit, virtual, domain,
                keys, until, size_range, allowed, all_props)

        # apply limits
        start, limit = self._list_limits(objects, marker, limit)
        return objects[start:start + limit]
    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        """Return property tuples for the public objects under a container.

        Each element is (object name, version properties...) with the
        leading "account/container/" part stripped from the name.
        """
        public_paths = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        full_paths, nodes = self._lookup_objects(public_paths)
        # Strip the leading "account/container/" prefix from every path.
        strip = len('/'.join((account, container))) + 1
        names = [fp[strip:] for fp in full_paths]
        rows = self.node.version_lookup_bulk(
            nodes, all_props=all_props, order_by_path=True)
        return [(name,) + row for name, row in zip(names, rows)]
    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        """Exhaustively list objects by paging through _list_objects().

        Requests pages of `limit` entries, using the last entry of the
        previous page as the marker, until a short page signals that all
        results have been fetched.
        """
        objects = []
        limit = 10000  # page size; constant, so hoisted out of the loop
        while True:
            marker = objects[-1] if objects else None
            # renamed from `l` (PEP 8 E741: ambiguous single-letter name)
            page = self._list_objects(
                user, account, container, prefix, delimiter, marker, limit,
                virtual, domain, keys, shared, until, size_range, all_props,
                public)
            objects.extend(page)
            # A short (or empty) page means there is nothing more to fetch;
            # the old `not page or ...` test was redundant since an empty
            # page always satisfies len(page) < limit for limit > 0.
            if len(page) < limit:
                break
        return objects
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        # Return the object paths below account/container/prefix that are
        # visible to `user` under the requested sharing/public filters.
        allowed = []
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            # Foreign account: only paths explicitly granted to `user`.
            # NOTE: the shared/public flags are ignored in this branch.
            allowed = self.permissions.access_list_paths(user, path)
            if not allowed:
                raise NotAllowedError
        else:
            # Owner: collect shared and/or public paths as requested.
            allowed = set()
            if shared:
                allowed.update(self.permissions.access_list_shared(path))
            if public:
                # public_list yields (path, public_id) pairs; keep paths.
                allowed.update(
                    [x[0] for x in self.permissions.public_list(path)])
            allowed = sorted(allowed)
            if not allowed:
                return []
        return allowed
    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""
        effective_keys = keys or []
        return self._list_objects(user, account, container, prefix, delimiter,
                                  marker, limit, virtual, domain,
                                  effective_keys, shared, until, size_range,
                                  False, public)
    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""
        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, True,
            public)
        listing = []
        for entry in props:
            if len(entry) == 2:
                # Virtual directory marker: (name, delimiter) pair.
                listing.append({'subdir': entry[0]})
                continue
            # entry[0] is the name; the rest is a version-property row,
            # indexable with the self.SIZE/TYPE/... offsets directly.
            name, row = entry[0], entry[1:]
            listing.append({
                'name': name,
                'bytes': row[self.SIZE],
                'type': row[self.TYPE],
                'hash': row[self.HASH],
                'version': row[self.SERIAL],
                'version_timestamp': row[self.MTIME],
                'modified': row[self.MTIME] if until is None else None,
                'modified_by': row[self.MUSER],
                'uuid': row[self.UUID],
                'checksum': row[self.CHECKSUM]})
        return listing
    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return the list of paths with enforced permissions under a
        container."""
        return self._list_object_permissions(
            user, account, container, prefix, True, False)
    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""
        lookup = '/'.join((account, container, prefix))
        # public_list yields (path, public_id) pairs.
        return dict(self.permissions.public_list(lookup))
    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        When `version` is given, 'modified' reflects the object's overall
        last modification, not that of the requested version.

        Raises ItemNotExists if neither a live nor a deleted version of
        the object can be found.
        """

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            # Latest version requested: its mtime is the object's mtime.
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                # Fall back to the deletion record's timestamp.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            # User-defined attributes for the requested domain come first;
            # the system keys below always win on conflict.
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta
    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version.

        A metadata change creates a new version; the previous one is
        retired according to the container's versioning policy.
        """

        self._can_write_object(user, account, container, name)

        # Lock the container path to serialize concurrent updates.
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        # Retire the superseded version per the versioning policy.
        self._apply_versioning(account, container, src_version_id,
                               update_statistics_ancestors_depth=1)
        return dest_version_id
    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        #group_parents = access_objects['group_parents']
        nobject_permissions = {}
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            # `allowed` is re-derived from access_get_for_bulk() below; this
            # initial value only matters until then.
            allowed = 1
            name = path[cpath_idx:]  # strip "account/container/" prefix
            if user != account:
                # For foreign users the lookup doubles as an access check:
                # a missing entry means no access at all.
                try:
                    allowed = access_objects[path]
                except KeyError:
                    raise NotAllowedError
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        # Raises if any of the objects does not exist.
        self._lookup_objects(permissions_path)
        return nobject_permissions
    @debug_method
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        perm_path = self._get_permissions_path(account, container, name)
        if user == account:
            # The owner always has full access.
            allowed = 'write'
        elif self.permissions.access_check(perm_path, self.WRITE, user):
            allowed = 'write'
        elif self.permissions.access_check(perm_path, self.READ, user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # Raises ItemNotExists if the object is gone.
        self._lookup_object(account, container, name)
        return allowed, perm_path, self.permissions.access_get(perm_path)
    @debug_method
    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object.

        Only the owning account may change permissions.

        Raises:
            NotAllowedError: if `user` is not the owning account.
            ValueError: if the permissions could not be stored.
        """

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        try:
            self.permissions.access_set(path, permissions)
        except Exception:
            # Was a bare `except:`, which would also swallow
            # KeyboardInterrupt/SystemExit; narrowed to Exception.
            raise ValueError
        else:
            self._report_sharing_change(user, account, path, {'members':
                                        self.permissions.access_members(path)})

        # remove all the cached allowed paths
        # filtering out only those affected could be more expensive
        self._reset_allowed_paths()
    @debug_method
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read_object(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        return self.permissions.public_get(path)
    @debug_method
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write_object(user, account, container, name)
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        if public:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet)
        else:
            self.permissions.public_unset(path)
    @debug_method
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read_object(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        props = self._get_version(node, version)
        obj_hash = props[self.HASH]
        if obj_hash is None:
            # No content stored for this version.
            return 0, ()
        hashmap = self.store.map_get(self._unhexlify_hash(obj_hash))
        return props[self.SIZE], [binascii.hexlify(h) for h in hashmap]
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        # Create a new object version pointing at `hash`, apply versioning
        # and quota policy, update permissions and emit accounting/audit
        # notifications.  Returns the new version id.
        if permissions is not None and user != account:
            # Only the owner may set permissions.
            raise NotAllowedError
        self._can_write_object(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        # del_size is what the retired version freed; the net growth is
        # the new size minus that.
        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                # NOTE: usage is checked after the new version is recorded,
                # so this triggers once usage already exceeds the quota.
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id
    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version.

        Returns (dest_version_id, hexlified_map_hash).  Raises IndexError
        (with a `data` attribute listing the missing block hashes) when
        some referenced blocks are not yet stored.
        """

        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(map)
        if missing:
            # Report missing blocks back to the caller so it can upload
            # them; the hexlified hashes ride on the exception's `data`.
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = map.hash()
        hexlified = binascii.hexlify(hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        # Persist the map only after the version has been created.
        self.store.map_put(hash, map)
        return dest_version_id, hexlified
    @debug_method
    @backend_method
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum.

        The checksum is propagated to every later version that shares the
        same hashmap and size (i.e. metadata-only updates), so they all
        stay consistent.
        """

        # Update objects with greater version and same hashmap
        # and size (fix metadata updates).
        self._can_write_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        props = self._get_version(node, version)
        versions = self.node.node_get_versions(node)
        for x in versions:
            if (x[self.SERIAL] >= int(version) and
                x[self.HASH] == props[self.HASH] and
                    x[self.SIZE] == props[self.SIZE]):
                self.node.version_put_property(
                    x[self.SERIAL], 'checksum', checksum)
    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):
        # Copy (or move, when is_move) an object, optionally recursing into
        # objects sharing the `delimiter`-terminated prefix.  Returns the
        # new version id, or a list of them when delimiter matched several.

        # A move inside the same account is size-neutral, so skip the
        # per-object size-change reports in that case.
        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read_object(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        # (consistent ordering avoids deadlocks between concurrent copies).
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        # A copy to a different object gets a new uuid; a move (or a copy
        # onto itself) keeps the source's identity.
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            # Recurse into every object under "src_name<delimiter>".
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                # Rebase the child's path from the source to the dest prefix.
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)
    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, src_version, False, delimiter)
    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        # Only the owner may move an object away.
        if user != src_account:
            raise NotAllowedError
        return self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, None, delimiter=delimiter)
    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        # Delete an object (mark deleted) or, when `until` is given, purge
        # its history up to that timestamp.  With `delimiter`, also delete
        # every object under "name<delimiter>".
        if user != account:
            # Only the owner may delete.
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            # Purge mode: physically drop versions up to `until`.
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            # History versions count against usage only when versioning
            # is billed.
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except NameError:
                # No live version remains; drop its permissions too.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        # Record the deletion as a new version in CLUSTER_DELETED.
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            # Recursive delete of everything under "name<delimiter>".
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                # Skip entries already deleted.
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object.

        NOTE(review): `prefix` is accepted but never forwarded to
        _delete_object -- confirm against callers whether this is
        intentional API-compatibility padding.
        """

        self._delete_object(user, account, container, name, until, delimiter)
1412

    
1413
    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        result = []
        for props in self.node.node_get_versions(node):
            # Purged versions are excluded from the listing.
            if props[self.CLUSTER] == CLUSTER_DELETED:
                continue
            result.append([props[self.SERIAL], props[self.MTIME]])
        return result
1423

    
1424
    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given.

        Raises NameError when no live version carries this UUID.
        """

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, _serial = info
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read_object(user, account, container, name)
        return (account, container, name)
1437

    
1438
    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given.

        Raises NameError when the public id resolves to no path.
        """

        resolved = self.permissions.public_path(public)
        if resolved is None:
            raise NameError
        account, container, name = resolved.split('/', 2)
        self._can_read_object(user, account, container, name)
        return (account, container, name)
1449

    
1450
    def get_block(self, hash):
        """Return a block's data; raise ItemNotExists for unknown hashes."""

        logger.debug("get_block: %s", hash)
        data = self.store.block_get(self._unhexlify_hash(hash))
        if data:
            return data
        raise ItemNotExists('Block does not exist')
1458

    
1459
    def put_block(self, data):
        """Store a block and return its hex-encoded hash."""

        logger.debug("put_block: %s", len(data))
        digest = self.store.block_put(data)
        return binascii.hexlify(digest)
1464

    
1465
    def update_block(self, hash, data, offset=0):
        """Update a known block in place and return the resulting hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        # A full-size write starting at zero is simply a brand-new block.
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        new_hash = self.store.block_update(self._unhexlify_hash(hash), offset,
                                           data)
        return binascii.hexlify(new_hash)
1473

    
1474
    # Path functions.
1475

    
1476
    def _generate_uuid(self):
1477
        return str(uuidlib.uuid4())
1478

    
1479
    def _put_object_node(self, path, parent, name):
        """Return (path, node) for `name` under `path`, creating the node
        if it does not exist yet."""
        full_path = '/'.join((path, name))
        node = self.node.node_lookup(full_path)
        if node is None:
            node = self.node.node_create(parent, full_path)
        return full_path, node
1485

    
1486
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        """Create a node at `path` under `parent` with an initial version.

        The initial version is empty (size 0, no hash, no checksum),
        attributed to `user` and placed in CLUSTER_NORMAL.
        Returns the new node id.
        """
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node
1493

    
1494
    def _lookup_account(self, account, create=True):
        """Return (account, node); create a missing account node when
        `create` is set. `node` may be None when creation is disabled."""
        node = self.node.node_lookup(account)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node
1501

    
1502
    def _lookup_container(self, account, container):
        """Return (path, node) for a container; raise ItemNotExists if it
        is missing. Takes a row lock when lock_container_path is set."""
        for_update = bool(self.lock_container_path)
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node
1509

    
1510
    def _lookup_object(self, account, container, name, lock_container=False):
        """Return (path, node) for an object; raise ItemNotExists if it
        is missing. Optionally lock the enclosing container first."""
        if lock_container:
            self._lookup_container(account, container)

        obj_path = '/'.join((account, container, name))
        node = self.node.node_lookup(obj_path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return obj_path, node
1519

    
1520
    def _lookup_objects(self, paths):
        """Return (paths, nodes) resolved with one bulk node lookup."""
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes
1523

    
1524
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = inf if until is None else until
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            # When asking about a past point in time, fall back to the
            # history cluster.
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props
1534

    
1535
    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            # Historic statistics at a point in time.
            result = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            # Recompute instead of reading the cached row.
            result = self.node.statistics_latest(
                node, except_cluster=CLUSTER_DELETED)
        else:
            result = self.node.statistics_get(node, CLUSTER_NORMAL)
        return result if result is not None else (0, 0, 0)
1548

    
1549
    def _get_version(self, node, version=None):
        """Return version properties; the latest live one when `version`
        is None. Raises ItemNotExists / VersionNotExists accordingly."""
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
            return props

        try:
            version = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(version, node=node)
        # Deleted versions are reported as nonexistent.
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props
1563

    
1564
    def _get_versions(self, nodes):
        """Return the latest live version properties for each node."""
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1566

    
1567
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node.

        Properties left as None are inherited from the latest live
        version of `src_node` (or of `node` itself when no source is
        given). The superseded latest version of `node`, if any, is
        demoted to CLUSTER_HISTORY.
        Returns (pre_version_id, dest_version_id).
        """

        # Inherit properties from the source's latest live version.
        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        # Copies get a fresh UUID; a plain new version keeps the source's.
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        # Determine which version of `node` is being superseded.
        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            # Demote the superseded version into the history cluster.
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        # Ensure only the freshly created version is flagged as latest.
        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
1616

    
1617
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        """Copy the source version's attributes and apply `meta` on top.

        With replace=False, empty-string values act as per-key delete
        markers and non-empty values overwrite; with replace=True the
        whole domain is cleared first and `meta` is set verbatim.
        """
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            # Empty value == delete this key from the domain.
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))
1630

    
1631
    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        depth = update_statistics_ancestors_depth
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, update_statistics_ancestors_depth=depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id
1642

    
1643
    def _list_limits(self, listing, marker, limit):
1644
        start = 0
1645
        if marker:
1646
            try:
1647
                start = listing.index(marker) + 1
1648
            except ValueError:
1649
                pass
1650
        if not limit or limit > 10000:
1651
            limit = 10000
1652
        return start, limit
1653

    
1654
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """List object property tuples under a container node.

        Entries are returned sorted by name, with the leading
        "<container>/" stripped. When `virtual` is set, delimiter-
        collapsed shared prefixes are merged in as (name, None) entries.
        """
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []  # Attribute filters need a domain.
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        # Virtual entries carry no properties (None placeholder).
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        # Strip the container prefix from each full path.
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects
1675

    
1676
    # Reporting functions.
1677

    
1678
    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        """Queue a diskspace message and commission the quota change.

        `size` is the signed delta in bytes; a zero delta is a no-op.
        Raises QuotaError when the external quotaholder rejects the
        commission.
        """
        details = details or {}

        if size == 0:
            return

        # Report the new account total alongside the delta.
        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            # Remember the serial so the commission can be resolved later.
            self.serials.append(serial)
1707

    
1708
    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        """Queue an 'object changed' notification message."""
        details = details or {}
        details['user'] = user
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('object',), account,
                   QUEUE_INSTANCE_ID, 'object', path, details)
        self.messages.append(message)
1716

    
1717
    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a 'sharing changed' notification message."""
        details = details or {}
        details['user'] = user
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account,
                   QUEUE_INSTANCE_ID, 'sharing', path, details)
        self.messages.append(message)
1725

    
1726
    # Policy functions.
1727

    
1728
    def _check_policy(self, policy, is_account_policy=True):
        """Validate a policy dict in place.

        Empty-string values are replaced with the defaults first.
        Raises ValueError for unknown keys, a negative or non-integer
        quota, or a versioning value other than 'auto'/'none'.
        """
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError
1744

    
1745
    def _put_policy(self, node, policy, replace, is_account_policy=True):
        """Store a policy on the node.

        When `replace` is set, keys missing from `policy` are filled in
        from the defaults before the policy is written.
        """
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)
1753

    
1754
    def _get_policy(self, node, is_account_policy=True):
        """Return the node's policy merged over the appropriate defaults."""
        if is_account_policy:
            defaults = self.default_account_policy
        else:
            defaults = self.default_container_policy
        policy = defaults.copy()
        policy.update(self.node.policy_get(node))
        return policy
1760

    
1761
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            # No history kept: physically remove the version and its map.
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            # History is kept but not charged: report the freed size only.
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
1781

    
1782
    # Access control functions.
1783

    
1784
    def _check_groups(self, groups):
        """Validate group names; currently a no-op placeholder."""
        # raise ValueError('Bad characters in groups')
        pass
1787

    
1788
    def _check_permissions(self, path, permissions):
        """Validate permission values; currently a no-op placeholder."""
        # raise ValueError('Bad characters in permissions')
        pass
1791

    
1792
    def _get_formatted_paths(self, paths):
        """Return (path, match-mode) pairs for the given paths.

        Directory-like objects additionally match by prefix so their
        contents inherit the grant.
        """
        formatted = []
        if not paths:
            return formatted
        props = self.node.get_props(paths)
        for prop in (props or []):
            mimetype = prop[1].split(';', 1)[0].strip()
            if mimetype in ('application/directory', 'application/folder'):
                formatted.append((prop[0].rstrip('/') + '/',
                                  self.MATCH_PREFIX))
            formatted.append((prop[0], self.MATCH_EXACT))
        return formatted
1805

    
1806
    def _get_permissions_path(self, account, container, name):
        """Return the path whose permissions govern the object, if any.

        Prefers an exact match; otherwise the deepest parent that is a
        directory/folder object (whose permissions are inherited).
        Returns None when no applicable permission path exists.
        """
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()  # Deepest (longest) candidates first.
        for p in permission_paths:
            if p == path:
                return p
            else:
                if p.count('/') < 2:
                    continue  # Account/container level: not an object path.
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    # Only directory-like objects pass permissions down.
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None
1826

    
1827
    def _get_permissions_path_bulk(self, account, container, names):
        """Return permission-bearing paths for many objects at once.

        Bulk counterpart of _get_permissions_path: collects exact
        matches plus directory-like parents whose permissions are
        inherited. Returns None when no candidate applies.
        """
        formatted_paths = []
        for name in names:
            path = '/'.join((account, container, name))
            formatted_paths.append(path)
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        permission_paths.sort()
        permission_paths.reverse()  # Deepest candidates first.
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                permission_paths_list.append(p)
            else:
                if p.count('/') < 2:
                    continue  # Account/container level: not an object path.
                lookup_list.append(p)

        if len(lookup_list) > 0:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    # Only directory-like objects pass permissions down.
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        if len(permission_paths_list) > 0:
            return permission_paths_list

        return None
1858

    
1859
    def _reset_allowed_paths(self):
        """Drop all cached permission lookups (reads and writes)."""
        self.read_allowed_paths = defaultdict(set)
        self.write_allowed_paths = defaultdict(set)
1862

    
1863
    @check_allowed_paths(action=0)
    def _can_read_account(self, user, account):
        """Raise NotAllowedError unless `user` may read `account`."""
        if user == account:
            return
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
1868

    
1869
    @check_allowed_paths(action=1)
    def _can_write_account(self, user, account):
        """Raise NotAllowedError unless `user` owns `account`."""
        if user == account:
            return
        raise NotAllowedError
1873

    
1874
    @check_allowed_paths(action=0)
    def _can_read_container(self, user, account, container):
        """Raise NotAllowedError unless `user` may read the container."""
        if user == account:
            return
        if container not in self._allowed_containers(user, account):
            raise NotAllowedError
1879

    
1880
    @check_allowed_paths(action=1)
    def _can_write_container(self, user, account, container):
        """Raise NotAllowedError unless `user` owns the container."""
        if user == account:
            return
        raise NotAllowedError
1884

    
1885
    @check_allowed_paths(action=0)
    def _can_read_object(self, user, account, container, name):
        """Raise NotAllowedError unless `user` may read the object."""
        if user == account:
            return True  # Owners can always read.
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True  # Public objects are readable by anyone.
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        # Either a READ or a WRITE grant implies read access.
        if (not self.permissions.access_check(path, self.READ, user) and not
                self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError
1898

    
1899
    @check_allowed_paths(action=1)
    def _can_write_object(self, user, account, container, name):
        """Raise NotAllowedError unless `user` may write the object.

        Owners can always write; anyone else needs an explicit WRITE
        grant on the object's permission path.
        """
        if user == account:
            return True
        # Resolve the path carrying the permissions (the object itself or
        # a directory-like parent with inherited grants). The original
        # joined account/container/name into `path` first, but that value
        # was immediately overwritten without use (dead store) -- unlike
        # _can_read_object, no public-access check is involved here.
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError
1909

    
1910
    def _allowed_accounts(self, user):
        """Return sorted account names the user has been granted access
        to, caching them in read_allowed_paths."""
        allow = set(path.split('/', 1)[0]
                    for path in self.permissions.access_list_paths(user))
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
1917

    
1918
    def _allowed_containers(self, user, account):
        """Return sorted container names in `account` the user has been
        granted access to, caching them in read_allowed_paths."""
        listing = self.permissions.access_list_paths(user, account)
        allow = set(path.split('/', 2)[1] for path in listing)
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
1925

    
1926
    # Domain functions
1927

    
1928
    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        """Return (path, metadata, permissions) triples for all objects
        in `domain` that are accessible (and, when `user` is given,
        owned) by the user."""
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]
1941

    
1942
    # util functions
1943

    
1944
    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        """Assemble the standard metadata dict from version properties,
        optionally merged with user-defined metadata."""
        meta = {}
        meta['bytes'] = props[self.SIZE]
        meta['type'] = props[self.TYPE]
        meta['hash'] = props[self.HASH]
        meta['version'] = props[self.SERIAL]
        meta['version_timestamp'] = props[self.MTIME]
        meta['modified_by'] = props[self.MUSER]
        meta['uuid'] = props[self.UUID]
        meta['checksum'] = props[self.CHECKSUM]
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta
1957

    
1958
    def _exists(self, node):
        """Return True if the node has a live (non-deleted) version."""
        try:
            self._get_version(node)
            return True
        except ItemNotExists:
            return False
1965

    
1966
    def _unhexlify_hash(self, hash):
1967
        try:
1968
            return binascii.unhexlify(hash)
1969
        except TypeError:
1970
            raise InvalidHash(hash)