Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 2560c061

History | View | Annotate | Download (80.4 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from collections import defaultdict
41
from functools import wraps, partial
42
from traceback import format_exc
43

    
44
try:
45
    from astakosclient import AstakosClient
46
except ImportError:
47
    AstakosClient = None
48

    
49
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
50
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
51
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
52
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
53
                  InvalidHash)
54

    
55

    
56
class DisabledAstakosClient(object):
57
    def __init__(self, *args, **kwargs):
58
        self.args = args
59
        self.kwargs = kwargs
60

    
61
    def __getattr__(self, name):
62
        m = ("AstakosClient has been disabled, "
63
             "yet an attempt to access it was made")
64
        raise AssertionError(m)
65

    
66

    
67
# Stripped-down version of the HashMap class found in tools.
68

    
69
class HashMap(list):
70

    
71
    def __init__(self, blocksize, blockhash):
72
        super(HashMap, self).__init__()
73
        self.blocksize = blocksize
74
        self.blockhash = blockhash
75

    
76
    def _hash_raw(self, v):
77
        h = hashlib.new(self.blockhash)
78
        h.update(v)
79
        return h.digest()
80

    
81
    def hash(self):
82
        if len(self) == 0:
83
            return self._hash_raw('')
84
        if len(self) == 1:
85
            return self.__getitem__(0)
86

    
87
        h = list(self)
88
        s = 2
89
        while s < len(h):
90
            s = s * 2
91
        h += [('\x00' * len(h[0]))] * (s - len(h))
92
        while len(h) > 1:
93
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
94
        return h[0]
95

    
96
# Default modules and settings.
97
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
98
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
99
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
100
DEFAULT_BLOCK_PATH = 'data/'
101
DEFAULT_BLOCK_UMASK = 0o022
102
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
103
DEFAULT_HASH_ALGORITHM = 'sha256'
104
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
105
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
106
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
107
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
108
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
109
                               'abcdefghijklmnopqrstuvwxyz'
110
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
111
DEFAULT_PUBLIC_URL_SECURITY = 16
112

    
113
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
114
QUEUE_CLIENT_ID = 'pithos'
115
QUEUE_INSTANCE_ID = '1'
116

    
117
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
118

    
119
QUOTA_POLICY = 'quota'
120
VERSIONING_POLICY = 'versioning'
121
PROJECT = 'project'
122

    
123
inf = float('inf')
124

    
125
ULTIMATE_ANSWER = 42
126

    
127
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
128

    
129
logger = logging.getLogger(__name__)
130

    
131

    
132
def backend_method(func):
133
    @wraps(func)
134
    def wrapper(self, *args, **kw):
135
        # if we are inside a database transaction
136
        # just proceed with the method execution
137
        # otherwise manage a new transaction
138
        if self.in_transaction:
139
            return func(self, *args, **kw)
140

    
141
        try:
142
            self.pre_exec()
143
            result = func(self, *args, **kw)
144
            success_status = True
145
            return result
146
        except:
147
            success_status = False
148
            raise
149
        finally:
150
            self.post_exec(success_status)
151
    return wrapper
152

    
153

    
154
def debug_method(func):
155
    @wraps(func)
156
    def wrapper(self, *args, **kw):
157
        try:
158
            result = func(self, *args, **kw)
159
            return result
160
        except:
161
            result = format_exc()
162
            raise
163
        finally:
164
            all_args = map(repr, args)
165
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
166
            logger.debug(">>> %s(%s) <<< %s" % (
167
                func.__name__, ', '.join(all_args).rstrip(', '), result))
168
    return wrapper
169

    
170

    
171
def check_allowed_paths(action):
172
    """Decorator for backend methods checking path access granted to user.
173

174
    The 1st argument of the decorated method is expected to be a
175
    ModularBackend instance, the 2nd the user performing the request and
176
    the path join of the rest arguments is supposed to be the requested path.
177

178
    The decorator checks whether the requested path is among the user's allowed
179
    cached paths.
180
    If this is the case, the decorator returns immediately to reduce the
181
    interactions with the database.
182
    Otherwise, it proceeds with the execution of the decorated method and if
183
    the method returns successfully (no exceptions are raised), the requested
184
    path is added to the user's cached allowed paths.
185

186
    :param action: (int) 0 for reads / 1 for writes
187
    :raises NotAllowedError: the user does not have access to the path
188
    """
189
    def decorator(func):
190
        @wraps(func)
191
        def wrapper(self, *args):
192
            user = args[0]
193
            if action == self.READ:
194
                d = self.read_allowed_paths
195
            else:
196
                d = self.write_allowed_paths
197
            path = '/'.join(args[1:])
198
            if path in d.get(user, []):
199
                return  # access is already checked
200
            else:
201
                func(self, *args)   # proceed with access check
202
                d[user].add(path)  # add path in the allowed user paths
203
        return wrapper
204
    return decorator
205

    
206

    
207
def list_method(func):
208
    @wraps(func)
209
    def wrapper(self, *args, **kw):
210
        marker = kw.get('marker')
211
        limit = kw.get('limit')
212
        result = func(self, *args, **kw)
213
        start, limit = self._list_limits(result, marker, limit)
214
        return result[start:start + limit]
215
    return wrapper
216

    
217

    
218
class ModularBackend(BaseBackend):
219
    """A modular backend.
220

221
    Uses modules for SQL functions and storage.
222
    """
223

    
224
    def __init__(self, db_module=None, db_connection=None,
225
                 block_module=None, block_path=None, block_umask=None,
226
                 block_size=None, hash_algorithm=None,
227
                 queue_module=None, queue_hosts=None, queue_exchange=None,
228
                 astakos_auth_url=None, service_token=None,
229
                 astakosclient_poolsize=None,
230
                 free_versioning=True, block_params=None,
231
                 public_url_security=None,
232
                 public_url_alphabet=None,
233
                 account_quota_policy=None,
234
                 container_quota_policy=None,
235
                 container_versioning_policy=None):
236
        db_module = db_module or DEFAULT_DB_MODULE
237
        db_connection = db_connection or DEFAULT_DB_CONNECTION
238
        block_module = block_module or DEFAULT_BLOCK_MODULE
239
        block_path = block_path or DEFAULT_BLOCK_PATH
240
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
241
        block_params = block_params or DEFAULT_BLOCK_PARAMS
242
        block_size = block_size or DEFAULT_BLOCK_SIZE
243
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
244
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
245
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
246
        container_quota_policy = container_quota_policy \
247
            or DEFAULT_CONTAINER_QUOTA
248
        container_versioning_policy = container_versioning_policy \
249
            or DEFAULT_CONTAINER_VERSIONING
250

    
251
        self.default_account_policy = {}
252
        self.default_container_policy = {
253
            QUOTA_POLICY: container_quota_policy,
254
            VERSIONING_POLICY: container_versioning_policy,
255
            PROJECT: None
256
        }
257
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
258
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
259

    
260
        self.public_url_security = (public_url_security or
261
                                    DEFAULT_PUBLIC_URL_SECURITY)
262
        self.public_url_alphabet = (public_url_alphabet or
263
                                    DEFAULT_PUBLIC_URL_ALPHABET)
264

    
265
        self.hash_algorithm = hash_algorithm
266
        self.block_size = block_size
267
        self.free_versioning = free_versioning
268

    
269
        def load_module(m):
270
            __import__(m)
271
            return sys.modules[m]
272

    
273
        self.db_module = load_module(db_module)
274
        self.wrapper = self.db_module.DBWrapper(db_connection)
275
        params = {'wrapper': self.wrapper}
276
        self.permissions = self.db_module.Permissions(**params)
277
        self.config = self.db_module.Config(**params)
278
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
279
        for x in ['READ', 'WRITE']:
280
            setattr(self, x, getattr(self.db_module, x))
281
        self.node = self.db_module.Node(**params)
282
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
283
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
284
                  'MATCH_PREFIX', 'MATCH_EXACT']:
285
            setattr(self, x, getattr(self.db_module, x))
286

    
287
        self.ALLOWED = ['read', 'write']
288

    
289
        self.block_module = load_module(block_module)
290
        self.block_params = block_params
291
        params = {'path': block_path,
292
                  'block_size': self.block_size,
293
                  'hash_algorithm': self.hash_algorithm,
294
                  'umask': block_umask}
295
        params.update(self.block_params)
296
        self.store = self.block_module.Store(**params)
297

    
298
        if queue_module and queue_hosts:
299
            self.queue_module = load_module(queue_module)
300
            params = {'hosts': queue_hosts,
301
                      'exchange': queue_exchange,
302
                      'client_id': QUEUE_CLIENT_ID}
303
            self.queue = self.queue_module.Queue(**params)
304
        else:
305
            class NoQueue:
306
                def send(self, *args):
307
                    pass
308

    
309
                def close(self):
310
                    pass
311

    
312
            self.queue = NoQueue()
313

    
314
        self.astakos_auth_url = astakos_auth_url
315
        self.service_token = service_token
316

    
317
        if not astakos_auth_url or not AstakosClient:
318
            self.astakosclient = DisabledAstakosClient(
319
                service_token, astakos_auth_url,
320
                use_pool=True,
321
                pool_size=astakosclient_poolsize)
322
        else:
323
            self.astakosclient = AstakosClient(
324
                service_token, astakos_auth_url,
325
                use_pool=True,
326
                pool_size=astakosclient_poolsize)
327

    
328
        self.serials = []
329
        self.messages = []
330

    
331
        self._move_object = partial(self._copy_object, is_move=True)
332

    
333
        self.lock_container_path = False
334

    
335
        self.in_transaction = False
336

    
337
        self._reset_allowed_paths()
338

    
339
    def pre_exec(self, lock_container_path=False):
340
        self.lock_container_path = lock_container_path
341
        self.wrapper.execute()
342
        self.serials = []
343
        self._reset_allowed_paths()
344
        self.in_transaction = True
345

    
346
    def post_exec(self, success_status=True):
347
        if success_status:
348
            # send messages produced
349
            for m in self.messages:
350
                self.queue.send(*m)
351

    
352
            # register serials
353
            if self.serials:
354
                self.commission_serials.insert_many(
355
                    self.serials)
356

    
357
                # commit to ensure that the serials are registered
358
                # even if resolve commission fails
359
                self.wrapper.commit()
360

    
361
                # start new transaction
362
                self.wrapper.execute()
363

    
364
                r = self.astakosclient.resolve_commissions(
365
                    accept_serials=self.serials,
366
                    reject_serials=[])
367
                self.commission_serials.delete_many(
368
                    r['accepted'])
369

    
370
            self.wrapper.commit()
371
        else:
372
            if self.serials:
373
                r = self.astakosclient.resolve_commissions(
374
                    accept_serials=[],
375
                    reject_serials=self.serials)
376
                self.commission_serials.delete_many(
377
                    r['rejected'])
378
            self.wrapper.rollback()
379
        self.in_transaction = False
380

    
381
    def close(self):
382
        self.wrapper.close()
383
        self.queue.close()
384

    
385
    @property
386
    def using_external_quotaholder(self):
387
        return not isinstance(self.astakosclient, DisabledAstakosClient)
388

    
389
    @debug_method
390
    @backend_method
391
    @list_method
392
    def list_accounts(self, user, marker=None, limit=10000):
393
        """Return a list of accounts the user can access."""
394

    
395
        return self._allowed_accounts(user)
396

    
397
    @debug_method
398
    @backend_method
399
    def get_account_meta(self, user, account, domain=None, until=None,
400
                         include_user_defined=True):
401
        """Return a dictionary with the account metadata for the domain."""
402

    
403
        self._can_read_account(user, account)
404
        path, node = self._lookup_account(account, user == account)
405
        if user != account:
406
            if until or (node is None):
407
                raise NotAllowedError
408
        try:
409
            props = self._get_properties(node, until)
410
            mtime = props[self.MTIME]
411
        except NameError:
412
            props = None
413
            mtime = until
414
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
415
        tstamp = max(tstamp, mtime)
416
        if until is None:
417
            modified = tstamp
418
        else:
419
            modified = self._get_statistics(
420
                node, compute=True)[2]  # Overall last modification.
421
            modified = max(modified, mtime)
422

    
423
        if user != account:
424
            meta = {'name': account}
425
        else:
426
            meta = {}
427
            if props is not None and include_user_defined:
428
                if domain is None:
429
                    raise ValueError(
430
                        'Domain argument is obligatory for getting '
431
                        'user defined metadata')
432
                meta.update(
433
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
434
            if until is not None:
435
                meta.update({'until_timestamp': tstamp})
436
            meta.update({'name': account, 'count': count, 'bytes': bytes})
437
            if self.using_external_quotaholder:
438
                external_quota = self.astakosclient.service_get_quotas(
439
                    account)[account]
440
                meta['bytes'] = sum(d['pithos.diskspace']['usage'] for d in
441
                                    external_quota.values())
442
        meta.update({'modified': modified})
443
        return meta
444

    
445
    @debug_method
446
    @backend_method
447
    def update_account_meta(self, user, account, domain, meta, replace=False):
448
        """Update the metadata associated with the account for the domain."""
449

    
450
        self._can_write_account(user, account)
451
        path, node = self._lookup_account(account, True)
452
        self._put_metadata(user, node, domain, meta, replace,
453
                           update_statistics_ancestors_depth=-1)
454

    
455
    @debug_method
456
    @backend_method
457
    def get_account_groups(self, user, account):
458
        """Return a dictionary with the user groups defined for the account."""
459

    
460
        self._can_read_account(user, account)
461
        if user != account:
462
            return {}
463
        self._lookup_account(account, True)
464
        return self.permissions.group_dict(account)
465

    
466
    @debug_method
467
    @backend_method
468
    def update_account_groups(self, user, account, groups, replace=False):
469
        """Update the groups associated with the account."""
470

    
471
        self._can_write_account(user, account)
472
        self._lookup_account(account, True)
473
        self._check_groups(groups)
474
        if replace:
475
            self.permissions.group_destroy(account)
476
        for k, v in groups.iteritems():
477
            if not replace:  # If not already deleted.
478
                self.permissions.group_delete(account, k)
479
            if v:
480
                self.permissions.group_addmany(account, k, v)
481

    
482
    @debug_method
483
    @backend_method
484
    def get_account_policy(self, user, account):
485
        """Return a dictionary with the account policy."""
486

    
487
        self._can_read_account(user, account)
488
        if user != account:
489
            return {}
490
        path, node = self._lookup_account(account, True)
491
        policy = self._get_policy(node, is_account_policy=True)
492
        if self.using_external_quotaholder:
493
            external_quota = self.astakosclient.service_get_quotas(
494
                account)[account]
495
            policy.update(dict(('%s-%s' % (QUOTA_POLICY, k),
496
                                v['pithos.diskspace']['limit']) for k, v in
497
                               external_quota.items()))
498

    
499
        return policy
500

    
501
    @debug_method
502
    @backend_method
503
    def update_account_policy(self, user, account, policy, replace=False):
504
        """Update the policy associated with the account."""
505

    
506
        self._can_write_account(user, account)
507
        path, node = self._lookup_account(account, True)
508
        self._put_policy(node, policy, replace, is_account_policy=True,
509
                         check=True)
510

    
511
    @debug_method
512
    @backend_method
513
    def put_account(self, user, account, policy=None):
514
        """Create a new account with the given name."""
515

    
516
        policy = policy or {}
517
        self._can_write_account(user, account)
518
        node = self.node.node_lookup(account)
519
        if node is not None:
520
            raise AccountExists('Account already exists')
521
        node = self._put_path(user, self.ROOTNODE, account,
522
                              update_statistics_ancestors_depth=-1)
523
        self._put_policy(node, policy, True, is_account_policy=True,
524
                         check=True if policy else False)
525

    
526
    @debug_method
527
    @backend_method
528
    def delete_account(self, user, account):
529
        """Delete the account with the given name."""
530

    
531
        self._can_write_account(user, account)
532
        node = self.node.node_lookup(account)
533
        if node is None:
534
            return
535
        if not self.node.node_remove(node,
536
                                     update_statistics_ancestors_depth=-1):
537
            raise AccountNotEmpty('Account is not empty')
538
        self.permissions.group_destroy(account)
539

    
540
        # remove all the cached allowed paths
541
        # removing the specific path could be more expensive
542
        self._reset_allowed_paths()
543

    
544
    @debug_method
545
    @backend_method
546
    @list_method
547
    def list_containers(self, user, account, marker=None, limit=10000,
548
                        shared=False, until=None, public=False):
549
        """Return a list of containers existing under an account."""
550

    
551
        self._can_read_account(user, account)
552
        if user != account:
553
            if until:
554
                raise NotAllowedError
555
            return self._allowed_containers(user, account)
556
        if shared or public:
557
            allowed = set()
558
            if shared:
559
                allowed.update([x.split('/', 2)[1] for x in
560
                               self.permissions.access_list_shared(account)])
561
            if public:
562
                allowed.update([x[0].split('/', 2)[1] for x in
563
                               self.permissions.public_list(account)])
564
            return sorted(allowed)
565
        node = self.node.node_lookup(account)
566
        return [x[0] for x in self._list_object_properties(
567
            node, account, '', '/', marker, limit, False, None, [], until)]
568

    
569
    @debug_method
570
    @backend_method
571
    def list_container_meta(self, user, account, container, domain,
572
                            until=None):
573
        """Return a list of the container's object meta keys for a domain."""
574

    
575
        self._can_read_container(user, account, container)
576
        allowed = []
577
        if user != account:
578
            if until:
579
                raise NotAllowedError
580
        path, node = self._lookup_container(account, container)
581
        before = until if until is not None else inf
582
        allowed = self._get_formatted_paths(allowed)
583
        return self.node.latest_attribute_keys(node, domain, before,
584
                                               CLUSTER_DELETED, allowed)
585

    
586
    @debug_method
587
    @backend_method
588
    def get_container_meta(self, user, account, container, domain=None,
589
                           until=None, include_user_defined=True):
590
        """Return a dictionary with the container metadata for the domain."""
591

    
592
        self._can_read_container(user, account, container)
593
        if user != account:
594
            if until:
595
                raise NotAllowedError
596
        path, node = self._lookup_container(account, container)
597
        props = self._get_properties(node, until)
598
        mtime = props[self.MTIME]
599
        count, bytes, tstamp = self._get_statistics(node, until)
600
        tstamp = max(tstamp, mtime)
601
        if until is None:
602
            modified = tstamp
603
        else:
604
            modified = self._get_statistics(
605
                node)[2]  # Overall last modification.
606
            modified = max(modified, mtime)
607

    
608
        if user != account:
609
            meta = {'name': container}
610
        else:
611
            meta = {}
612
            if include_user_defined:
613
                if domain is None:
614
                    raise ValueError(
615
                        'Domain argument is obligatory for getting '
616
                        'user defined metadata')
617
                meta.update(
618
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
619
            if until is not None:
620
                meta.update({'until_timestamp': tstamp})
621
            meta.update({'name': container, 'count': count, 'bytes': bytes})
622
        meta.update({'modified': modified})
623
        return meta
624

    
625
    @debug_method
626
    @backend_method
627
    def update_container_meta(self, user, account, container, domain, meta,
628
                              replace=False):
629
        """Update the metadata associated with the container for the domain."""
630

    
631
        self._can_write_container(user, account, container)
632
        path, node = self._lookup_container(account, container)
633
        src_version_id, dest_version_id = self._put_metadata(
634
            user, node, domain, meta, replace,
635
            update_statistics_ancestors_depth=0)
636
        if src_version_id is not None:
637
            versioning = self._get_policy(
638
                node, is_account_policy=False)[VERSIONING_POLICY]
639
            if versioning != 'auto':
640
                self.node.version_remove(src_version_id,
641
                                         update_statistics_ancestors_depth=0)
642

    
643
    @debug_method
644
    @backend_method
645
    def get_container_policy(self, user, account, container):
646
        """Return a dictionary with the container policy."""
647

    
648
        self._can_read_container(user, account, container)
649
        if user != account:
650
            return {}
651
        path, node = self._lookup_container(account, container)
652
        return self._get_policy(node, is_account_policy=False)
653

    
654
    @debug_method
655
    @backend_method
656
    def update_container_policy(self, user, account, container, policy,
657
                                replace=False):
658
        """Update the policy associated with the container."""
659

    
660
        self._can_write_container(user, account, container)
661
        path, node = self._lookup_container(account, container)
662

    
663
        if PROJECT in policy:
664
            project = self._get_project(node)
665
            try:
666
                serial = self.astakosclient.issue_resource_reassignment(
667
                    holder=account,
668
                    from_source=project,
669
                    to_source=policy[PROJECT],
670
                    provisions={'pithos.diskspace': self.get_container_meta(
671
                        user, account, container,
672
                        include_user_defined=False)['bytes']})
673
            except BaseException, e:
674
                raise QuotaError(e)
675
            else:
676
                self.serials.append(serial)
677

    
678
        self._put_policy(node, policy, replace, is_account_policy=False,
679
                         default_project=account, check=True)
680

    
681
    @debug_method
682
    @backend_method
683
    def put_container(self, user, account, container, policy=None):
684
        """Create a new container with the given name."""
685

    
686
        policy = policy or {}
687
        self._can_write_container(user, account, container)
688
        try:
689
            path, node = self._lookup_container(account, container)
690
        except NameError:
691
            pass
692
        else:
693
            raise ContainerExists('Container already exists')
694
        path = '/'.join((account, container))
695
        node = self._put_path(
696
            user, self._lookup_account(account, True)[1], path,
697
            update_statistics_ancestors_depth=-1)
698
        self._put_policy(node, policy, True, is_account_policy=False,
699
                         default_project=account,
700
                         check=True if policy else False)
701

    
702
    @debug_method
703
    @backend_method
704
    def delete_container(self, user, account, container, until=None, prefix='',
705
                         delimiter=None):
706
        """Delete/purge the container with the given name."""
707

    
708
        self._can_write_container(user, account, container)
709
        path, node = self._lookup_container(account, container)
710
        project = self._get_project(node)
711

    
712
        if until is not None:
713
            hashes, size, serials = self.node.node_purge_children(
714
                node, until, CLUSTER_HISTORY,
715
                update_statistics_ancestors_depth=0)
716
            for h in hashes:
717
                self.store.map_delete(h)
718
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
719
                                          update_statistics_ancestors_depth=0)
720
            if not self.free_versioning:
721
                self._report_size_change(
722
                    user, account, -size, project, {
723
                        'action': 'container purge',
724
                        'path': path,
725
                        'versions': ','.join(str(i) for i in serials)
726
                    }
727
                )
728
            return
729

    
730
        if not delimiter:
731
            if self._get_statistics(node)[0] > 0:
732
                raise ContainerNotEmpty('Container is not empty')
733
            hashes, size, serials = self.node.node_purge_children(
734
                node, inf, CLUSTER_HISTORY,
735
                update_statistics_ancestors_depth=0)
736
            for h in hashes:
737
                self.store.map_delete(h)
738
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
739
                                          update_statistics_ancestors_depth=0)
740
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
741
            if not self.free_versioning:
742
                self._report_size_change(
743
                    user, account, -size, project, {
744
                        'action': 'container purge',
745
                        'path': path,
746
                        'versions': ','.join(str(i) for i in serials)
747
                    }
748
                )
749
        else:
750
            # remove only contents
751
            src_names = self._list_objects_no_limit(
752
                user, account, container, prefix='', delimiter=None,
753
                virtual=False, domain=None, keys=[], shared=False, until=None,
754
                size_range=None, all_props=True, public=False)
755
            paths = []
756
            for t in src_names:
757
                path = '/'.join((account, container, t[0]))
758
                node = t[2]
759
                if not self._exists(node):
760
                    continue
761
                src_version_id, dest_version_id = self._put_version_duplicate(
762
                    user, node, size=0, type='', hash=None, checksum='',
763
                    cluster=CLUSTER_DELETED,
764
                    update_statistics_ancestors_depth=1)
765
                del_size = self._apply_versioning(
766
                    account, container, src_version_id,
767
                    update_statistics_ancestors_depth=1)
768
                self._report_size_change(
769
                    user, account, -del_size, project, {
770
                        'action': 'object delete',
771
                        'path': path,
772
                        'versions': ','.join([str(dest_version_id)])})
773
                self._report_object_change(
774
                    user, account, path, details={'action': 'object delete'})
775
                paths.append(path)
776
            self.permissions.access_clear_bulk(paths)
777

    
778
        # remove all the cached allowed paths
779
        # removing the specific path could be more expensive
780
        self._reset_allowed_paths()
781

    
782
    def _list_objects(self, user, account, container, prefix, delimiter,
783
                      marker, limit, virtual, domain, keys, shared, until,
784
                      size_range, all_props, public):
785
        if user != account and until:
786
            raise NotAllowedError
787

    
788
        objects = set()
789
        if shared and public:
790
            # get shared first
791
            shared_paths = self._list_object_permissions(
792
                user, account, container, prefix, shared=True, public=False)
793
            if shared_paths:
794
                path, node = self._lookup_container(account, container)
795
                shared_paths = self._get_formatted_paths(shared_paths)
796
                objects = set(self._list_object_properties(
797
                    node, path, prefix, delimiter, marker, limit, virtual,
798
                    domain, keys, until, size_range, shared_paths, all_props))
799

    
800
            # get public
801
            objects |= set(self._list_public_object_properties(
802
                user, account, container, prefix, all_props))
803
            objects = list(objects)
804

    
805
            objects.sort(key=lambda x: x[0])
806
        elif public:
807
            objects = self._list_public_object_properties(
808
                user, account, container, prefix, all_props)
809
        else:
810
            allowed = self._list_object_permissions(
811
                user, account, container, prefix, shared, public=False)
812
            if shared and not allowed:
813
                return []
814
            path, node = self._lookup_container(account, container)
815
            allowed = self._get_formatted_paths(allowed)
816
            objects = self._list_object_properties(
817
                node, path, prefix, delimiter, marker, limit, virtual, domain,
818
                keys, until, size_range, allowed, all_props)
819

    
820
        # apply limits
821
        start, limit = self._list_limits(objects, marker, limit)
822
        return objects[start:start + limit]
823

    
824
    def _list_public_object_properties(self, user, account, container, prefix,
825
                                       all_props):
826
        public = self._list_object_permissions(
827
            user, account, container, prefix, shared=False, public=True)
828
        paths, nodes = self._lookup_objects(public)
829
        path = '/'.join((account, container))
830
        cont_prefix = path + '/'
831
        paths = [x[len(cont_prefix):] for x in paths]
832
        objects = [(p,) + props for p, props in
833
                   zip(paths, self.node.version_lookup_bulk(
834
                       nodes, all_props=all_props, order_by_path=True))]
835
        return objects
836

    
837
    def _list_objects_no_limit(self, user, account, container, prefix,
838
                               delimiter, virtual, domain, keys, shared, until,
839
                               size_range, all_props, public):
840
        objects = []
841
        while True:
842
            marker = objects[-1] if objects else None
843
            limit = 10000
844
            l = self._list_objects(
845
                user, account, container, prefix, delimiter, marker, limit,
846
                virtual, domain, keys, shared, until, size_range, all_props,
847
                public)
848
            objects.extend(l)
849
            if not l or len(l) < limit:
850
                break
851
        return objects
852

    
853
    def _list_object_permissions(self, user, account, container, prefix,
854
                                 shared, public):
855
        allowed = []
856
        path = '/'.join((account, container, prefix)).rstrip('/')
857
        if user != account:
858
            allowed = self.permissions.access_list_paths(user, path)
859
            if not allowed:
860
                raise NotAllowedError
861
        else:
862
            allowed = set()
863
            if shared:
864
                allowed.update(self.permissions.access_list_shared(path))
865
            if public:
866
                allowed.update(
867
                    [x[0] for x in self.permissions.public_list(path)])
868
            allowed = sorted(allowed)
869
            if not allowed:
870
                return []
871
        return allowed
872

    
873
    @debug_method
874
    @backend_method
875
    def list_objects(self, user, account, container, prefix='', delimiter=None,
876
                     marker=None, limit=10000, virtual=True, domain=None,
877
                     keys=None, shared=False, until=None, size_range=None,
878
                     public=False):
879
        """List (object name, object version_id) under a container."""
880

    
881
        keys = keys or []
882
        return self._list_objects(
883
            user, account, container, prefix, delimiter, marker, limit,
884
            virtual, domain, keys, shared, until, size_range, False, public)
885

    
886
    @debug_method
887
    @backend_method
888
    def list_object_meta(self, user, account, container, prefix='',
889
                         delimiter=None, marker=None, limit=10000,
890
                         virtual=True, domain=None, keys=None, shared=False,
891
                         until=None, size_range=None, public=False):
892
        """Return a list of metadata dicts of objects under a container."""
893

    
894
        keys = keys or []
895
        props = self._list_objects(
896
            user, account, container, prefix, delimiter, marker, limit,
897
            virtual, domain, keys, shared, until, size_range, True, public)
898
        objects = []
899
        for p in props:
900
            if len(p) == 2:
901
                objects.append({'subdir': p[0]})
902
            else:
903
                objects.append({
904
                    'name': p[0],
905
                    'bytes': p[self.SIZE + 1],
906
                    'type': p[self.TYPE + 1],
907
                    'hash': p[self.HASH + 1],
908
                    'version': p[self.SERIAL + 1],
909
                    'version_timestamp': p[self.MTIME + 1],
910
                    'modified': p[self.MTIME + 1] if until is None else None,
911
                    'modified_by': p[self.MUSER + 1],
912
                    'uuid': p[self.UUID + 1],
913
                    'checksum': p[self.CHECKSUM + 1]})
914
        return objects
915

    
916
    @debug_method
917
    @backend_method
918
    def list_object_permissions(self, user, account, container, prefix=''):
919
        """Return a list of paths enforce permissions under a container."""
920

    
921
        return self._list_object_permissions(user, account, container, prefix,
922
                                             True, False)
923

    
924
    @debug_method
925
    @backend_method
926
    def list_object_public(self, user, account, container, prefix=''):
927
        """Return a mapping of object paths to public ids under a container."""
928

    
929
        public = {}
930
        for path, p in self.permissions.public_list('/'.join((account,
931
                                                              container,
932
                                                              prefix))):
933
            public[path] = p
934
        return public
935

    
936
    @debug_method
937
    @backend_method
938
    def get_object_meta(self, user, account, container, name, domain=None,
939
                        version=None, include_user_defined=True):
940
        """Return a dictionary with the object metadata for the domain."""
941

    
942
        self._can_read_object(user, account, container, name)
943
        path, node = self._lookup_object(account, container, name)
944
        props = self._get_version(node, version)
945
        if version is None:
946
            modified = props[self.MTIME]
947
        else:
948
            try:
949
                modified = self._get_version(
950
                    node)[self.MTIME]  # Overall last modification.
951
            except NameError:  # Object may be deleted.
952
                del_props = self.node.version_lookup(
953
                    node, inf, CLUSTER_DELETED)
954
                if del_props is None:
955
                    raise ItemNotExists('Object does not exist')
956
                modified = del_props[self.MTIME]
957

    
958
        meta = {}
959
        if include_user_defined:
960
            if domain is None:
961
                raise ValueError(
962
                    'Domain argument is obligatory for getting '
963
                    'user defined metadata')
964
            meta.update(
965
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
966
        meta.update({'name': name,
967
                     'bytes': props[self.SIZE],
968
                     'type': props[self.TYPE],
969
                     'hash': props[self.HASH],
970
                     'version': props[self.SERIAL],
971
                     'version_timestamp': props[self.MTIME],
972
                     'modified': modified,
973
                     'modified_by': props[self.MUSER],
974
                     'uuid': props[self.UUID],
975
                     'checksum': props[self.CHECKSUM]})
976
        return meta
977

    
978
    @debug_method
979
    @backend_method
980
    def update_object_meta(self, user, account, container, name, domain, meta,
981
                           replace=False):
982
        """Update object metadata for a domain and return the new version."""
983

    
984
        self._can_write_object(user, account, container, name)
985

    
986
        path, node = self._lookup_object(account, container, name,
987
                                         lock_container=True)
988
        src_version_id, dest_version_id = self._put_metadata(
989
            user, node, domain, meta, replace,
990
            update_statistics_ancestors_depth=1)
991
        self._apply_versioning(account, container, src_version_id,
992
                               update_statistics_ancestors_depth=1)
993
        return dest_version_id
994

    
995
    @debug_method
996
    @backend_method
997
    def get_object_permissions_bulk(self, user, account, container, names):
998
        """Return the action allowed on the object, the path
999
        from which the object gets its permissions from,
1000
        along with a dictionary containing the permissions."""
1001

    
1002
        permissions_path = self._get_permissions_path_bulk(account, container,
1003
                                                           names)
1004
        access_objects = self.permissions.access_check_bulk(permissions_path,
1005
                                                            user)
1006
        #group_parents = access_objects['group_parents']
1007
        nobject_permissions = {}
1008
        cpath = '/'.join((account, container, ''))
1009
        cpath_idx = len(cpath)
1010
        for path in permissions_path:
1011
            allowed = 1
1012
            name = path[cpath_idx:]
1013
            if user != account:
1014
                try:
1015
                    allowed = access_objects[path]
1016
                except KeyError:
1017
                    raise NotAllowedError
1018
            access_dict, allowed = \
1019
                self.permissions.access_get_for_bulk(access_objects[path])
1020
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
1021
                                         access_dict)
1022
        self._lookup_objects(permissions_path)
1023
        return nobject_permissions
1024

    
1025
    @debug_method
1026
    @backend_method
1027
    def get_object_permissions(self, user, account, container, name):
1028
        """Return the action allowed on the object, the path
1029
        from which the object gets its permissions from,
1030
        along with a dictionary containing the permissions."""
1031

    
1032
        allowed = 'write'
1033
        permissions_path = self._get_permissions_path(account, container, name)
1034
        if user != account:
1035
            if self.permissions.access_check(permissions_path, self.WRITE,
1036
                                             user):
1037
                allowed = 'write'
1038
            elif self.permissions.access_check(permissions_path, self.READ,
1039
                                               user):
1040
                allowed = 'read'
1041
            else:
1042
                raise NotAllowedError
1043
        self._lookup_object(account, container, name)
1044
        return (allowed,
1045
                permissions_path,
1046
                self.permissions.access_get(permissions_path))
1047

    
1048
    @debug_method
1049
    @backend_method
1050
    def update_object_permissions(self, user, account, container, name,
1051
                                  permissions):
1052
        """Update the permissions associated with the object."""
1053

    
1054
        if user != account:
1055
            raise NotAllowedError
1056
        path = self._lookup_object(account, container, name,
1057
                                   lock_container=True)[0]
1058
        self._check_permissions(path, permissions)
1059
        try:
1060
            self.permissions.access_set(path, permissions)
1061
        except:
1062
            raise ValueError
1063
        else:
1064
            self._report_sharing_change(user, account, path, {'members':
1065
                                        self.permissions.access_members(path)})
1066

    
1067
        # remove all the cached allowed paths
1068
        # filtering out only those affected could be more expensive
1069
        self._reset_allowed_paths()
1070

    
1071
    @debug_method
1072
    @backend_method
1073
    def get_object_public(self, user, account, container, name):
1074
        """Return the public id of the object if applicable."""
1075

    
1076
        self._can_read_object(user, account, container, name)
1077
        path = self._lookup_object(account, container, name)[0]
1078
        p = self.permissions.public_get(path)
1079
        return p
1080

    
1081
    @debug_method
1082
    @backend_method
1083
    def update_object_public(self, user, account, container, name, public):
1084
        """Update the public status of the object."""
1085

    
1086
        self._can_write_object(user, account, container, name)
1087
        path = self._lookup_object(account, container, name,
1088
                                   lock_container=True)[0]
1089
        if not public:
1090
            self.permissions.public_unset(path)
1091
        else:
1092
            self.permissions.public_set(
1093
                path, self.public_url_security, self.public_url_alphabet)
1094

    
1095
    @debug_method
1096
    @backend_method
1097
    def get_object_hashmap(self, user, account, container, name, version=None):
1098
        """Return the object's size and a list with partial hashes."""
1099

    
1100
        self._can_read_object(user, account, container, name)
1101
        path, node = self._lookup_object(account, container, name)
1102
        props = self._get_version(node, version)
1103
        if props[self.HASH] is None:
1104
            return 0, ()
1105
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
1106
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
1107

    
1108
    def _update_object_hash(self, user, account, container, name, size, type,
1109
                            hash, checksum, domain, meta, replace_meta,
1110
                            permissions, src_node=None, src_version_id=None,
1111
                            is_copy=False, report_size_change=True):
1112
        if permissions is not None and user != account:
1113
            raise NotAllowedError
1114
        self._can_write_object(user, account, container, name)
1115
        if permissions is not None:
1116
            path = '/'.join((account, container, name))
1117
            self._check_permissions(path, permissions)
1118

    
1119
        account_path, account_node = self._lookup_account(account, True)
1120
        container_path, container_node = self._lookup_container(
1121
            account, container)
1122
        project = self._get_project(container_node)
1123

    
1124
        path, node = self._put_object_node(
1125
            container_path, container_node, name)
1126
        pre_version_id, dest_version_id = self._put_version_duplicate(
1127
            user, node, src_node=src_node, size=size, type=type, hash=hash,
1128
            checksum=checksum, is_copy=is_copy,
1129
            update_statistics_ancestors_depth=1)
1130

    
1131
        # Handle meta.
1132
        if src_version_id is None:
1133
            src_version_id = pre_version_id
1134
        self._put_metadata_duplicate(
1135
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
1136

    
1137
        del_size = self._apply_versioning(account, container, pre_version_id,
1138
                                          update_statistics_ancestors_depth=1)
1139
        size_delta = size - del_size
1140
        if size_delta > 0:
1141
            # Check account quota.
1142
            if not self.using_external_quotaholder:
1143
                account_quota = long(self._get_policy(
1144
                    account_node, is_account_policy=True)[QUOTA_POLICY])
1145
                account_usage = self._get_statistics(account_node,
1146
                                                     compute=True)[1]
1147
                if (account_quota > 0 and account_usage > account_quota):
1148
                    raise QuotaError(
1149
                        'Account quota exceeded: limit: %s, usage: %s' % (
1150
                            account_quota, account_usage))
1151

    
1152
            # Check container quota.
1153
            container_quota = long(self._get_policy(
1154
                container_node, is_account_policy=False)[QUOTA_POLICY])
1155
            container_usage = self._get_statistics(container_node)[1]
1156
            if (container_quota > 0 and container_usage > container_quota):
1157
                # This must be executed in a transaction, so the version is
1158
                # never created if it fails.
1159
                raise QuotaError(
1160
                    'Container quota exceeded: limit: %s, usage: %s' % (
1161
                        container_quota, container_usage
1162
                    )
1163
                )
1164

    
1165
        if report_size_change:
1166
            self._report_size_change(
1167
                user, account, size_delta, project,
1168
                {'action': 'object update', 'path': path,
1169
                 'versions': ','.join([str(dest_version_id)])})
1170
        if permissions is not None:
1171
            self.permissions.access_set(path, permissions)
1172
            self._report_sharing_change(
1173
                user, account, path,
1174
                {'members': self.permissions.access_members(path)})
1175

    
1176
        self._report_object_change(
1177
            user, account, path,
1178
            details={'version': dest_version_id, 'action': 'object update'})
1179
        return dest_version_id
1180

    
1181
    @debug_method
1182
    def update_object_hashmap(self, user, account, container, name, size, type,
1183
                              hashmap, checksum, domain, meta=None,
1184
                              replace_meta=False, permissions=None):
1185
        """Create/update an object's hashmap and return the new version."""
1186

    
1187
        meta = meta or {}
1188
        if size == 0:  # No such thing as an empty hashmap.
1189
            hashmap = [self.put_block('')]
1190
        map = HashMap(self.block_size, self.hash_algorithm)
1191
        map.extend([self._unhexlify_hash(x) for x in hashmap])
1192
        missing = self.store.block_search(map)
1193
        if missing:
1194
            ie = IndexError()
1195
            ie.data = [binascii.hexlify(x) for x in missing]
1196
            raise ie
1197

    
1198
        hash = map.hash()
1199
        hexlified = binascii.hexlify(hash)
1200
        # _update_object_hash() locks destination path
1201
        dest_version_id = self._update_object_hash(
1202
            user, account, container, name, size, type, hexlified, checksum,
1203
            domain, meta, replace_meta, permissions)
1204
        self.store.map_put(hash, map)
1205
        return dest_version_id, hexlified
1206

    
1207
    @debug_method
1208
    @backend_method
1209
    def update_object_checksum(self, user, account, container, name, version,
1210
                               checksum):
1211
        """Update an object's checksum."""
1212

    
1213
        # Update objects with greater version and same hashmap
1214
        # and size (fix metadata updates).
1215
        self._can_write_object(user, account, container, name)
1216
        path, node = self._lookup_object(account, container, name,
1217
                                         lock_container=True)
1218
        props = self._get_version(node, version)
1219
        versions = self.node.node_get_versions(node)
1220
        for x in versions:
1221
            if (x[self.SERIAL] >= int(version) and
1222
                x[self.HASH] == props[self.HASH] and
1223
                    x[self.SIZE] == props[self.SIZE]):
1224
                self.node.version_put_property(
1225
                    x[self.SERIAL], 'checksum', checksum)
1226

    
1227
    def _copy_object(self, user, src_account, src_container, src_name,
1228
                     dest_account, dest_container, dest_name, type,
1229
                     dest_domain=None, dest_meta=None, replace_meta=False,
1230
                     permissions=None, src_version=None, is_move=False,
1231
                     delimiter=None):
1232

    
1233
        dest_meta = dest_meta or {}
1234
        dest_version_ids = []
1235
        self._can_read_object(user, src_account, src_container, src_name)
1236

    
1237
        src_container_path = '/'.join((src_account, src_container))
1238
        dest_container_path = '/'.join((dest_account, dest_container))
1239
        # Lock container paths in alphabetical order
1240
        if src_container_path < dest_container_path:
1241
            src_container_node = self._lookup_container(src_account,
1242
                                                        src_container)[-1]
1243
            dest_container_node = self._lookup_container(dest_account,
1244
                                                         dest_container)[-1]
1245
        else:
1246
            dest_container_node = self._lookup_container(dest_account,
1247
                                                         dest_container)[-1]
1248
            src_container_node = self._lookup_container(src_account,
1249
                                                        src_container)[-1]
1250

    
1251
        cross_account = src_account != dest_account
1252
        cross_container = src_container != dest_container
1253
        if not cross_account and cross_container:
1254
            src_project = self._get_project(src_container_node)
1255
            dest_project = self._get_project(dest_container_node)
1256
            cross_project = src_project != dest_project
1257
        else:
1258
            cross_project = False
1259
        report_size_change = not is_move or cross_account or cross_project
1260

    
1261
        path, node = self._lookup_object(src_account, src_container, src_name)
1262
        # TODO: Will do another fetch of the properties in duplicate version...
1263
        props = self._get_version(
1264
            node, src_version)  # Check to see if source exists.
1265
        src_version_id = props[self.SERIAL]
1266
        hash = props[self.HASH]
1267
        size = props[self.SIZE]
1268
        is_copy = not is_move and (src_account, src_container, src_name) != (
1269
            dest_account, dest_container, dest_name)  # New uuid.
1270
        dest_version_ids.append(self._update_object_hash(
1271
            user, dest_account, dest_container, dest_name, size, type, hash,
1272
            None, dest_domain, dest_meta, replace_meta, permissions,
1273
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1274
            report_size_change=report_size_change))
1275
        if is_move and ((src_account, src_container, src_name) !=
1276
                        (dest_account, dest_container, dest_name)):
1277
            self._delete_object(user, src_account, src_container, src_name,
1278
                                report_size_change=report_size_change)
1279

    
1280
        if delimiter:
1281
            prefix = (src_name + delimiter if not
1282
                      src_name.endswith(delimiter) else src_name)
1283
            src_names = self._list_objects_no_limit(
1284
                user, src_account, src_container, prefix, delimiter=None,
1285
                virtual=False, domain=None, keys=[], shared=False, until=None,
1286
                size_range=None, all_props=True, public=False)
1287
            src_names.sort(key=lambda x: x[2])  # order by nodes
1288
            paths = [elem[0] for elem in src_names]
1289
            nodes = [elem[2] for elem in src_names]
1290
            # TODO: Will do another fetch of the properties
1291
            # in duplicate version...
1292
            props = self._get_versions(nodes)  # Check to see if source exists.
1293

    
1294
            for prop, path, node in zip(props, paths, nodes):
1295
                src_version_id = prop[self.SERIAL]
1296
                hash = prop[self.HASH]
1297
                vtype = prop[self.TYPE]
1298
                size = prop[self.SIZE]
1299
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1300
                    delimiter) else dest_name
1301
                vdest_name = path.replace(prefix, dest_prefix, 1)
1302
                # _update_object_hash() locks destination path
1303
                dest_version_ids.append(self._update_object_hash(
1304
                    user, dest_account, dest_container, vdest_name, size,
1305
                    vtype, hash, None, dest_domain, meta={},
1306
                    replace_meta=False, permissions=None, src_node=node,
1307
                    src_version_id=src_version_id, is_copy=is_copy,
1308
                    report_size_change=report_size_change))
1309
                if is_move and ((src_account, src_container, src_name) !=
1310
                                (dest_account, dest_container, dest_name)):
1311
                    self._delete_object(user, src_account, src_container, path,
1312
                                        report_size_change=report_size_change)
1313
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
1314
                dest_version_ids)
1315

    
1316
    @debug_method
1317
    @backend_method
1318
    def copy_object(self, user, src_account, src_container, src_name,
1319
                    dest_account, dest_container, dest_name, type, domain,
1320
                    meta=None, replace_meta=False, permissions=None,
1321
                    src_version=None, delimiter=None):
1322
        """Copy an object's data and metadata."""
1323

    
1324
        meta = meta or {}
1325
        dest_version_id = self._copy_object(
1326
            user, src_account, src_container, src_name, dest_account,
1327
            dest_container, dest_name, type, domain, meta, replace_meta,
1328
            permissions, src_version, False, delimiter)
1329
        return dest_version_id
1330

    
1331
    @debug_method
1332
    @backend_method
1333
    def move_object(self, user, src_account, src_container, src_name,
1334
                    dest_account, dest_container, dest_name, type, domain,
1335
                    meta=None, replace_meta=False, permissions=None,
1336
                    delimiter=None):
1337
        """Move an object's data and metadata."""
1338

    
1339
        meta = meta or {}
1340
        if user != src_account:
1341
            raise NotAllowedError
1342
        dest_version_id = self._move_object(
1343
            user, src_account, src_container, src_name, dest_account,
1344
            dest_container, dest_name, type, domain, meta, replace_meta,
1345
            permissions, None, delimiter=delimiter)
1346
        return dest_version_id
1347

    
1348
    def _delete_object(self, user, account, container, name, until=None,
1349
                       delimiter=None, report_size_change=True):
1350
        if user != account:
1351
            raise NotAllowedError
1352

    
1353
        # lock container path
1354
        container_path, container_node = self._lookup_container(account,
1355
                                                                container)
1356
        project = self._get_project(container_node)
1357
        path, node = self._lookup_object(account, container, name)
1358

    
1359
        if until is not None:
1360
            if node is None:
1361
                return
1362
            hashes = []
1363
            size = 0
1364
            serials = []
1365
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1366
                                           update_statistics_ancestors_depth=1)
1367
            hashes += h
1368
            size += s
1369
            serials += v
1370
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1371
                                           update_statistics_ancestors_depth=1)
1372
            hashes += h
1373
            if not self.free_versioning:
1374
                size += s
1375
            serials += v
1376
            for h in hashes:
1377
                self.store.map_delete(h)
1378
            self.node.node_purge(node, until, CLUSTER_DELETED,
1379
                                 update_statistics_ancestors_depth=1)
1380
            try:
1381
                self._get_version(node)
1382
            except NameError:
1383
                self.permissions.access_clear(path)
1384
            self._report_size_change(
1385
                user, account, -size, project, {
1386
                    'action': 'object purge',
1387
                    'path': path,
1388
                    'versions': ','.join(str(i) for i in serials)
1389
                }
1390
            )
1391
            return
1392

    
1393
        if not self._exists(node):
1394
            raise ItemNotExists('Object is deleted.')
1395

    
1396
        src_version_id, dest_version_id = self._put_version_duplicate(
1397
            user, node, size=0, type='', hash=None, checksum='',
1398
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1399
        del_size = self._apply_versioning(account, container, src_version_id,
1400
                                          update_statistics_ancestors_depth=1)
1401
        if report_size_change:
1402
            self._report_size_change(
1403
                user, account, -del_size, project,
1404
                {'action': 'object delete',
1405
                 'path': path,
1406
                 'versions': ','.join([str(dest_version_id)])})
1407
        self._report_object_change(
1408
            user, account, path, details={'action': 'object delete'})
1409
        self.permissions.access_clear(path)
1410

    
1411
        if delimiter:
1412
            prefix = name + delimiter if not name.endswith(delimiter) else name
1413
            src_names = self._list_objects_no_limit(
1414
                user, account, container, prefix, delimiter=None,
1415
                virtual=False, domain=None, keys=[], shared=False, until=None,
1416
                size_range=None, all_props=True, public=False)
1417
            paths = []
1418
            for t in src_names:
1419
                path = '/'.join((account, container, t[0]))
1420
                node = t[2]
1421
                if not self._exists(node):
1422
                    continue
1423
                src_version_id, dest_version_id = self._put_version_duplicate(
1424
                    user, node, size=0, type='', hash=None, checksum='',
1425
                    cluster=CLUSTER_DELETED,
1426
                    update_statistics_ancestors_depth=1)
1427
                del_size = self._apply_versioning(
1428
                    account, container, src_version_id,
1429
                    update_statistics_ancestors_depth=1)
1430
                if report_size_change:
1431
                    self._report_size_change(
1432
                        user, account, -del_size, project,
1433
                        {'action': 'object delete',
1434
                         'path': path,
1435
                         'versions': ','.join([str(dest_version_id)])})
1436
                self._report_object_change(
1437
                    user, account, path, details={'action': 'object delete'})
1438
                paths.append(path)
1439
            self.permissions.access_clear_bulk(paths)
1440

    
1441
        # remove all the cached allowed paths
1442
        # removing the specific path could be more expensive
1443
        self._reset_allowed_paths()
1444

    
1445
    @debug_method
1446
    @backend_method
1447
    def delete_object(self, user, account, container, name, until=None,
1448
                      prefix='', delimiter=None):
1449
        """Delete/purge an object."""
1450

    
1451
        self._delete_object(user, account, container, name, until, delimiter)
1452

    
1453
    @debug_method
1454
    @backend_method
1455
    def list_versions(self, user, account, container, name):
1456
        """Return a list of all object (version, version_timestamp) tuples."""
1457

    
1458
        self._can_read_object(user, account, container, name)
1459
        path, node = self._lookup_object(account, container, name)
1460
        versions = self.node.node_get_versions(node)
1461
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
1462
                x[self.CLUSTER] != CLUSTER_DELETED]
1463

    
1464
    @debug_method
1465
    @backend_method
1466
    def get_uuid(self, user, uuid, check_permissions=True):
1467
        """Return the (account, container, name) for the UUID given."""
1468

    
1469
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1470
        if info is None:
1471
            raise NameError
1472
        path, serial = info
1473
        account, container, name = path.split('/', 2)
1474
        if check_permissions:
1475
            self._can_read_object(user, account, container, name)
1476
        return (account, container, name)
1477

    
1478
    @debug_method
1479
    @backend_method
1480
    def get_public(self, user, public):
1481
        """Return the (account, container, name) for the public id given."""
1482

    
1483
        path = self.permissions.public_path(public)
1484
        if path is None:
1485
            raise NameError
1486
        account, container, name = path.split('/', 2)
1487
        self._can_read_object(user, account, container, name)
1488
        return (account, container, name)
1489

    
1490
    def get_block(self, hash):
1491
        """Return a block's data."""
1492

    
1493
        logger.debug("get_block: %s", hash)
1494
        block = self.store.block_get(self._unhexlify_hash(hash))
1495
        if not block:
1496
            raise ItemNotExists('Block does not exist')
1497
        return block
1498

    
1499
    def put_block(self, data):
1500
        """Store a block and return the hash."""
1501

    
1502
        logger.debug("put_block: %s", len(data))
1503
        return binascii.hexlify(self.store.block_put(data))
1504

    
1505
    def update_block(self, hash, data, offset=0):
1506
        """Update a known block and return the hash."""
1507

    
1508
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1509
        if offset == 0 and len(data) == self.block_size:
1510
            return self.put_block(data)
1511
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
1512
        return binascii.hexlify(h)
1513

    
1514
    # Path functions.
1515

    
1516
    def _generate_uuid(self):
1517
        return str(uuidlib.uuid4())
1518

    
1519
    def _put_object_node(self, path, parent, name):
1520
        path = '/'.join((path, name))
1521
        node = self.node.node_lookup(path)
1522
        if node is None:
1523
            node = self.node.node_create(parent, path)
1524
        return path, node
1525

    
1526
    def _put_path(self, user, parent, path,
1527
                  update_statistics_ancestors_depth=None):
1528
        node = self.node.node_create(parent, path)
1529
        self.node.version_create(node, None, 0, '', None, user,
1530
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
1531
                                 update_statistics_ancestors_depth)
1532
        return node
1533

    
1534
    def _lookup_account(self, account, create=True):
1535
        node = self.node.node_lookup(account)
1536
        if node is None and create:
1537
            node = self._put_path(
1538
                account, self.ROOTNODE, account,
1539
                update_statistics_ancestors_depth=-1)  # User is account.
1540
        return account, node
1541

    
1542
    def _lookup_container(self, account, container):
1543
        for_update = True if self.lock_container_path else False
1544
        path = '/'.join((account, container))
1545
        node = self.node.node_lookup(path, for_update)
1546
        if node is None:
1547
            raise ItemNotExists('Container does not exist')
1548
        return path, node
1549

    
1550
    def _lookup_object(self, account, container, name, lock_container=False):
1551
        if lock_container:
1552
            self._lookup_container(account, container)
1553

    
1554
        path = '/'.join((account, container, name))
1555
        node = self.node.node_lookup(path)
1556
        if node is None:
1557
            raise ItemNotExists('Object does not exist')
1558
        return path, node
1559

    
1560
    def _lookup_objects(self, paths):
1561
        nodes = self.node.node_lookup_bulk(paths)
1562
        return paths, nodes
1563

    
1564
    def _get_properties(self, node, until=None):
1565
        """Return properties until the timestamp given."""
1566

    
1567
        before = until if until is not None else inf
1568
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1569
        if props is None and until is not None:
1570
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1571
        if props is None:
1572
            raise ItemNotExists('Path does not exist')
1573
        return props
1574

    
1575
    def _get_statistics(self, node, until=None, compute=False):
1576
        """Return (count, sum of size, timestamp) of everything under node."""
1577

    
1578
        if until is not None:
1579
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1580
        elif compute:
1581
            stats = self.node.statistics_latest(node,
1582
                                                except_cluster=CLUSTER_DELETED)
1583
        else:
1584
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1585
        if stats is None:
1586
            stats = (0, 0, 0)
1587
        return stats
1588

    
1589
    def _get_version(self, node, version=None):
1590
        if version is None:
1591
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1592
            if props is None:
1593
                raise ItemNotExists('Object does not exist')
1594
        else:
1595
            try:
1596
                version = int(version)
1597
            except ValueError:
1598
                raise VersionNotExists('Version does not exist')
1599
            props = self.node.version_get_properties(version, node=node)
1600
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1601
                raise VersionNotExists('Version does not exist')
1602
        return props
1603

    
1604
    def _get_versions(self, nodes):
1605
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1606

    
1607
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
1608
                               type=None, hash=None, checksum=None,
1609
                               cluster=CLUSTER_NORMAL, is_copy=False,
1610
                               update_statistics_ancestors_depth=None):
1611
        """Create a new version of the node."""
1612

    
1613
        props = self.node.version_lookup(
1614
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1615
        if props is not None:
1616
            src_version_id = props[self.SERIAL]
1617
            src_hash = props[self.HASH]
1618
            src_size = props[self.SIZE]
1619
            src_type = props[self.TYPE]
1620
            src_checksum = props[self.CHECKSUM]
1621
        else:
1622
            src_version_id = None
1623
            src_hash = None
1624
            src_size = 0
1625
            src_type = ''
1626
            src_checksum = ''
1627
        if size is None:  # Set metadata.
1628
            hash = src_hash  # This way hash can be set to None
1629
                             # (account or container).
1630
            size = src_size
1631
        if type is None:
1632
            type = src_type
1633
        if checksum is None:
1634
            checksum = src_checksum
1635
        uuid = self._generate_uuid(
1636
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1637

    
1638
        if src_node is None:
1639
            pre_version_id = src_version_id
1640
        else:
1641
            pre_version_id = None
1642
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1643
            if props is not None:
1644
                pre_version_id = props[self.SERIAL]
1645
        if pre_version_id is not None:
1646
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
1647
                                        update_statistics_ancestors_depth)
1648

    
1649
        dest_version_id, mtime = self.node.version_create(
1650
            node, hash, size, type, src_version_id, user, uuid, checksum,
1651
            cluster, update_statistics_ancestors_depth)
1652

    
1653
        self.node.attribute_unset_is_latest(node, dest_version_id)
1654

    
1655
        return pre_version_id, dest_version_id
1656

    
1657
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
1658
                                node, meta, replace=False):
1659
        if src_version_id is not None:
1660
            self.node.attribute_copy(src_version_id, dest_version_id)
1661
        if not replace:
1662
            self.node.attribute_del(dest_version_id, domain, (
1663
                k for k, v in meta.iteritems() if v == ''))
1664
            self.node.attribute_set(dest_version_id, domain, node, (
1665
                (k, v) for k, v in meta.iteritems() if v != ''))
1666
        else:
1667
            self.node.attribute_del(dest_version_id, domain)
1668
            self.node.attribute_set(dest_version_id, domain, node, ((
1669
                k, v) for k, v in meta.iteritems()))
1670

    
1671
    def _put_metadata(self, user, node, domain, meta, replace=False,
1672
                      update_statistics_ancestors_depth=None):
1673
        """Create a new version and store metadata."""
1674

    
1675
        src_version_id, dest_version_id = self._put_version_duplicate(
1676
            user, node,
1677
            update_statistics_ancestors_depth=
1678
            update_statistics_ancestors_depth)
1679
        self._put_metadata_duplicate(
1680
            src_version_id, dest_version_id, domain, node, meta, replace)
1681
        return src_version_id, dest_version_id
1682

    
1683
    def _list_limits(self, listing, marker, limit):
1684
        start = 0
1685
        if marker:
1686
            try:
1687
                start = listing.index(marker) + 1
1688
            except ValueError:
1689
                pass
1690
        if not limit or limit > 10000:
1691
            limit = 10000
1692
        return start, limit
1693

    
1694
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
1695
                                marker=None, limit=10000, virtual=True,
1696
                                domain=None, keys=None, until=None,
1697
                                size_range=None, allowed=None,
1698
                                all_props=False):
1699
        keys = keys or []
1700
        allowed = allowed or []
1701
        cont_prefix = path + '/'
1702
        prefix = cont_prefix + prefix
1703
        start = cont_prefix + marker if marker else None
1704
        before = until if until is not None else inf
1705
        filterq = keys if domain else []
1706
        sizeq = size_range
1707

    
1708
        objects, prefixes = self.node.latest_version_list(
1709
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
1710
            allowed, domain, filterq, sizeq, all_props)
1711
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1712
        objects.sort(key=lambda x: x[0])
1713
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1714
        return objects
1715

    
1716
    # Reporting functions.
1717

    
1718
    @debug_method
1719
    @backend_method
1720
    def _report_size_change(self, user, account, size, source, details=None):
1721
        details = details or {}
1722

    
1723
        if size == 0:
1724
            return
1725

    
1726
        account_node = self._lookup_account(account, True)[1]
1727
        total = self._get_statistics(account_node, compute=True)[1]
1728
        details.update({'user': user, 'total': total})
1729
        self.messages.append(
1730
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1731
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1732

    
1733
        if not self.using_external_quotaholder:
1734
            return
1735

    
1736
        try:
1737
            name = details['path'] if 'path' in details else ''
1738
            serial = self.astakosclient.issue_one_commission(
1739
                holder=account,
1740
                source=source,
1741
                provisions={'pithos.diskspace': size},
1742
                name=name)
1743
        except BaseException, e:
1744
            raise QuotaError(e)
1745
        else:
1746
            self.serials.append(serial)
1747

    
1748
    @debug_method
1749
    @backend_method
1750
    def _report_object_change(self, user, account, path, details=None):
1751
        details = details or {}
1752
        details.update({'user': user})
1753
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1754
                              account, QUEUE_INSTANCE_ID, 'object', path,
1755
                              details))
1756

    
1757
    @debug_method
1758
    @backend_method
1759
    def _report_sharing_change(self, user, account, path, details=None):
1760
        details = details or {}
1761
        details.update({'user': user})
1762
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1763
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
1764
                              details))
1765

    
1766
    # Policy functions.
1767

    
1768
    def _check_project(self, value):
1769
        # raise ValueError('Bad quota source policy')
1770
        pass
1771

    
1772
    def _check_policy(self, policy):
1773
        for k, v in policy.iteritems():
1774
            if k == QUOTA_POLICY:
1775
                q = int(v)  # May raise ValueError.
1776
                if q < 0:
1777
                    raise ValueError
1778
            elif k == VERSIONING_POLICY:
1779
                if v not in ['auto', 'none']:
1780
                    raise ValueError
1781
            elif k == PROJECT:
1782
                self._check_project(v)
1783
            else:
1784
                raise ValueError
1785

    
1786
    def _get_default_policy(self, node=None, is_account_policy=True,
1787
                            default_project=None):
1788
        if is_account_policy:
1789
            default_policy = self.default_account_policy
1790
        else:
1791
            default_policy = self.default_container_policy
1792
            if default_project is None and node is not None:
1793
                # set container's account as the default quota source
1794
                default_project = self.node.node_get_parent_path(node)
1795
            default_policy[PROJECT] = default_project
1796
        return default_policy
1797

    
1798
    def _put_policy(self, node, policy, replace,
1799
                    is_account_policy=True, default_project=None,
1800
                    check=True):
1801
        default_policy = self._get_default_policy(node,
1802
                                                  is_account_policy,
1803
                                                  default_project)
1804
        if replace:
1805
            for k, v in default_policy.iteritems():
1806
                if k not in policy:
1807
                    policy[k] = v
1808
        if check:
1809
            self._check_policy(policy)
1810

    
1811
        self.node.policy_set(node, policy)
1812

    
1813
    def _get_policy(self, node, is_account_policy=True,
1814
                    default_project=None):
1815
        default_policy = self._get_default_policy(node,
1816
                                                  is_account_policy,
1817
                                                  default_project)
1818
        policy = default_policy.copy()
1819
        policy.update(self.node.policy_get(node))
1820
        return policy
1821

    
1822
    def _get_project(self, node):
1823
        policy = self._get_policy(node, is_account_policy=False)
1824
        return policy[PROJECT]
1825

    
1826
    def _apply_versioning(self, account, container, version_id,
1827
                          update_statistics_ancestors_depth=None):
1828
        """Delete the provided version if such is the policy.
1829
           Return size of object removed.
1830
        """
1831

    
1832
        if version_id is None:
1833
            return 0
1834
        path, node = self._lookup_container(account, container)
1835
        versioning = self._get_policy(
1836
            node, is_account_policy=False)[VERSIONING_POLICY]
1837
        if versioning != 'auto':
1838
            hash, size = self.node.version_remove(
1839
                version_id, update_statistics_ancestors_depth)
1840
            self.store.map_delete(hash)
1841
            return size
1842
        elif self.free_versioning:
1843
            return self.node.version_get_properties(
1844
                version_id, keys=('size',))[0]
1845
        return 0
1846

    
1847
    # Access control functions.
1848

    
1849
    def _check_groups(self, groups):
1850
        # raise ValueError('Bad characters in groups')
1851
        pass
1852

    
1853
    def _check_permissions(self, path, permissions):
1854
        # raise ValueError('Bad characters in permissions')
1855
        pass
1856

    
1857
    def _get_formatted_paths(self, paths):
1858
        formatted = []
1859
        if len(paths) == 0:
1860
            return formatted
1861
        props = self.node.get_props(paths)
1862
        if props:
1863
            for prop in props:
1864
                if prop[1].split(';', 1)[0].strip() in (
1865
                        'application/directory', 'application/folder'):
1866
                    formatted.append((prop[0].rstrip('/') + '/',
1867
                                      self.MATCH_PREFIX))
1868
                formatted.append((prop[0], self.MATCH_EXACT))
1869
        return formatted
1870

    
1871
    def _get_permissions_path(self, account, container, name):
1872
        path = '/'.join((account, container, name))
1873
        permission_paths = self.permissions.access_inherit(path)
1874
        permission_paths.sort()
1875
        permission_paths.reverse()
1876
        for p in permission_paths:
1877
            if p == path:
1878
                return p
1879
            else:
1880
                if p.count('/') < 2:
1881
                    continue
1882
                node = self.node.node_lookup(p)
1883
                props = None
1884
                if node is not None:
1885
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1886
                if props is not None:
1887
                    if props[self.TYPE].split(';', 1)[0].strip() in (
1888
                            'application/directory', 'application/folder'):
1889
                        return p
1890
        return None
1891

    
1892
    def _get_permissions_path_bulk(self, account, container, names):
1893
        formatted_paths = []
1894
        for name in names:
1895
            path = '/'.join((account, container, name))
1896
            formatted_paths.append(path)
1897
        permission_paths = self.permissions.access_inherit_bulk(
1898
            formatted_paths)
1899
        permission_paths.sort()
1900
        permission_paths.reverse()
1901
        permission_paths_list = []
1902
        lookup_list = []
1903
        for p in permission_paths:
1904
            if p in formatted_paths:
1905
                permission_paths_list.append(p)
1906
            else:
1907
                if p.count('/') < 2:
1908
                    continue
1909
                lookup_list.append(p)
1910

    
1911
        if len(lookup_list) > 0:
1912
            props = self.node.get_props(lookup_list)
1913
            if props:
1914
                for prop in props:
1915
                    if prop[1].split(';', 1)[0].strip() in (
1916
                            'application/directory', 'application/folder'):
1917
                        permission_paths_list.append(prop[0])
1918

    
1919
        if len(permission_paths_list) > 0:
1920
            return permission_paths_list
1921

    
1922
        return None
1923

    
1924
    def _reset_allowed_paths(self):
1925
        self.read_allowed_paths = defaultdict(set)
1926
        self.write_allowed_paths = defaultdict(set)
1927

    
1928
    @check_allowed_paths(action=0)
1929
    def _can_read_account(self, user, account):
1930
        if user != account:
1931
            if account not in self._allowed_accounts(user):
1932
                raise NotAllowedError
1933

    
1934
    @check_allowed_paths(action=1)
1935
    def _can_write_account(self, user, account):
1936
        if user != account:
1937
            raise NotAllowedError
1938

    
1939
    @check_allowed_paths(action=0)
1940
    def _can_read_container(self, user, account, container):
1941
        if user != account:
1942
            if container not in self._allowed_containers(user, account):
1943
                raise NotAllowedError
1944

    
1945
    @check_allowed_paths(action=1)
1946
    def _can_write_container(self, user, account, container):
1947
        if user != account:
1948
            raise NotAllowedError
1949

    
1950
    def can_write_container(self, user, account, container):
1951
        return self._can_write_container(user, account, container)
1952

    
1953
    @check_allowed_paths(action=0)
1954
    def _can_read_object(self, user, account, container, name):
1955
        if user == account:
1956
            return True
1957
        path = '/'.join((account, container, name))
1958
        if self.permissions.public_get(path) is not None:
1959
            return True
1960
        path = self._get_permissions_path(account, container, name)
1961
        if not path:
1962
            raise NotAllowedError
1963
        if (not self.permissions.access_check(path, self.READ, user) and not
1964
                self.permissions.access_check(path, self.WRITE, user)):
1965
            raise NotAllowedError
1966

    
1967
    @check_allowed_paths(action=1)
1968
    def _can_write_object(self, user, account, container, name):
1969
        if user == account:
1970
            return True
1971
        path = '/'.join((account, container, name))
1972
        path = self._get_permissions_path(account, container, name)
1973
        if not path:
1974
            raise NotAllowedError
1975
        if not self.permissions.access_check(path, self.WRITE, user):
1976
            raise NotAllowedError
1977

    
1978
    def _allowed_accounts(self, user):
1979
        allow = set()
1980
        for path in self.permissions.access_list_paths(user):
1981
            p = path.split('/', 1)[0]
1982
            allow.add(p)
1983
        self.read_allowed_paths[user] |= allow
1984
        return sorted(allow)
1985

    
1986
    def _allowed_containers(self, user, account):
1987
        allow = set()
1988
        for path in self.permissions.access_list_paths(user, account):
1989
            p = path.split('/', 2)[1]
1990
            allow.add(p)
1991
        self.read_allowed_paths[user] |= allow
1992
        return sorted(allow)
1993

    
1994
    # Domain functions
1995

    
1996
    @debug_method
1997
    @backend_method
1998
    def get_domain_objects(self, domain, user=None):
1999
        allowed_paths = self.permissions.access_list_paths(
2000
            user, include_owned=user is not None, include_containers=False)
2001
        if not allowed_paths:
2002
            return []
2003
        obj_list = self.node.domain_object_list(
2004
            domain, allowed_paths, CLUSTER_NORMAL)
2005
        return [(path,
2006
                 self._build_metadata(props, user_defined_meta),
2007
                 self.permissions.access_get(path)) for
2008
                path, props, user_defined_meta in obj_list]
2009

    
2010
    # util functions
2011

    
2012
    def _build_metadata(self, props, user_defined=None,
2013
                        include_user_defined=True):
2014
        meta = {'bytes': props[self.SIZE],
2015
                'type': props[self.TYPE],
2016
                'hash': props[self.HASH],
2017
                'version': props[self.SERIAL],
2018
                'version_timestamp': props[self.MTIME],
2019
                'modified_by': props[self.MUSER],
2020
                'uuid': props[self.UUID],
2021
                'checksum': props[self.CHECKSUM]}
2022
        if include_user_defined and user_defined is not None:
2023
            meta.update(user_defined)
2024
        return meta
2025

    
2026
    def _exists(self, node):
2027
        try:
2028
            self._get_version(node)
2029
        except ItemNotExists:
2030
            return False
2031
        else:
2032
            return True
2033

    
2034
    def _unhexlify_hash(self, hash):
2035
        try:
2036
            return binascii.unhexlify(hash)
2037
        except TypeError:
2038
            raise InvalidHash(hash)