Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 0d573e18

History | View | Annotate | Download (78 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from collections import defaultdict
41
from functools import wraps, partial
42
from traceback import format_exc
43

    
44
try:
45
    from astakosclient import AstakosClient
46
except ImportError:
47
    AstakosClient = None
48

    
49
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
50
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
51
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
52
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
53
                  InvalidHash)
54

    
55

    
56
class DisabledAstakosClient(object):
    """Stand-in for AstakosClient when it is unavailable or disabled.

    Records the constructor arguments but refuses every attribute
    access, so any accidental use fails loudly instead of silently.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        raise AssertionError("AstakosClient has been disabled, "
                             "yet an attempt to access it was made")
65

    
66

    
67
# Stripped-down version of the HashMap class found in tools.
68

    
69
class HashMap(list):
    """A list of block hashes foldable into a single Merkle-style top hash.

    Stripped-down version of the HashMap class found in tools.
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        # One-shot digest of *v* using the configured hash algorithm.
        digester = hashlib.new(self.blockhash)
        digester.update(v)
        return digester.digest()

    def hash(self):
        """Return the top hash of the pairwise hash tree of the entries."""
        if not self:
            return self._hash_raw('')
        if len(self) == 1:
            return self[0]

        # Pad with zero blocks up to the next power of two, then reduce
        # adjacent pairs until a single digest remains.
        level = list(self)
        size = 2
        while size < len(level):
            size *= 2
        level.extend([('\x00' * len(level[0]))] * (size - len(level)))
        while len(level) > 1:
            level = [self._hash_raw(level[i] + level[i + 1])
                     for i in range(0, len(level), 2)]
        return level[0]
95

    
96
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
# Alphabet and number of characters used when generating public URLs.
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16

# Identifiers used when publishing messages to the queue.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version clusters: current data, superseded versions, deleted versions.
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

# Sentinel for "no upper time bound" in version/statistics queries.
inf = float('inf')

ULTIMATE_ANSWER = 42

# Default quota source and resource name reported to the quotaholder.
DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'

logger = logging.getLogger(__name__)
127

    
128

    
129
def backend_method(func):
    """Wrap a backend call in a database transaction.

    If a transaction is already active the call simply proceeds inside
    it; otherwise a new transaction is opened via pre_exec() and closed
    via post_exec(), which commits on success and rolls back on error.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        # Already inside a transaction: just run the method.
        if self.in_transaction:
            return func(self, *args, **kw)

        succeeded = False
        try:
            self.pre_exec()
            result = func(self, *args, **kw)
            succeeded = True
            return result
        finally:
            # post_exec() sees False whenever pre_exec() or the method
            # raised, True only on a clean return.
            self.post_exec(succeeded)
    return wrapper
149

    
150

    
151
def debug_method(func):
    """Log every call to *func* at DEBUG level.

    Logs the function name, its positional and keyword arguments, and
    either the return value or, if an exception escaped, the formatted
    traceback.  Exceptions are always re-raised unchanged.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            # Log the traceback in place of the (missing) result.
            result = format_exc()
            raise
        finally:
            # Build the argument list eagerly.  The original used
            # map(all_args.append, ...) for its side effects, which is
            # an anti-pattern and silently does nothing on Python 3
            # where map() is lazy; items() works on both 2 and 3.
            all_args = [repr(a) for a in args]
            all_args.extend('%s=%s' % (k, v) for k, v in kw.items())
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper
166

    
167

    
168
def check_allowed_paths(action):
    """Decorator for backend methods checking path access granted to user.

    The first argument of the decorated method is expected to be a
    ModularBackend instance, the second the user performing the request,
    and the path join of the remaining arguments the requested path.

    If the requested path is already among the user's cached allowed
    paths the decorator returns immediately, avoiding a round trip to
    the database.  Otherwise the decorated method performs the actual
    access check and, if it returns without raising, the path is cached
    as allowed for that user.

    :param action: (int) 0 for reads / 1 for writes
    :raises NotAllowedError: the user does not have access to the path
    """
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args):
            user = args[0]
            cache = (self.read_allowed_paths if action == self.READ
                     else self.write_allowed_paths)
            path = '/'.join(args[1:])
            if path in cache.get(user, []):
                return  # access already verified for this user
            func(self, *args)  # run the real access check
            cache[user].add(path)  # remember the allowed path
        return wrapper
    return decorator
202

    
203

    
204
def list_method(func):
    """Apply marker/limit windowing to the list returned by *func*.

    Reads 'marker' and 'limit' from the keyword arguments, lets the
    instance's _list_limits() translate them into a slice, and returns
    only that window of the full result.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        full_result = func(self, *args, **kw)
        start, limit = self._list_limits(
            full_result, kw.get('marker'), kw.get('limit'))
        return full_result[start:start + limit]
    return wrapper
213

    
214

    
215
class ModularBackend(BaseBackend):
    """A modular backend.

    Uses modules for SQL functions and storage.

    The database module provides node, permission, config and
    commission-serial access; the block module provides the block
    store; an optional queue module publishes change messages.  All
    three are loaded dynamically by name in __init__.
    """
220

    
221
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 queue_hosts_ignored=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        """Load the db/block (and optionally queue) modules and wire them up.

        Every argument is optional; anything left as None falls back to
        the corresponding module-level DEFAULT_* constant (or the base
        module's DEFAULT_* policy constants).  When queue_module or
        queue_hosts is missing, a no-op queue is installed; when
        astakos_auth_url or the astakosclient package is missing, a
        DisabledAstakosClient sentinel is installed instead of a real
        AstakosClient.
        """
        # Fill in module-level defaults for anything not supplied.
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        # Import a module by dotted name and return the module object.
        def load_module(m):
            __import__(m)
            return sys.modules[m]

        # Database layer: wrapper plus the table-access helpers, all
        # sharing the same DB wrapper/connection.
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        # Mirror the db module's constants onto the backend instance.
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        # Block-storage layer.
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        # Message queue: optional; fall back to a silent no-op queue.
        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        # Use the disabled sentinel when Astakos is not configured or
        # the astakosclient package failed to import.
        if not astakos_auth_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        # Per-transaction state: pending commission serials and queue
        # messages, flushed/resolved in post_exec().
        self.serials = []
        self.messages = []

        # Move is copy with is_move=True.
        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False

        self._reset_allowed_paths()
334

    
335
    def pre_exec(self, lock_container_path=False):
        """Open a new database transaction.

        Resets the per-transaction bookkeeping (pending commission
        serials, cached allowed paths) and records whether container
        paths should be locked for the duration of the request.
        """
        self.serials = []
        self.lock_container_path = lock_container_path
        self._reset_allowed_paths()
        self.wrapper.execute()
        self.in_transaction = True
341

    
342
    def post_exec(self, success_status=True):
        """Close the transaction opened by pre_exec().

        On success: flush queued messages, persist pending commission
        serials, commit, then ask Astakos to accept those commissions
        and drop the accepted ones.  On failure: ask Astakos to reject
        any pending commissions and roll the transaction back.
        """
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                # Ask the quotaholder to accept all pending commissions;
                # only serials it reports as accepted are cleaned up.
                r = self.astakosclient.resolve_commissions(
                    accept_serials=self.serials,
                    reject_serials=[])
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            if self.serials:
                # Reject the pending commissions before rolling back.
                r = self.astakosclient.resolve_commissions(
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(
                    r['rejected'])
            self.wrapper.rollback()
        self.in_transaction = False
376

    
377
    def close(self):
        """Release backend resources: the DB wrapper, then the queue."""
        self.wrapper.close()
        self.queue.close()
380

    
381
    @property
    def using_external_quotaholder(self):
        # True when a real AstakosClient manages quota; False when the
        # DisabledAstakosClient sentinel was installed in __init__.
        return not isinstance(self.astakosclient, DisabledAstakosClient)
384

    
385
    @debug_method
    @backend_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        accounts = self._allowed_accounts(user)
        start, count = self._list_limits(accounts, marker, limit)
        return accounts[start:start + count]
393

    
394
    def _get_account_quotas(self, account):
        """Get account usage from astakos."""

        all_quotas = self.astakosclient.service_get_quotas(account)
        source_quotas = all_quotas[account].get(DEFAULT_SOURCE, {})
        return source_quotas.get(DEFAULT_DISKSPACE_RESOURCE, {})
400

    
401
    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain.

        Foreign users (user != account) may not use 'until' and only
        receive the account name and modification time; owners also get
        the object count, byte usage, optional user-defined attributes
        for 'domain' and, with 'until', the snapshot timestamp.
        """

        self._can_read_account(user, account)
        path, node = self._lookup_account(account, user == account)
        if user != account:
            if until or (node is None):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # _get_properties raises NameError when no version exists;
            # fall back to the requested timestamp.
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Foreign users only learn the account name.
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                # Astakos is authoritative for usage when present.
                external_quota = self._get_account_quotas(account)
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta
442

    
443
    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        self._can_write_account(user, account)
        node = self._lookup_account(account, True)[1]
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)
452

    
453
    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        self._can_read_account(user, account)
        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        # Foreign accounts expose no group information.
        return {}
463

    
464
    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        self._can_write_account(user, account)
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            # Wipe every existing group before installing the new set.
            self.permissions.group_destroy(account)
        for name, members in groups.items():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, name)
            if members:
                self.permissions.group_addmany(account, name, members)
479

    
480
    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy."""

        self._can_read_account(user, account)
        if user != account:
            return {}
        node = self._lookup_account(account, True)[1]
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            # The quota limit kept by Astakos overrides the local value.
            policy['quota'] = self._get_account_quotas(account).get('limit', 0)
        return policy
494

    
495
    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        self._can_write_account(user, account)
        node = self._lookup_account(account, True)[1]
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)
504

    
505
    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        policy = policy or {}
        self._can_write_account(user, account)
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        # Accounts hang directly off the root node.
        new_node = self._put_path(user, self.ROOTNODE, account,
                                  update_statistics_ancestors_depth=-1)
        self._put_policy(new_node, policy, True, is_account_policy=True)
520

    
521
    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        self._can_write_account(user, account)
        account_node = self.node.node_lookup(account)
        if account_node is None:
            return  # nothing to delete
        removed = self.node.node_remove(
            account_node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

        # Drop the whole allowed-paths cache; evicting only the affected
        # paths would be more expensive than rebuilding on demand.
        self._reset_allowed_paths()
538

    
539
    @debug_method
    @backend_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account.

        Foreign users get only the containers shared with them; with
        'shared'/'public' the owner gets only containers holding shared
        or public objects; otherwise all containers are listed.
        """

        self._can_read_account(user, account)
        if user != account:
            if until:
                raise NotAllowedError
            allowed = self._allowed_containers(user, account)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        if shared or public:
            allowed = set()
            if shared:
                # Shared object paths look like account/container/...;
                # keep the container component.
                allowed.update([x.split('/', 2)[1] for x in
                               self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in
                               self.permissions.public_list(account)])
            allowed = sorted(allowed)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        node = self.node.node_lookup(account)
        containers = [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
        # NOTE(review): 'containers' already holds plain name strings,
        # so [x[0] for x in containers] passes only their first
        # characters to _list_limits, unlike list_accounts which passes
        # the names themselves — confirm whether this is intentional.
        start, limit = self._list_limits(
            [x[0] for x in containers], marker, limit)
        return containers[start:start + limit]
569

    
570
    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""

        self._can_read_container(user, account, container)
        allowed = []
        if user != account:
            if until:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        # 'before' bounds the version timestamps considered; inf = now.
        before = until if until is not None else inf
        # NOTE(review): 'allowed' is always [] at this point, so the
        # path filter handed to latest_attribute_keys is effectively
        # empty — confirm whether a permissions lookup for foreign
        # users was intended before this call.
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)
586

    
587
    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        Foreign users (user != account) may not use 'until' and only
        receive the container name and modification time; owners also
        get the object count, byte usage and, optionally, user-defined
        attributes for 'domain'.
        """

        self._can_read_container(user, account, container)
        if user != account:
            if until:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Foreign users only learn the container name.
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta
621

    
622
    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        self._can_write_container(user, account, container)
        node = self._lookup_container(account, container)[1]
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is None:
            return
        # Unless versioning is automatic, discard the superseded version.
        policy = self._get_policy(node, is_account_policy=False)
        if policy['versioning'] != 'auto':
            self.node.version_remove(src_version_id,
                                     update_statistics_ancestors_depth=0)
639

    
640
    @debug_method
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        self._can_read_container(user, account, container)
        if user != account:
            # Foreign users see no policy details.
            return {}
        node = self._lookup_container(account, container)[1]
        return self._get_policy(node, is_account_policy=False)
650

    
651
    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""

        self._can_write_container(user, account, container)
        node = self._lookup_container(account, container)[1]
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)
661

    
662
    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        policy = policy or {}
        self._can_write_container(user, account, container)
        # _lookup_container raises NameError when the container is
        # missing, which is the only acceptable outcome here.
        try:
            self._lookup_container(account, container)
        except NameError:
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        container_path = '/'.join((account, container))
        account_node = self._lookup_account(account, True)[1]
        new_node = self._put_path(user, account_node, container_path,
                                  update_statistics_ancestors_depth=-1)
        self._put_policy(new_node, policy, True, is_account_policy=False)
682

    
683
    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name.

        With 'until', only versions up to that timestamp are purged and
        the container itself survives.  Without a delimiter the (empty)
        container is purged and removed; with a delimiter only its
        contents are deleted object by object.
        """

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)

        if until is not None:
            # Purge historical versions up to 'until' and drop their
            # block maps; the container itself is kept.
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                # Versioned space is charged, so report the freed bytes.
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            # Full delete: the container must hold no live objects.
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                # Deleting an object means writing a zero-sized version
                # in the DELETED cluster.
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
761

    
762
    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        """List objects under a container, honoring sharing/public filters.

        Dispatches on the (shared, public) flag combination:
        combined shared+public, public-only, or the generic permission-
        filtered listing. Returns at most ``limit`` entries starting after
        ``marker``.
        """
        # Only the owner may browse past history (``until``).
        if user != account and until:
            raise NotAllowedError
        if shared and public:
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            objects = set()
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects |= set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            # Merge of two sets loses ordering; re-sort by object name
            # before applying marker/limit pagination.
            objects.sort(key=lambda x: x[0])
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]

        # Generic case: list restricted to the paths the user may access.
        allowed = self._list_object_permissions(
            user, account, container, prefix, shared, public)
        if shared and not allowed:
            return []
        path, node = self._lookup_container(account, container)
        allowed = self._get_formatted_paths(allowed)
        objects = self._list_object_properties(
            node, path, prefix, delimiter, marker, limit, virtual, domain,
            keys, until, size_range, allowed, all_props)
        start, limit = self._list_limits(
            [x[0] for x in objects], marker, limit)
        return objects[start:start + limit]
807

    
808
    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        """Return (name, props...) tuples for public objects under prefix.

        Object names are relative to the container (the leading
        'account/container/' part is stripped).
        """
        public_paths = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        full_paths, node_ids = self._lookup_objects(public_paths)
        cont_prefix = '/'.join((account, container)) + '/'
        names = [full[len(cont_prefix):] for full in full_paths]
        prop_rows = self.node.version_lookup_bulk(
            node_ids, all_props=all_props, order_by_path=True)
        return [(name,) + row for name, row in zip(names, prop_rows)]
820

    
821
    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        """Return ALL matching objects by paging through _list_objects().

        Repeatedly fetches pages of 10000 entries until a short (or empty)
        page signals exhaustion.
        """
        objects = []
        while True:
            # NOTE(review): the marker is the last returned entry, which is
            # a tuple when all_props is set — assumes _list_limits() accepts
            # it as a marker; confirm against its implementation.
            marker = objects[-1] if objects else None
            limit = 10000
            l = self._list_objects(
                user, account, container, prefix, delimiter, marker, limit,
                virtual, domain, keys, shared, until, size_range, all_props,
                public)
            objects.extend(l)
            if not l or len(l) < limit:
                break
        return objects
836

    
837
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        """Collect object paths under the prefix the user may list.

        For a foreign account only explicitly granted paths are returned
        (NotAllowedError if none). For the owner, the result is optionally
        restricted to shared and/or public paths.
        """
        lookup_path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            # Foreign user: restricted to paths shared with them.
            granted = self.permissions.access_list_paths(user, lookup_path)
            if not granted:
                raise NotAllowedError
            return granted
        # Owner: gather shared/public paths as requested.
        collected = set()
        if shared:
            collected.update(self.permissions.access_list_shared(lookup_path))
        if public:
            collected.update(
                [x[0] for x in self.permissions.public_list(lookup_path)])
        return sorted(collected)
856

    
857
    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        if not keys:
            keys = []
        # all_props=False: return only the lightweight (name, serial) pairs.
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, False, public)
869

    
870
    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        if not keys:
            keys = []
        # all_props=True: each entry carries the full property tuple.
        prop_rows = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, True, public)
        listing = []
        for entry in prop_rows:
            if len(entry) == 2:
                # Two-element entries are virtual directory markers.
                listing.append({'subdir': entry[0]})
                continue
            # Property indices are shifted by one: entry[0] is the name.
            listing.append({
                'name': entry[0],
                'bytes': entry[self.SIZE + 1],
                'type': entry[self.TYPE + 1],
                'hash': entry[self.HASH + 1],
                'version': entry[self.SERIAL + 1],
                'version_timestamp': entry[self.MTIME + 1],
                'modified': entry[self.MTIME + 1] if until is None else None,
                'modified_by': entry[self.MUSER + 1],
                'uuid': entry[self.UUID + 1],
                'checksum': entry[self.CHECKSUM + 1]})
        return listing
899

    
900
    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths enforce permissions under a container."""

        # Shared paths only; public ones are excluded here.
        return self._list_object_permissions(
            user, account, container, prefix, shared=True, public=False)
907

    
908
    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        lookup = '/'.join((account, container, prefix))
        # public_list() yields (path, public_id) pairs.
        return dict(self.permissions.public_list(lookup))
919

    
920
    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        'modified' always reflects the object's overall last modification,
        even when an older version is requested. Raises ItemNotExists if
        neither a live nor a deleted version can be found.
        """

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                # Fall back to the latest deleted version's timestamp.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            # User-defined attributes first, so the system keys below
            # cannot be shadowed by them.
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta
957

    
958
    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write_object(user, account, container, name)

        # Lock the container path while mutating the object.
        obj_path, obj_node = self._lookup_object(account, container, name,
                                                 lock_container=True)
        old_version, new_version = self._put_metadata(
            user, obj_node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        # Apply the container versioning policy to the superseded version.
        self._apply_versioning(account, container, old_version,
                               update_statistics_ancestors_depth=1)
        return new_version
974

    
975
    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions.

        Returns a dict keyed by object name (relative to the container)
        mapping to (allowed-action, permissions-path, permissions-dict).
        Raises NotAllowedError if a foreign user lacks access to any path.
        """

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        nobject_permissions = {}
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            name = path[cpath_idx:]
            # For a foreign user, absence from the bulk access map means no
            # grant exists for this path. (Previously this was a try/except
            # whose bound value was immediately discarded.)
            if user != account and path not in access_objects:
                raise NotAllowedError
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        # NOTE(review): result unused — presumably validates existence of
        # (and locks) the objects; confirm against _lookup_objects().
        self._lookup_objects(permissions_path)
        return nobject_permissions
1004

    
1005
    @debug_method
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        perm_path = self._get_permissions_path(account, container, name)
        if user == account:
            # Owners always have write access.
            allowed = 'write'
        elif self.permissions.access_check(perm_path, self.WRITE, user):
            allowed = 'write'
        elif self.permissions.access_check(perm_path, self.READ, user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # Raises if the object does not exist.
        self._lookup_object(account, container, name)
        return (allowed, perm_path, self.permissions.access_get(perm_path))
1027

    
1028
    @debug_method
    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object.

        Only the account owner may change permissions. Raises ValueError
        if the permissions backend rejects the update.
        """

        if user != account:
            raise NotAllowedError
        # Lock the container path while mutating permissions.
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        try:
            self.permissions.access_set(path, permissions)
        except Exception:
            # Normalize backend failures to ValueError for callers. The
            # previous bare ``except:`` also swallowed SystemExit and
            # KeyboardInterrupt, which must propagate.
            raise ValueError
        else:
            self._report_sharing_change(user, account, path, {'members':
                                        self.permissions.access_members(path)})

        # remove all the cached allowed paths
        # filtering out only those affected could be more expensive
        self._reset_allowed_paths()
1050

    
1051
    @debug_method
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read_object(user, account, container, name)
        obj_path = self._lookup_object(account, container, name)[0]
        return self.permissions.public_get(obj_path)
1060

    
1061
    @debug_method
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write_object(user, account, container, name)
        obj_path = self._lookup_object(account, container, name,
                                       lock_container=True)[0]
        if public:
            self.permissions.public_set(
                obj_path, self.public_url_security, self.public_url_alphabet)
        else:
            self.permissions.public_unset(obj_path)
1074

    
1075
    @debug_method
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read_object(user, account, container, name)
        obj_path, obj_node = self._lookup_object(account, container, name)
        props = self._get_version(obj_node, version)
        stored_hash = props[self.HASH]
        if stored_hash is None:
            # No content has been written for this version yet.
            return 0, ()
        block_hashes = self.store.map_get(self._unhexlify_hash(stored_hash))
        return props[self.SIZE], [binascii.hexlify(h) for h in block_hashes]
1087

    
1088
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        """Create a new object version with the given hash and metadata.

        Core write path shared by hashmap updates and copy/move: creates
        the version, copies/updates metadata, applies the versioning
        policy, enforces account and container quotas, and emits size /
        sharing / object-change reports. Returns the new version id.
        Raises QuotaError inside the surrounding transaction so the
        version is rolled back on failure.
        """
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write_object(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        # Size freed by the versioning policy (0 if the old version is kept).
        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id
1159

    
1160
    # NOTE(review): unlike sibling methods this lacks @backend_method —
    # presumably deliberate so map_put() happens outside the transactional
    # wrapper; confirm before adding it.
    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version.

        Returns (new version id, hex-encoded object hash). If any blocks
        referenced by the hashmap are missing from the store, raises an
        IndexError whose ``data`` attribute lists the missing hex hashes
        so the caller can report which blocks to upload.
        """

        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(map)
        if missing:
            # Signal missing blocks to the caller via the .data attribute.
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = map.hash()
        hexlified = binascii.hexlify(hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        # Persist the hashmap only after the version has been recorded.
        self.store.map_put(hash, map)
        return dest_version_id, hexlified
1185

    
1186
    @debug_method
    @backend_method
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum.

        The checksum is propagated to every version with serial >= the
        given one that shares the same hash and size, so metadata-only
        updates (which duplicate the content) stay consistent.
        """

        # Update objects with greater version and same hashmap
        # and size (fix metadata updates).
        self._can_write_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        props = self._get_version(node, version)
        versions = self.node.node_get_versions(node)
        for x in versions:
            if (x[self.SERIAL] >= int(version) and
                x[self.HASH] == props[self.HASH] and
                    x[self.SIZE] == props[self.SIZE]):
                self.node.version_put_property(
                    x[self.SERIAL], 'checksum', checksum)
1205

    
1206
    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):
        """Copy (or move, when is_move) an object, optionally recursively.

        When ``delimiter`` is given, every object under the source prefix
        is copied/moved as well. For moves within the same account, size
        reports are suppressed (net usage change is zero per account).
        Returns the new version id, or a list of ids for recursive copies.
        """

        # A move deletes the source right after; net size is unchanged.
        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read_object(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        # (consistent ordering avoids deadlocks between concurrent copies).
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        # Delete the source only when it actually differs from the target.
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            # Recursive case: copy/move everything under the source prefix.
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                # Map the source path onto the destination prefix.
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)
1281

    
1282
    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        if not meta:
            meta = {}
        # Delegate to the shared copy/move implementation (is_move=False).
        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, src_version, False, delimiter)
1296

    
1297
    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        if not meta:
            meta = {}
        # Only the owner of the source may move it.
        if user != src_account:
            raise NotAllowedError
        return self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, None, delimiter=delimiter)
1313

    
1314
    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        """Delete an object, or purge its history up to ``until``.

        With ``until`` set, versions up to that timestamp are purged from
        the normal and history clusters and their blocks removed from the
        store. Otherwise a deletion marker version is written. With
        ``delimiter``, all objects under the name's prefix are deleted too.
        """
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            # History versions only count against usage when versioning
            # is billed (not free).
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except NameError:
                # No versions remain at all: drop the permissions too.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        # Write a deletion-marker version instead of removing rows.
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            # Recursive delete of everything under the prefix.
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                # Skip entries already deleted.
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
1408

    
1409
    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        # Thin public wrapper; ``prefix`` is accepted for API symmetry but
        # not forwarded — deletion semantics live in _delete_object.
        self._delete_object(user, account, container, name, until, delimiter)
1416

    
1417
    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        # Deleted versions are filtered out of the history listing.
        return [[v[self.SERIAL], v[self.MTIME]]
                for v in self.node.node_get_versions(node)
                if v[self.CLUSTER] != CLUSTER_DELETED]
1427

    
1428
    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            # NameError is the established "no such UUID" signal for
            # callers of this backend.
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read_object(user, account, container, name)
        return account, container, name
1441

    
1442
    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            # NameError is the established "no such public id" signal.
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read_object(user, account, container, name)
        return account, container, name
1453

    
1454
    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        data = self.store.block_get(self._unhexlify_hash(hash))
        if not data:
            raise ItemNotExists('Block does not exist')
        return data
1462

    
1463
    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        raw_hash = self.store.block_put(data)
        # The store works with raw digests; the API exposes hex strings.
        return binascii.hexlify(raw_hash)
1468

    
1469
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        # A full-size write starting at 0 is just a brand-new block.
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        updated = self.store.block_update(self._unhexlify_hash(hash),
                                          offset, data)
        return binascii.hexlify(updated)
1477

    
1478
    # Path functions.
1479

    
1480
    def _generate_uuid(self):
1481
        return str(uuidlib.uuid4())
1482

    
1483
    def _put_object_node(self, path, parent, name):
        """Return (path, node) for ``name`` under ``path``, creating the
        node if it does not exist yet."""
        full_path = '/'.join((path, name))
        node = self.node.node_lookup(full_path)
        if node is None:
            node = self.node.node_create(parent, full_path)
        return full_path, node
1489

    
1490
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        """Create a node at ``path`` seeded with an empty initial version
        owned by ``user``; return the new node."""
        new_node = self.node.node_create(parent, path)
        self.node.version_create(new_node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return new_node
1497

    
1498
    def _lookup_account(self, account, create=True):
        """Return (account, node), optionally creating the account node."""
        node = self.node.node_lookup(account)
        if node is None and create:
            # For a top-level path the owning user IS the account itself.
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)
        return account, node
1505

    
1506
    def _lookup_container(self, account, container):
        """Return (path, node) for the container; raise if it is missing.

        When lock_container_path is set the lookup takes a row lock,
        serializing concurrent writers on the container.
        """
        for_update = bool(self.lock_container_path)
        container_path = '/'.join((account, container))
        node = self.node.node_lookup(container_path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return container_path, node
1513

    
1514
    def _lookup_object(self, account, container, name, lock_container=False):
        """Return (path, node) for the object; raise if it is missing."""
        if lock_container:
            # The container lookup may lock its row (see _lookup_container),
            # guarding concurrent updates on objects inside it.
            self._lookup_container(account, container)

        object_path = '/'.join((account, container, name))
        node = self.node.node_lookup(object_path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return object_path, node
1523

    
1524
    def _lookup_objects(self, paths):
        """Bulk variant of node lookup; return the paths with their nodes."""
        return paths, self.node.node_lookup_bulk(paths)
1527

    
1528
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = inf if until is None else until
        # Try the live cluster first; for point-in-time queries fall back
        # to the history cluster.
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props
1538

    
1539
    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            # Historical snapshot at the given timestamp.
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            # Recompute from the tree instead of the cached counters.
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        return (0, 0, 0) if stats is None else stats
1552

    
1553
    def _get_version(self, node, version=None):
        """Return properties of the given version, or of the latest live
        version when ``version`` is None; raise if not found."""
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
            return props
        try:
            version = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(version, node=node)
        # Deleted versions are treated the same as missing ones.
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props
1567

    
1568
    def _get_versions(self, nodes):
        """Bulk-fetch the latest live version properties for ``nodes``."""
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1570

    
1571
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node.

        The new version inherits any properties not explicitly given
        (size/hash, type, checksum) from the latest live version of
        ``src_node`` if set, otherwise of ``node`` itself.  The previous
        live version of ``node`` (if any) is moved to the history
        cluster.  Returns (pre_version_id, dest_version_id) where
        pre_version_id is the superseded version (may be None).
        """

        # Source of inherited properties: src_node when copying/moving,
        # the node itself otherwise.
        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            # No previous version: fall back to empty defaults.
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        # Copies and brand-new objects get a fresh UUID; in-place updates
        # keep the existing one so the object identity is stable.
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        # Determine which version of *node* (not src_node) is being
        # superseded, so it can be moved into history.
        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        # Only the newly created version keeps the is_latest flag.
        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
1620

    
1621
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        """Attach metadata to ``dest_version_id`` within ``domain``.

        The source version's attributes are copied first (if any).  With
        replace=False, ``meta`` is merged on top: an empty-string value
        deletes that key, any other value upserts it.  With replace=True
        the domain's attributes are cleared and rewritten wholesale.
        """
        if src_version_id is not None:
            # Carry over the previous version's attributes as the baseline.
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            # Empty string acts as a per-key deletion marker.
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            # Full replacement: drop everything in the domain, then set.
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))
1634

    
1635
    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        versions = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=(
                update_statistics_ancestors_depth))
        src_version_id, dest_version_id = versions
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return versions
1646

    
1647
    def _list_limits(self, listing, marker, limit):
1648
        start = 0
1649
        if marker:
1650
            try:
1651
                start = listing.index(marker) + 1
1652
            except ValueError:
1653
                pass
1654
        if not limit or limit > 10000:
1655
            limit = 10000
1656
        return start, limit
1657

    
1658
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """List object properties under ``path``, applying prefix/
        delimiter/marker paging and the optional domain, key, time and
        size filters.  Returns (name, ...) tuples with the container
        prefix stripped; when ``virtual`` is set, delimiter-collapsed
        prefixes are interleaved as (prefix, None) entries.
        """
        keys = keys or []
        allowed = allowed or []
        # All stored paths are absolute ("account/container/name"); the
        # caller works with names relative to the container.
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        # Key filtering only makes sense within a metadata domain.
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        # Virtual entries represent delimiter-collapsed "directories".
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects
1679

    
1680
    # Reporting functions.
1681

    
1682
    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        """Record a disk-space usage delta for ``account``.

        Queues a 'resource.diskspace' message and, when an external
        quotaholder is configured, issues a commission for the delta.
        Raises QuotaError if the commission fails.
        """
        details = details or {}

        # Nothing to report.
        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        # Recompute the account total from the tree (index 1 = sum of size).
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        # Any failure talking to the quotaholder is surfaced uniformly
        # as QuotaError; the backend_method wrapper handles rollback.
        except BaseException, e:
            raise QuotaError(e)
        else:
            # Remember the commission serial so it can be accepted or
            # rejected when the transaction completes.
            self.serials.append(serial)
1711

    
1712
    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        """Queue an 'object' notification message for ``path``."""
        details = details or {}
        # Note: intentionally mutates the caller-supplied dict.
        details['user'] = user
        payload = (QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                   account, QUEUE_INSTANCE_ID, 'object', path, details)
        self.messages.append(payload)
1720

    
1721
    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a 'sharing' notification message for ``path``."""
        details = details or {}
        # Note: intentionally mutates the caller-supplied dict.
        details['user'] = user
        payload = (QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                   account, QUEUE_INSTANCE_ID, 'sharing', path, details)
        self.messages.append(payload)
1729

    
1730
    # Policy functions.
1731

    
1732
    def _check_policy(self, policy, is_account_policy=True):
        """Validate ``policy`` in place, filling blank values from the
        defaults.  Raises ValueError on unknown keys or bad values."""
        defaults = (self.default_account_policy if is_account_policy
                    else self.default_container_policy)
        # A blank value means "use the default".
        for key in policy.keys():
            if policy[key] == '':
                policy[key] = defaults.get(key)
        for key, value in policy.iteritems():
            if key == 'quota':
                # int() itself may raise ValueError for non-numeric input.
                if int(value) < 0:
                    raise ValueError
            elif key == 'versioning':
                if value not in ('auto', 'none'):
                    raise ValueError
            else:
                # Reject keys we do not know about.
                raise ValueError
1748

    
1749
    def _put_policy(self, node, policy, replace, is_account_policy=True):
        """Store ``policy`` on ``node``; with ``replace`` set, keys absent
        from the input are filled in from the defaults first."""
        defaults = (self.default_account_policy if is_account_policy
                    else self.default_container_policy)
        if replace:
            for key, value in defaults.iteritems():
                policy.setdefault(key, value)
        self.node.policy_set(node, policy)
1757

    
1758
    def _get_policy(self, node, is_account_policy=True):
        """Return the node's stored policy merged over the defaults."""
        defaults = (self.default_account_policy if is_account_policy
                    else self.default_container_policy)
        merged = defaults.copy()
        merged.update(self.node.policy_get(node))
        return merged
1764

    
1765
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        policy = self._get_policy(node, is_account_policy=False)
        if policy['versioning'] != 'auto':
            # Versioning disabled: drop the version and its data map.
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        if self.free_versioning:
            # The version is kept, but its size no longer counts against
            # the account when history is free.
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
1785

    
1786
    # Access control functions.
1787

    
1788
    def _check_groups(self, groups):
1789
        # raise ValueError('Bad characters in groups')
1790
        pass
1791

    
1792
    def _check_permissions(self, path, permissions):
1793
        # raise ValueError('Bad characters in permissions')
1794
        pass
1795

    
1796
    def _get_formatted_paths(self, paths):
        """Return (path, match-mode) pairs for ``paths``.

        Every existing path matches exactly; directory-like objects also
        match by prefix so their contents inherit the entry.
        """
        formatted = []
        if not paths:
            return formatted
        props = self.node.get_props(paths)
        for prop in props or ():
            mimetype = prop[1].split(';', 1)[0].strip()
            if mimetype in ('application/directory', 'application/folder'):
                formatted.append((prop[0].rstrip('/') + '/',
                                  self.MATCH_PREFIX))
            formatted.append((prop[0], self.MATCH_EXACT))
        return formatted
1809

    
1810
    def _get_permissions_path(self, account, container, name):
        """Return the path whose permission entry governs this object.

        Candidates are the object's own path and any ancestor with an
        inherited permission entry; the deepest (reverse-sorted) match
        wins.  An ancestor only counts if it is a directory-like object.
        Returns None when no applicable entry exists.
        """
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        # Reverse lexicographic order: deepest/most specific path first.
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                # The object itself carries a permission entry.
                return p
            else:
                # Skip account- and container-level paths (fewer than two
                # slashes): only object-level ancestors can apply here.
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    # Only directory-like ancestors propagate permissions.
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None
1830

    
1831
    def _get_permissions_path_bulk(self, account, container, names):
        """Bulk variant of _get_permissions_path.

        Returns the list of paths whose permission entries may govern
        the given object names (the objects themselves plus any
        directory-like ancestors carrying inherited entries), or None
        when no entry applies at all.
        """
        formatted_paths = []
        for name in names:
            path = '/'.join((account, container, name))
            formatted_paths.append(path)
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        # Reverse lexicographic order: deepest/most specific paths first.
        permission_paths.sort()
        permission_paths.reverse()
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                # The object itself carries a permission entry.
                permission_paths_list.append(p)
            else:
                # Skip account- and container-level paths; only
                # object-level ancestors can apply here.
                if p.count('/') < 2:
                    continue
                lookup_list.append(p)

        if len(lookup_list) > 0:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    # Only directory-like ancestors propagate permissions.
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        if len(permission_paths_list) > 0:
            return permission_paths_list

        return None
1862

    
1863
    def _reset_allowed_paths(self):
        """Drop every cached read/write path-permission lookup."""
        self.write_allowed_paths = defaultdict(set)
        self.read_allowed_paths = defaultdict(set)
1866

    
1867
    @check_allowed_paths(action=0)
    def _can_read_account(self, user, account):
        """Raise NotAllowedError unless ``user`` may read ``account``."""
        if user == account:
            return
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
1872

    
1873
    @check_allowed_paths(action=1)
    def _can_write_account(self, user, account):
        """Only the account owner may write to the account."""
        if user == account:
            return
        raise NotAllowedError
1877

    
1878
    @check_allowed_paths(action=0)
    def _can_read_container(self, user, account, container):
        """Raise NotAllowedError unless ``user`` may read the container."""
        if user == account:
            return
        if container not in self._allowed_containers(user, account):
            raise NotAllowedError
1883

    
1884
    @check_allowed_paths(action=1)
    def _can_write_container(self, user, account, container):
        """Only the account owner may write to a container."""
        if user == account:
            return
        raise NotAllowedError
1888

    
1889
    @check_allowed_paths(action=0)
    def _can_read_object(self, user, account, container, name):
        """Raise NotAllowedError unless ``user`` may read the object."""
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            # Publicly shared objects are readable by anyone.
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        # Either an explicit read grant or a write grant suffices; the
        # write check is only made when the read check fails.
        may_read = self.permissions.access_check(path, self.READ, user)
        if not may_read and not self.permissions.access_check(
                path, self.WRITE, user):
            raise NotAllowedError
1902

    
1903
    @check_allowed_paths(action=1)
    def _can_write_object(self, user, account, container, name):
        """Raise NotAllowedError unless ``user`` may write the object.

        The owner always may; anyone else needs a write grant on the
        governing permission path.
        """
        if user == account:
            return True
        # Fixed: the previous '/'.join(...) assignment here was dead code,
        # immediately overwritten by the permissions-path lookup below.
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError
1913

    
1914
    def _allowed_accounts(self, user):
        """Return the sorted accounts that share anything with ``user``,
        caching them in the read-allowed set."""
        allow = set(path.split('/', 1)[0]
                    for path in self.permissions.access_list_paths(user))
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
1921

    
1922
    def _allowed_containers(self, user, account):
        """Return the sorted containers of ``account`` shared with
        ``user``, caching them in the read-allowed set."""
        allow = set(path.split('/', 2)[1]
                    for path in self.permissions.access_list_paths(user,
                                                                   account))
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
1929

    
1930
    # Domain functions
1931

    
1932
    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        """Return (path, metadata, permissions) triples for all objects in
        ``domain`` that the user may access."""
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        listing = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        result = []
        for path, props, user_defined_meta in listing:
            result.append((path,
                           self._build_metadata(props, user_defined_meta),
                           self.permissions.access_get(path)))
        return result
1945

    
1946
    # util functions
1947

    
1948
    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        """Assemble the standard metadata dict from a version property
        row, optionally merged with user-defined metadata."""
        fields = (('bytes', self.SIZE),
                  ('type', self.TYPE),
                  ('hash', self.HASH),
                  ('version', self.SERIAL),
                  ('version_timestamp', self.MTIME),
                  ('modified_by', self.MUSER),
                  ('uuid', self.UUID),
                  ('checksum', self.CHECKSUM))
        meta = dict((key, props[idx]) for key, idx in fields)
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta
1961

    
1962
    def _exists(self, node):
        """Return True if ``node`` has a live (non-deleted) version."""
        try:
            self._get_version(node)
            return True
        except ItemNotExists:
            return False
1969

    
1970
    def _unhexlify_hash(self, hash):
1971
        try:
1972
            return binascii.unhexlify(hash)
1973
        except TypeError:
1974
            raise InvalidHash(hash)