Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 0c6ab9df

History | View | Annotate | Download (78.5 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from collections import defaultdict
41
from functools import wraps, partial
42
from traceback import format_exc
43

    
44
try:
45
    from astakosclient import AstakosClient
46
except ImportError:
47
    AstakosClient = None
48

    
49
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
50
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
51
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
52
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
53
                  InvalidHash)
54

    
55

    
56
class DisabledAstakosClient(object):
    """Stand-in used when the real AstakosClient is unavailable/disabled.

    The constructor arguments are recorded verbatim; any other attribute
    access raises AssertionError so accidental use is caught loudly.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        # Only reached for attributes not set in __init__.
        raise AssertionError("AstakosClient has been disabled, "
                             "yet an attempt to access it was made")
65

    
66

    
67
# Stripped-down version of the HashMap class found in tools.
68

    
69
class HashMap(list):
    """List of block hashes with a Merkle-style top hash.

    Stripped-down version of the HashMap class found in tools.
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        """Digest *v* with the configured hash algorithm."""
        hasher = hashlib.new(self.blockhash)
        hasher.update(v)
        return hasher.digest()

    def hash(self):
        """Return the top hash of the binary reduction tree.

        The leaf list is padded with zero-filled entries up to the next
        power of two, then adjacent pairs are hashed level by level.
        """
        if not self:
            return self._hash_raw('')
        if len(self) == 1:
            return self[0]

        level = list(self)
        size = 2
        while size < len(level):
            size *= 2
        padding = '\x00' * len(level[0])
        level.extend([padding] * (size - len(level)))
        while len(level) > 1:
            level = [self._hash_raw(level[i] + level[i + 1])
                     for i in range(0, len(level), 2)]
        return level[0]
95

    
96
# Default modules and settings.
97
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
98
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
99
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
100
DEFAULT_BLOCK_PATH = 'data/'
101
DEFAULT_BLOCK_UMASK = 0o022
102
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
103
DEFAULT_HASH_ALGORITHM = 'sha256'
104
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
105
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
106
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
107
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
108
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
109
                               'abcdefghijklmnopqrstuvwxyz'
110
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
111
DEFAULT_PUBLIC_URL_SECURITY = 16
112

    
113
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
114
QUEUE_CLIENT_ID = 'pithos'
115
QUEUE_INSTANCE_ID = '1'
116

    
117
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
118

    
119
inf = float('inf')
120

    
121
ULTIMATE_ANSWER = 42
122

    
123
DEFAULT_SOURCE = 'system'
124
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
125

    
126
logger = logging.getLogger(__name__)
127

    
128

    
129
def backend_method(func):
    """Wrap a backend method in a database transaction.

    If the backend is already inside a transaction, the method simply
    executes. Otherwise a new transaction is opened with ``pre_exec()``
    and finalized with ``post_exec(success)``, which commits on success
    and rolls back on failure.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        # if we are inside a database transaction
        # just proceed with the method execution
        # otherwise manage a new transaction
        if self.in_transaction:
            return func(self, *args, **kw)

        # BUG FIX: initialize before the try block. Previously, if
        # pre_exec() itself raised, success_status was never assigned
        # and the finally clause died with UnboundLocalError, masking
        # the original exception.
        success_status = False
        try:
            self.pre_exec()
            result = func(self, *args, **kw)
            success_status = True
            return result
        finally:
            # Runs on both success and failure; any exception from the
            # try body propagates unchanged after cleanup.
            self.post_exec(success_status)
    return wrapper
149

    
150

    
151
def debug_method(func):
    """Log every call to *func* at DEBUG level.

    Logs the function name, its positional and keyword arguments, and
    the result; if the call raises, the formatted traceback is logged
    in place of the result and the exception is re-raised untouched.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            # Keep the traceback text so the finally clause logs it in
            # place of the (missing) result; the exception propagates.
            result = format_exc()
            raise
        finally:
            all_args = map(repr, args)
            # NOTE: relies on Python 2 map() being eager — it drives the
            # generator so the appends actually happen.
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper
166

    
167

    
168
def check_allowed_paths(action):
    """Decorator for backend methods checking path access granted to user.

    The 1st argument of the decorated method is expected to be a
    ModularBackend instance, the 2nd the user performing the request and
    the path join of the rest arguments is supposed to be the requested path.

    The decorator checks whether the requested path is among the user's allowed
    cached paths.
    If this is the case, the decorator returns immediately to reduce the
    interactions with the database.
    Otherwise, it proceeds with the execution of the decorated method and if
    the method returns successfully (no exceptions are raised), the requested
    path is added to the user's cached allowed paths.

    :param action: (int) 0 for reads / 1 for writes
    :raises NotAllowedError: the user does not have access to the path
    """
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args):
            user = args[0]
            cache = (self.read_allowed_paths if action == self.READ
                     else self.write_allowed_paths)
            requested = '/'.join(args[1:])
            if requested in cache.get(user, []):
                return  # access already verified for this user
            func(self, *args)  # may raise NotAllowedError
            cache[user].add(requested)  # remember the verified path
        return wrapper
    return decorator
202

    
203

    
204
def list_method(func):
    """Apply marker/limit windowing to the listing returned by *func*."""
    @wraps(func)
    def wrapper(self, *args, **kw):
        marker_kw = kw.get('marker')
        limit_kw = kw.get('limit')
        listing = func(self, *args, **kw)
        start, limit = self._list_limits(listing, marker_kw, limit_kw)
        return listing[start:start + limit]
    return wrapper
213

    
214

    
215
class ModularBackend(BaseBackend):
216
    """A modular backend.
217

218
    Uses modules for SQL functions and storage.
219
    """
220

    
221
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        """Wire up the DB, block-store, queue and astakos sub-modules.

        Every argument is optional; a falsy value falls back to the
        module-level DEFAULT_* constant (so passing 0/''/None selects
        the default).
        """
        # Resolve every unset argument to its module-level default.
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        # Import a module by dotted name and return the module object.
        def load_module(m):
            __import__(m)
            return sys.modules[m]

        # --- database layer -------------------------------------------
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        # Mirror the db module's permission constants on the backend.
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        # Mirror the db module's node property-index constants too.
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        # --- block storage layer --------------------------------------
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        # --- message queue (optional) ---------------------------------
        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            # Null-object queue: accepts sends/closes and does nothing.
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        # --- astakos (quota/auth) client ------------------------------
        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        # Use the disabled stub when no auth URL is configured or the
        # astakosclient package is not importable.
        if not astakos_auth_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        # Per-transaction accumulators (see pre_exec/post_exec).
        self.serials = []
        self.messages = []

        # Move is implemented as copy with is_move=True.
        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False

        self._reset_allowed_paths()
334

    
335
    def pre_exec(self, lock_container_path=False):
        """Open a new database transaction and reset per-request state."""
        self.lock_container_path = lock_container_path
        self.wrapper.execute()  # begin the transaction
        self.serials = []  # no pending quota commissions yet
        self._reset_allowed_paths()  # drop the cached access checks
        self.in_transaction = True
341

    
342
    def post_exec(self, success_status=True):
        """Finalize the transaction opened by pre_exec().

        On success: flush queued messages, persist pending commission
        serials, then accept them with the quotaholder and commit.
        On failure: reject pending commissions and roll back.
        """
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    accept_serials=self.serials,
                    reject_serials=[])
                # Forget only the serials astakos actually accepted.
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            if self.serials:
                # Undo the provisional quota commissions of this request.
                r = self.astakosclient.resolve_commissions(
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(
                    r['rejected'])
            self.wrapper.rollback()
        self.in_transaction = False
376

    
377
    def close(self):
        """Release the database connection and the queue."""
        self.wrapper.close()
        self.queue.close()
380

    
381
    @property
    def using_external_quotaholder(self):
        """True when a real AstakosClient (not the disabled stub) is used."""
        disabled = isinstance(self.astakosclient, DisabledAstakosClient)
        return not disabled
384

    
385
    @debug_method
    @backend_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        accessible = self._allowed_accounts(user)
        start, limit = self._list_limits(accessible, marker, limit)
        return accessible[start:start + limit]
393

    
394
    def _get_account_quotas(self, account):
        """Get account usage from astakos."""

        all_quotas = self.astakosclient.service_get_quotas(account)[account]
        source_quotas = all_quotas.get(DEFAULT_SOURCE, {})
        return source_quotas.get(DEFAULT_DISKSPACE_RESOURCE, {})
400

    
401
    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain.

        Non-owners only get the account name (and may not use `until`);
        owners also get counts, sizes, timestamps and domain attributes.
        """

        self._can_read_account(user, account)
        path, node = self._lookup_account(account, user == account)
        if user != account:
            # Non-owners may not browse history or nonexistent accounts.
            if until or (node is None):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # NOTE(review): _get_properties apparently signals a missing
            # version via NameError — confirm against the helper.
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Only expose the name to non-owners.
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                # astakos is authoritative for usage when available
                external_quota = self._get_account_quotas(account)
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta
442

    
443
    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        self._can_write_account(user, account)
        node = self._lookup_account(account, True)[1]
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)
452

    
453
    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        self._can_read_account(user, account)
        if user != account:
            return {}  # groups are only visible to the owner
        self._lookup_account(account, True)  # ensure the account exists
        return self.permissions.group_dict(account)
463

    
464
    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        self._can_write_account(user, account)
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            # wipe everything first; per-group deletes become redundant
            self.permissions.group_destroy(account)
        for name, members in groups.iteritems():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, name)
            if members:
                self.permissions.group_addmany(account, name, members)
479

    
480
    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy."""

        self._can_read_account(user, account)
        if user != account:
            return {}  # policy is only visible to the owner
        node = self._lookup_account(account, True)[1]
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            # quota limits live in astakos, not in the local policy table
            quota_info = self._get_account_quotas(account)
            policy['quota'] = quota_info.get('limit', 0)
        return policy
494

    
495
    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        self._can_write_account(user, account)
        node = self._lookup_account(account, True)[1]
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)
504

    
505
    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        policy = policy or {}
        self._can_write_account(user, account)
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True)
520

    
521
    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        self._can_write_account(user, account)
        node = self.node.node_lookup(account)
        if node is None:
            return  # nothing to delete
        removed = self.node.node_remove(
            node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
538

    
539
    @debug_method
    @backend_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account.

        Non-owners see only the containers shared with them; `shared`
        and `public` restrict the owner's listing to containers that
        contain shared/public objects.
        """

        self._can_read_account(user, account)
        if user != account:
            if until:
                raise NotAllowedError
            allowed = self._allowed_containers(user, account)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        if shared or public:
            allowed = set()
            if shared:
                # shared paths look like account/container/object
                allowed.update([x.split('/', 2)[1] for x in
                               self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in
                               self.permissions.public_list(account)])
            allowed = sorted(allowed)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        node = self.node.node_lookup(account)
        containers = [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
        # BUG FIX: `containers` already holds the container names.  The
        # previous code passed [x[0] for x in containers] — i.e. the
        # FIRST CHARACTER of each name — to _list_limits, so the marker
        # could never match a name and pagination was broken.
        start, limit = self._list_limits(containers, marker, limit)
        return containers[start:start + limit]
569

    
570
    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""

        self._can_read_container(user, account, container)
        allowed = []
        if user != account and until:
            # non-owners may not browse history
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = inf if until is None else until
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)
586

    
587
    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        Non-owners only get the container name (and may not use `until`);
        owners also get counts, sizes, timestamps and domain attributes.
        """

        self._can_read_container(user, account, container)
        if user != account:
            # non-owners may not browse history
            if until:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Only expose the name to non-owners.
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta
621

    
622
    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is None:
            return
        # drop the superseded version unless the policy keeps history
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            self.node.version_remove(src_version_id,
                                     update_statistics_ancestors_depth=0)
639

    
640
    @debug_method
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        self._can_read_container(user, account, container)
        if user != account:
            return {}  # policy is only visible to the owner
        node = self._lookup_container(account, container)[1]
        return self._get_policy(node, is_account_policy=False)
650

    
651
    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""

        self._can_write_container(user, account, container)
        node = self._lookup_container(account, container)[1]
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)
661

    
662
    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        policy = policy or {}
        self._can_write_container(user, account, container)
        try:
            self._lookup_container(account, container)
        except NameError:
            pass  # container does not exist yet: good
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        account_node = self._lookup_account(account, True)[1]
        node = self._put_path(user, account_node, path,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)
682

    
683
    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None, listing_limit=None):
        """Delete/purge the container with the given name.

        Three modes:
        * ``until`` set: purge history up to that timestamp, keep the
          container itself;
        * no ``delimiter``: remove the (empty) container entirely;
        * ``delimiter`` set: delete the container's contents only.
        """

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)

        if until is not None:
            # Purge historical and deleted versions up to `until`.
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                # reclaim the quota held by the purged versions
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            # Full container removal: it must hold no live objects.
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False,
                listing_limit=listing_limit)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                # Mark each object deleted by writing a zero-size
                # CLUSTER_DELETED version on top of it.
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
762

    
763
    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        # Core object-listing dispatcher: combines the shared/public filters
        # with the plain container listing and applies marker/limit paging.
        if user != account and until:
            # Listing past versions (until) of a foreign account is refused.
            raise NotAllowedError
        if shared and public:
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            objects = set()
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects |= set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            # Merge and re-page the union, ordered by object name.
            objects.sort(key=lambda x: x[0])
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]
        elif public:
            # Public-only listing bypasses the container node lookup.
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]

        # Plain listing, restricted to the paths the caller may access.
        allowed = self._list_object_permissions(
            user, account, container, prefix, shared, public)
        if shared and not allowed:
            return []
        path, node = self._lookup_container(account, container)
        allowed = self._get_formatted_paths(allowed)
        objects = self._list_object_properties(
            node, path, prefix, delimiter, marker, limit, virtual, domain,
            keys, until, size_range, allowed, all_props)
        start, limit = self._list_limits(
            [x[0] for x in objects], marker, limit)
        return objects[start:start + limit]
    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        """Return (name,) + properties tuples for public objects."""
        public_paths = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        full_paths, nodes = self._lookup_objects(public_paths)
        # Strip the 'account/container/' prefix to get bare object names.
        cont_prefix = '/'.join((account, container)) + '/'
        names = [p[len(cont_prefix):] for p in full_paths]
        props = self.node.version_lookup_bulk(nodes, all_props=all_props,
                                              order_by_path=True)
        return [(name,) + prop for name, prop in zip(names, props)]
    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public,
                               listing_limit=10000):
        """Drain the full object listing in pages of ``listing_limit``."""
        collected = []
        while True:
            # Resume after the last name seen so far.
            if collected:
                marker = collected[-1][0]
            else:
                marker = None
            page = self._list_objects(
                user, account, container, prefix, delimiter, marker,
                listing_limit, virtual, domain, keys, shared, until,
                size_range, all_props, public)
            collected.extend(page)
            # A short (or empty) page means the listing is exhausted.
            if not page or len(page) < listing_limit:
                break
        return collected
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        """Collect the object paths the caller may list under the prefix."""
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            # Foreign account: only paths explicitly granted to the user.
            granted = self.permissions.access_list_paths(user, path)
            if not granted:
                raise NotAllowedError
            return granted
        # Own account: optionally restrict to shared and/or public paths.
        paths = set()
        if shared:
            paths.update(self.permissions.access_list_shared(path))
        if public:
            paths.update(p[0] for p in self.permissions.public_list(path))
        if not paths:
            return []
        return sorted(paths)
    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        # all_props=False: callers only need (name, version_id) pairs.
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, False,
            public)
    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, True,
            public)
        listing = []
        for p in props:
            if len(p) == 2:
                # Two-element tuples are virtual directory markers.
                listing.append({'subdir': p[0]})
                continue
            # Property tuples are (name,) + version properties, hence the
            # +1 offset on every property index.
            listing.append({
                'name': p[0],
                'bytes': p[self.SIZE + 1],
                'type': p[self.TYPE + 1],
                'hash': p[self.HASH + 1],
                'version': p[self.SERIAL + 1],
                'version_timestamp': p[self.MTIME + 1],
                'modified': p[self.MTIME + 1] if until is None else None,
                'modified_by': p[self.MUSER + 1],
                'uuid': p[self.UUID + 1],
                'checksum': p[self.CHECKSUM + 1]})
        return listing
    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths that enforce permissions in a container."""

        # shared=True, public=False: only explicitly shared paths.
        return self._list_object_permissions(
            user, account, container, prefix, True, False)
    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        lookup = '/'.join((account, container, prefix))
        # public_list yields (path, public_id) pairs.
        return dict(self.permissions.public_list(lookup))
    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            # Latest version requested: its mtime is the overall one.
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                # Fall back to the mtime of the deletion marker, if any.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            # Merge in user-defined attributes for the requested domain.
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        # System properties always win over user-defined keys.
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta
    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write_object(user, account, container, name)

        # Lock the container path while creating the new version.
        node = self._lookup_object(account, container, name,
                                   lock_container=True)[1]
        old_version, new_version = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        self._apply_versioning(account, container, old_version,
                               update_statistics_ancestors_depth=1)
        return new_version
    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions.

        Returns a dict mapping each object name to a
        (allowed-action, permissions-path, permissions-dict) tuple.

        Raises:
            NotAllowedError: if ``user`` is not the owner and has no access
                entry for some requested path.
        """

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        nobject_permissions = {}
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            name = path[cpath_idx:]
            # Non-owners must have an access entry for every path; the
            # previous implementation fetched the entry here and then
            # discarded it (dead store), so only the membership test is kept.
            if user != account and path not in access_objects:
                raise NotAllowedError
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        # Side effect: validates that the objects exist.
        self._lookup_objects(permissions_path)
        return nobject_permissions
    @debug_method
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        permissions_path = self._get_permissions_path(account, container, name)
        if user == account:
            # Owners implicitly hold write access.
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.WRITE,
                                           user):
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.READ,
                                           user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # Side effect: validates that the object exists.
        self._lookup_object(account, container, name)
        return (allowed, permissions_path,
                self.permissions.access_get(permissions_path))
    @debug_method
    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object.

        Only the owning account may change permissions.

        Raises:
            NotAllowedError: if ``user`` does not own the object.
            ValueError: if the permissions could not be stored.
        """

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        try:
            self.permissions.access_set(path, permissions)
        except Exception:
            # Was a bare ``except:``; narrowed so that system-exiting
            # exceptions (KeyboardInterrupt, SystemExit) propagate.
            raise ValueError
        else:
            self._report_sharing_change(user, account, path, {'members':
                                        self.permissions.access_members(path)})

        # remove all the cached allowed paths
        # filtering out only those affected could be more expensive
        self._reset_allowed_paths()
    @debug_method
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read_object(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        return self.permissions.public_get(path)
    @debug_method
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write_object(user, account, container, name)
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        if public:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet)
        else:
            self.permissions.public_unset(path)
    @debug_method
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read_object(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        props = self._get_version(node, version)
        obj_hash = props[self.HASH]
        if obj_hash is None:
            # No hash recorded for this version: report an empty object.
            return 0, ()
        block_hashes = self.store.map_get(self._unhexlify_hash(obj_hash))
        return props[self.SIZE], [binascii.hexlify(h) for h in block_hashes]
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        # Create a new version carrying the given hash and return its id.
        # Enforces write access and quotas, then emits size / sharing /
        # change reports. Runs inside the caller's transaction.
        if permissions is not None and user != account:
            # Only the owner may set permissions.
            raise NotAllowedError
        self._can_write_object(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            # No explicit source version: copy attributes from the
            # version that was just superseded.
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        # Net change in stored bytes after versioning policy is applied.
        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id
    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version."""

        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        # 'hmap'/'map_hash' avoid shadowing the builtins 'map' and 'hash'.
        hmap = HashMap(self.block_size, self.hash_algorithm)
        hmap.extend([self._unhexlify_hash(h) for h in hashmap])
        missing = self.store.block_search(hmap)
        if missing:
            # Report the blocks the caller still has to upload.
            ie = IndexError()
            ie.data = [binascii.hexlify(m) for m in missing]
            raise ie

        map_hash = hmap.hash()
        hexlified = binascii.hexlify(map_hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta or {}, replace_meta, permissions)
        self.store.map_put(map_hash, hmap)
        return dest_version_id, hexlified
    @debug_method
    @backend_method
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum."""

        # Update objects with greater version and same hashmap
        # and size (fix metadata updates).
        self._can_write_object(user, account, container, name)
        node = self._lookup_object(account, container, name,
                                   lock_container=True)[1]
        props = self._get_version(node, version)
        min_serial = int(version)
        for v in self.node.node_get_versions(node):
            if v[self.SERIAL] < min_serial:
                continue
            if v[self.HASH] != props[self.HASH]:
                continue
            if v[self.SIZE] != props[self.SIZE]:
                continue
            self.node.version_put_property(v[self.SERIAL], 'checksum',
                                           checksum)
    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None, listing_limit=10000):
        # Copy (or move, when is_move) an object; with a delimiter, also
        # recurse into the pseudo-directory contents. Returns the new
        # version id, or a list of ids when multiple objects were copied.

        # Moves are size-neutral: the delete offsets the copy.
        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read_object(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            # Recursive case: copy everything under src_name + delimiter.
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False,
                listing_limit=listing_limit)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                # Rebase the source path under the destination name.
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                # NOTE(review): this guard compares the top-level src/dest
                # names, not the per-item path — confirm this is intended.
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)
    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None, listing_limit=None):
        """Copy an object's data and metadata."""

        # is_move=False: source is left in place.
        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, src_version, False, delimiter,
            listing_limit=listing_limit)
    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None, listing_limit=None):
        """Move an object's data and metadata."""

        if user != src_account:
            # Only the owner may move objects out of an account.
            raise NotAllowedError
        return self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, None, delimiter=delimiter,
            listing_limit=listing_limit)
    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True,
                       listing_limit=None):
        # Delete an object (or purge its history up to ``until``); with a
        # delimiter, also delete the pseudo-directory contents.
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            # Purge mode: physically remove versions up to the timestamp.
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            if not self.free_versioning:
                # Historic versions count against usage only when
                # versioning is not free.
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except NameError:
                # No version left at all: drop the permissions too.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        # Logical delete: record a new version in the DELETED cluster.
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            # Recursively delete everything under name + delimiter.
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False,
                listing_limit=listing_limit)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None, listing_limit=None):
        """Delete/purge an object.

        Thin public wrapper around _delete_object.
        NOTE(review): `prefix` is accepted but not forwarded here — looks
        intentional for interface compatibility; confirm against callers.
        """

        self._delete_object(user, account, container, name, until, delimiter,
                            listing_limit=listing_limit)
1424

    
1425
    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        result = []
        for props in self.node.node_get_versions(node):
            # Deleted versions are not part of the visible history.
            if props[self.CLUSTER] == CLUSTER_DELETED:
                continue
            result.append([props[self.SERIAL], props[self.MTIME]])
        return result
1435

    
1436
    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            # Callers treat NameError as "no such UUID".
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read_object(user, account, container, name)
        return account, container, name
1449

    
1450
    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        public_path = self.permissions.public_path(public)
        if public_path is None:
            # Callers treat NameError as "no such public id".
            raise NameError
        account, container, name = public_path.split('/', 2)
        self._can_read_object(user, account, container, name)
        return account, container, name
1461

    
1462
    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        data = self.store.block_get(self._unhexlify_hash(hash))
        if data:
            return data
        raise ItemNotExists('Block does not exist')
1470

    
1471
    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        digest = self.store.block_put(data)
        # Hashes cross the API boundary as hex strings.
        return binascii.hexlify(digest)
1476

    
1477
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        # A full-block write at offset 0 is just a new block.
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        updated = self.store.block_update(self._unhexlify_hash(hash), offset,
                                          data)
        return binascii.hexlify(updated)
1485

    
1486
    # Path functions.
1487

    
1488
    def _generate_uuid(self):
1489
        return str(uuidlib.uuid4())
1490

    
1491
    def _put_object_node(self, path, parent, name):
        """Return (path, node) for `name` under `path`, creating the node
        when it does not already exist."""
        full_path = '/'.join((path, name))
        node = self.node.node_lookup(full_path)
        if node is None:
            node = self.node.node_create(parent, full_path)
        return full_path, node
1497

    
1498
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        """Create a node at `path` under `parent` with an initial empty
        version (size 0, no hash) owned by `user`; return the new node."""
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node
1505

    
1506
    def _lookup_account(self, account, create=True):
        """Return (account, node); with create=True the account node is
        created on demand (node may be None otherwise)."""
        node = self.node.node_lookup(account)
        if create and node is None:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node
1513

    
1514
    def _lookup_container(self, account, container):
        """Return (path, node) for the container; raise ItemNotExists when
        it is missing.  Optionally locks the container row."""
        for_update = bool(self.lock_container_path)
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node
1521

    
1522
    def _lookup_object(self, account, container, name, lock_container=False):
        """Return (path, node) for the object; raise ItemNotExists when it
        is missing.  With lock_container, lock the container row first."""
        if lock_container:
            self._lookup_container(account, container)

        object_path = '/'.join((account, container, name))
        node = self.node.node_lookup(object_path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return object_path, node
1531

    
1532
    def _lookup_objects(self, paths):
        # Bulk variant of _lookup_object: resolve all paths in one query.
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes
1535

    
1536
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = inf if until is None else until
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            # Point-in-time lookups may hit an already-superseded version.
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props
1546

    
1547
    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        # Missing statistics count as empty.
        return (0, 0, 0) if stats is None else stats
1560

    
1561
    def _get_version(self, node, version=None):
        """Return version properties for the node.

        With version=None, return the latest normal-cluster version or
        raise ItemNotExists; otherwise return the requested version or
        raise VersionNotExists.
        """
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
            return props
        try:
            serial = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(serial, node=node)
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props
1575

    
1576
    def _get_versions(self, nodes):
        # Latest normal-cluster version properties for each node, in bulk.
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1578

    
1579
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node.

        The new version inherits any property left as None/size=None from
        the latest version of `src_node` (or of `node` itself when
        src_node is None).  The node's previous latest version, if any, is
        moved to the history cluster.  Returns (pre_version_id,
        dest_version_id) where pre_version_id is the superseded version of
        `node` (None if there was none).
        """

        # Source of the inherited properties: src_node for copies/moves,
        # the node itself for in-place updates.
        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        # Copies get a fresh UUID; updates keep the source object's UUID.
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        # Determine the version of `node` being superseded.  When copying
        # from another node, it must be looked up separately.
        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            # Demote the superseded version to the history cluster.
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        # Only the new version may carry the is-latest flag.
        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
1628

    
1629
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        """Copy the source version's attributes and apply `meta` on top.

        With replace=True the domain's attributes are replaced wholesale
        by `meta`; otherwise empty values delete keys and non-empty values
        upsert them.
        """
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if replace:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node,
                                    iter(meta.iteritems()))
        else:
            removed = (k for k, v in meta.iteritems() if v == '')
            self.node.attribute_del(dest_version_id, domain, removed)
            updated = ((k, v) for k, v in meta.iteritems() if v != '')
            self.node.attribute_set(dest_version_id, domain, node, updated)
1642

    
1643
    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        src_id, dest_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=(
                update_statistics_ancestors_depth))
        self._put_metadata_duplicate(src_id, dest_id, domain, node, meta,
                                     replace)
        return src_id, dest_id
1654

    
1655
    def _list_limits(self, listing, marker, limit):
1656
        start = 0
1657
        if marker:
1658
            try:
1659
                start = listing.index(marker) + 1
1660
            except ValueError:
1661
                pass
1662
        if not limit or limit > 10000:
1663
            limit = 10000
1664
        return start, limit
1665

    
1666
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """List object properties under container `path`, sorted by name.

        Returns tuples whose first element is the object name relative to
        the container; the remaining elements come from
        latest_version_list.  With virtual=True, common prefixes
        (pseudo-directories) are interleaved as (name, None) entries.
        """
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf  # inf = current state
        filterq = keys if domain else []  # attribute filters need a domain
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        # Strip the container prefix so names are container-relative.
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects
1687

    
1688
    # Reporting functions.
1689

    
1690
    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        """Report an account diskspace change of `size` bytes.

        Appends a 'resource.diskspace' message to the queue and, when an
        external quotaholder is configured, issues a quota commission.
        Raises QuotaError if the commission fails.
        """
        details = details or {}

        # Nothing to report for a no-op change.
        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            # Keep the commission serial so it can be accepted/rejected
            # when the enclosing transaction completes.
            self.serials.append(serial)
1719

    
1720
    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        """Queue an 'object' change notification for the given path."""
        details = details or {}
        details['user'] = user
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('object',), account,
                   QUEUE_INSTANCE_ID, 'object', path, details)
        self.messages.append(message)
1728

    
1729
    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a 'sharing' change notification for the given path."""
        details = details or {}
        details['user'] = user
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account,
                   QUEUE_INSTANCE_ID, 'sharing', path, details)
        self.messages.append(message)
1737

    
1738
    # Policy functions.
1739

    
1740
    def _check_policy(self, policy, is_account_policy=True):
        """Validate a policy dict in place.

        Empty values are replaced with defaults; raises ValueError on
        unknown keys, negative/non-integer quota, or bad versioning.
        """
        defaults = (self.default_account_policy if is_account_policy
                    else self.default_container_policy)
        for key in policy.keys():
            if policy[key] == '':
                policy[key] = defaults.get(key)
        for key, value in policy.iteritems():
            if key == 'quota':
                if int(value) < 0:  # int() itself may raise ValueError.
                    raise ValueError
            elif key == 'versioning':
                if value not in ('auto', 'none'):
                    raise ValueError
            else:
                raise ValueError
1756

    
1757
    def _put_policy(self, node, policy, replace, is_account_policy=True):
        """Store policy on the node; with replace, keys missing from
        `policy` are filled in from the defaults first."""
        defaults = (self.default_account_policy if is_account_policy
                    else self.default_container_policy)
        if replace:
            for key, value in defaults.iteritems():
                policy.setdefault(key, value)
        self.node.policy_set(node, policy)
1765

    
1766
    def _get_policy(self, node, is_account_policy=True):
        """Return the node's policy merged over the defaults."""
        defaults = (self.default_account_policy if is_account_policy
                    else self.default_container_policy)
        merged = dict(defaults)
        merged.update(self.node.policy_get(node))
        return merged
1772

    
1773
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            # No history kept: physically remove the version and its
            # block map from the store.
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            # History is kept but not charged: report the size as freed.
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
1793

    
1794
    # Access control functions.
1795

    
1796
    def _check_groups(self, groups):
        """Validate group names; currently a deliberate no-op placeholder."""
        # raise ValueError('Bad characters in groups')
        pass
1799

    
1800
    def _check_permissions(self, path, permissions):
        """Validate permissions; currently a deliberate no-op placeholder."""
        # raise ValueError('Bad characters in permissions')
        pass
1803

    
1804
    def _get_formatted_paths(self, paths):
        """Return (path, match_mode) pairs for the given paths.

        Every path matches exactly; directory-type objects additionally
        match by prefix so their contents inherit the entry.
        """
        formatted = []
        if len(paths) == 0:
            return formatted
        props = self.node.get_props(paths)
        if props:
            dir_types = ('application/directory', 'application/folder')
            for prop in props:
                mimetype = prop[1].split(';', 1)[0].strip()
                if mimetype in dir_types:
                    formatted.append((prop[0].rstrip('/') + '/',
                                      self.MATCH_PREFIX))
                formatted.append((prop[0], self.MATCH_EXACT))
        return formatted
1817

    
1818
    def _get_permissions_path(self, account, container, name):
        """Return the path whose permissions govern the object, or None.

        Permissions may be granted on the object itself or inherited
        from an ancestor directory-type object; the deepest matching
        candidate wins.
        """
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()  # deepest candidates first
        for p in permission_paths:
            if p == path:
                # Exact object path: governs itself.
                return p
            else:
                if p.count('/') < 2:
                    continue  # skip account/container level entries
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    # Only directory-type objects propagate permissions.
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None
1838

    
1839
    def _get_permissions_path_bulk(self, account, container, names):
        """Bulk variant of _get_permissions_path for many object names.

        Return the list of paths carrying governing permissions (exact
        object paths plus directory-type ancestors), or None when no
        candidate qualifies.
        """
        formatted_paths = []
        for name in names:
            path = '/'.join((account, container, name))
            formatted_paths.append(path)
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        permission_paths.sort()
        permission_paths.reverse()  # deepest candidates first
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                # Exact object path: governs itself.
                permission_paths_list.append(p)
            else:
                if p.count('/') < 2:
                    continue  # skip account/container level entries
                lookup_list.append(p)

        if len(lookup_list) > 0:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    # Only directory-type objects propagate permissions.
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        if len(permission_paths_list) > 0:
            return permission_paths_list

        return None
1870

    
1871
    def _reset_allowed_paths(self):
        """Drop the cached per-user allowed-path sets."""
        self.read_allowed_paths = defaultdict(set)
        self.write_allowed_paths = defaultdict(set)
1874

    
1875
    @check_allowed_paths(action=0)
    def _can_read_account(self, user, account):
        """Raise NotAllowedError unless `user` may read `account`."""
        # Owners always pass; others need a shared path in the account.
        if user != account and account not in self._allowed_accounts(user):
            raise NotAllowedError
1880

    
1881
    @check_allowed_paths(action=1)
    def _can_write_account(self, user, account):
        """Only the account owner may write to the account."""
        if user != account:
            raise NotAllowedError
1885

    
1886
    @check_allowed_paths(action=0)
    def _can_read_container(self, user, account, container):
        """Raise NotAllowedError unless `user` may read the container."""
        # Owners always pass; others need a shared path in the container.
        if user != account and \
                container not in self._allowed_containers(user, account):
            raise NotAllowedError
1891

    
1892
    @check_allowed_paths(action=1)
    def _can_write_container(self, user, account, container):
        """Only the account owner may write to the container."""
        if user != account:
            raise NotAllowedError
1896

    
1897
    @check_allowed_paths(action=0)
    def _can_read_object(self, user, account, container, name):
        """Raise NotAllowedError unless `user` may read the object.

        Owners and public objects are always readable; others need a
        READ or WRITE grant on the object's permission path.
        """
        if user == account:
            return True
        full_path = '/'.join((account, container, name))
        if self.permissions.public_get(full_path) is not None:
            return True
        granted = self._get_permissions_path(account, container, name)
        if not granted:
            raise NotAllowedError
        if not (self.permissions.access_check(granted, self.READ, user) or
                self.permissions.access_check(granted, self.WRITE, user)):
            raise NotAllowedError
1910

    
1911
    @check_allowed_paths(action=1)
    def _can_write_object(self, user, account, container, name):
        """Raise NotAllowedError unless `user` may write the object.

        The owner can always write; anyone else needs an explicit WRITE
        grant on the object's governing permission path.
        """
        if user == account:
            return True
        # Removed a dead assignment that built '/'.join((account, container,
        # name)) and was immediately overwritten by the lookup below.
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError
1921

    
1922
    def _allowed_accounts(self, user):
        """Return sorted account names with paths shared to `user`; also
        primes the read-allowed path cache."""
        allow = set(path.split('/', 1)[0]
                    for path in self.permissions.access_list_paths(user))
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
1929

    
1930
    def _allowed_containers(self, user, account):
        """Return sorted container names in `account` with paths shared to
        `user`; also primes the read-allowed path cache."""
        allow = set(path.split('/', 2)[1] for path in
                    self.permissions.access_list_paths(user, account))
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
1937

    
1938
    # Domain functions
1939

    
1940
    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        """Return (path, metadata, permissions) triples for the domain's
        objects that `user` may access."""
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        result = []
        for path, props, user_defined_meta in obj_list:
            result.append((path,
                           self._build_metadata(props, user_defined_meta),
                           self.permissions.access_get(path)))
        return result
1953

    
1954
    # util functions
1955

    
1956
    def _build_metadata(self, props, user_defined=None,
1957
                        include_user_defined=True):
1958
        meta = {'bytes': props[self.SIZE],
1959
                'type': props[self.TYPE],
1960
                'hash': props[self.HASH],
1961
                'version': props[self.SERIAL],
1962
                'version_timestamp': props[self.MTIME],
1963
                'modified_by': props[self.MUSER],
1964
                'uuid': props[self.UUID],
1965
                'checksum': props[self.CHECKSUM]}
1966
        if include_user_defined and user_defined is not None:
1967
            meta.update(user_defined)
1968
        return meta
1969

    
1970
    def _exists(self, node):
        """Return whether the node has a live (non-deleted) version."""
        try:
            self._get_version(node)
        except ItemNotExists:
            return False
        return True
1977

    
1978
    def _unhexlify_hash(self, hash):
1979
        try:
1980
            return binascii.unhexlify(hash)
1981
        except TypeError:
1982
            raise InvalidHash(hash)