
root / snf-pithos-backend / pithos / backends / modular.py @ 78e1f8da


# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import sys
import uuid as uuidlib
import logging
import hashlib
import binascii

from collections import defaultdict
from functools import wraps, partial
from traceback import format_exc

try:
    from astakosclient import AstakosClient
except ImportError:
    AstakosClient = None

from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
                  InvalidHash)


class DisabledAstakosClient(object):
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        m = ("AstakosClient has been disabled, "
             "yet an attempt to access it was made")
        raise AssertionError(m)


# Stripped-down version of the HashMap class found in tools.

class HashMap(list):

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        if len(self) == 0:
            return self._hash_raw('')
        if len(self) == 1:
            return self.__getitem__(0)

        h = list(self)
        s = 2
        while s < len(h):
            s = s * 2
        h += [('\x00' * len(h[0]))] * (s - len(h))
        while len(h) > 1:
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
        return h[0]

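# Illustrative sketch (not part of the original module): how HashMap reduces a
# list of block digests to a single root hash. The byte strings are made up;
# 'sha256' mirrors DEFAULT_HASH_ALGORITHM defined just below.
#
#     hm = HashMap(4 * 1024 * 1024, 'sha256')
#     hm.append(hashlib.new('sha256', 'first block').digest())
#     hm.append(hashlib.new('sha256', 'second block').digest())
#     root = hm.hash()  # == sha256 digest of hm[0] + hm[1]
#
# With three leaves, the list is first padded with '\x00' * len(digest)
# entries up to the next power of two and then reduced pairwise.
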
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16

QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

QUOTA_POLICY = 'quota'
VERSIONING_POLICY = 'versioning'
PROJECT = 'project'

inf = float('inf')

ULTIMATE_ANSWER = 42

DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'

logger = logging.getLogger(__name__)


def backend_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        # if we are inside a database transaction
        # just proceed with the method execution
        # otherwise manage a new transaction
        if self.in_transaction:
            return func(self, *args, **kw)

        try:
            self.pre_exec()
            result = func(self, *args, **kw)
            success_status = True
            return result
        except:
            success_status = False
            raise
        finally:
            self.post_exec(success_status)
    return wrapper


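# Illustrative note (not part of the original module) on backend_method above:
# a public method called outside a transaction runs pre_exec()/post_exec()
# around itself, committing on success and rolling back on error, while a
# nested call made from inside an already wrapped method (in_transaction is
# True) simply runs within the enclosing transaction. For example, assuming a
# ModularBackend instance `b` and example account names:
#
#     b.put_account('user@example.com', 'user@example.com')  # own transaction
#
# whereas self.get_account_meta(...) invoked from another wrapped method does
# not commit separately; the outermost wrapper does.
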
def debug_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            result = format_exc()
            raise
        finally:
            all_args = map(repr, args)
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper


def check_allowed_paths(action):
    """Decorator for backend methods checking path access granted to user.

    The 1st argument of the decorated method is expected to be a
    ModularBackend instance, the 2nd the user performing the request, and
    the path join of the remaining arguments is the requested path.

    The decorator checks whether the requested path is among the user's
    cached allowed paths.
    If this is the case, the decorator returns immediately to reduce
    interactions with the database.
    Otherwise, it proceeds with the execution of the decorated method and, if
    the method returns successfully (no exceptions are raised), the requested
    path is added to the user's cached allowed paths.

    :param action: (int) 0 for reads / 1 for writes
    :raises NotAllowedError: the user does not have access to the path
    """
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args):
            user = args[0]
            if action == self.READ:
                d = self.read_allowed_paths
            else:
                d = self.write_allowed_paths
            path = '/'.join(args[1:])
            if path in d.get(user, []):
                return  # access is already checked
            else:
                func(self, *args)   # proceed with access check
                d[user].add(path)  # add path to the user's allowed paths
        return wrapper
    return decorator


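# Illustrative sketch (not part of the original module): a hypothetical method
# decorated with check_allowed_paths, following the convention documented
# above -- the first positional argument is the user and the remaining
# arguments are joined into the checked path.
#
#     class ExampleBackend(ModularBackend):  # ModularBackend is defined below
#         @check_allowed_paths(action=0)  # 0 for reads, per the docstring
#         def _can_read_example(self, user, account, container, name):
#             ...  # raise NotAllowedError unless access is granted
#
# A successful call caches 'account/container/name' in read_allowed_paths, so
# the next check for the same user and path returns without touching the
# database.
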
def list_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        marker = kw.get('marker')
        limit = kw.get('limit')
        result = func(self, *args, **kw)
        start, limit = self._list_limits(result, marker, limit)
        return result[start:start + limit]
    return wrapper


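# Illustrative note (not part of the original module): list_method applies
# marker/limit pagination to the list returned by the wrapped method, using
# self._list_limits() (provided elsewhere in this class). A caller pages
# through results by passing the last entry of the previous page as the
# marker, e.g.:
#
#     page1 = backend.list_accounts(user, limit=100)
#     page2 = backend.list_accounts(user, marker=page1[-1], limit=100)
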
class ModularBackend(BaseBackend):
    """A modular backend.

    Uses modules for SQL functions and storage.
    """

    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {}
        self.default_container_policy = {
            QUOTA_POLICY: container_quota_policy,
            VERSIONING_POLICY: container_versioning_policy,
            PROJECT: None
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        def load_module(m):
            __import__(m)
            return sys.modules[m]

        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        if not astakos_auth_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        self.serials = []
        self.messages = []

        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False

        self._reset_allowed_paths()

    def pre_exec(self, lock_container_path=False):
        self.lock_container_path = lock_container_path
        self.wrapper.execute()
        self.serials = []
        self._reset_allowed_paths()
        self.in_transaction = True

    def post_exec(self, success_status=True):
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    accept_serials=self.serials,
                    reject_serials=[])
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            if self.serials:
                r = self.astakosclient.resolve_commissions(
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(
                    r['rejected'])
            self.wrapper.rollback()
        self.in_transaction = False

    def close(self):
        self.wrapper.close()
        self.queue.close()

    @property
    def using_external_quotaholder(self):
        return not isinstance(self.astakosclient, DisabledAstakosClient)

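    # Illustrative usage sketch (not part of the original module): constructing
    # a backend with the module-level defaults (SQLite database, hashfiler
    # block store, no queue, and a DisabledAstakosClient because no
    # astakos_auth_url is given). The connection string, paths and account
    # names are example values.
    #
    #     b = ModularBackend(db_connection='sqlite:////tmp/backend.db',
    #                        block_path='/tmp/pithos-data')
    #     try:
    #         b.put_account('user@example.com', 'user@example.com')
    #         print b.list_accounts('user@example.com')
    #     finally:
    #         b.close()
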
    @debug_method
    @backend_method
    @list_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        return self._allowed_accounts(user)

    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain=None, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain."""

        self._can_read_account(user, account)
        path, node = self._lookup_account(account, user == account)
        if user != account:
            if until or (node is None):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                if domain is None:
                    raise ValueError(
                        'Domain argument is obligatory for getting '
                        'user defined metadata')
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                external_quota = self.astakosclient.service_get_quotas(
                    account)[account]
                meta['bytes'] = sum(d['pithos.diskspace']['usage'] for d in
                                    external_quota.values())
        meta.update({'modified': modified})
        return meta

    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        self._can_write_account(user, account)
        path, node = self._lookup_account(account, True)
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)

    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        self._can_read_account(user, account)
        if user != account:
            return {}
        self._lookup_account(account, True)
        return self.permissions.group_dict(account)

    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        self._can_write_account(user, account)
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        for k, v in groups.iteritems():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, k)
            if v:
                self.permissions.group_addmany(account, k, v)

    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy."""

        self._can_read_account(user, account)
        if user != account:
            return {}
        path, node = self._lookup_account(account, True)
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            external_quota = self.astakosclient.service_get_quotas(
                account)[account]
            policy.update(dict(('%s-%s' % (QUOTA_POLICY, k),
                                v['pithos.diskspace']['limit']) for k, v in
                               external_quota.items()))

        return policy

    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        self._can_write_account(user, account)
        path, node = self._lookup_account(account, True)
        self._put_policy(node, policy, replace, is_account_policy=True,
                         check=True)

    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        policy = policy or {}
        self._can_write_account(user, account)
        node = self.node.node_lookup(account)
        if node is not None:
            raise AccountExists('Account already exists')
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True,
                         check=True if policy else False)

    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        self._can_write_account(user, account)
        node = self.node.node_lookup(account)
        if node is None:
            return
        if not self.node.node_remove(node,
                                     update_statistics_ancestors_depth=-1):
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()

    @debug_method
    @backend_method
    @list_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account."""

        self._can_read_account(user, account)
        if user != account:
            if until:
                raise NotAllowedError
            return self._allowed_containers(user, account)
        if shared or public:
            allowed = set()
            if shared:
                allowed.update([x.split('/', 2)[1] for x in
                               self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in
                               self.permissions.public_list(account)])
            return sorted(allowed)
        node = self.node.node_lookup(account)
        return [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]

    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""

        self._can_read_container(user, account, container)
        allowed = []
        if user != account:
            if until:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = until if until is not None else inf
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)

    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain=None,
                           until=None, include_user_defined=True):
        """Return a dictionary with the container metadata for the domain."""

        self._can_read_container(user, account, container)
        if user != account:
            if until:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                if domain is None:
                    raise ValueError(
                        'Domain argument is obligatory for getting '
                        'user defined metadata')
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta

    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is not None:
            versioning = self._get_policy(
                node, is_account_policy=False)[VERSIONING_POLICY]
            if versioning != 'auto':
                self.node.version_remove(src_version_id,
                                         update_statistics_ancestors_depth=0)

    @debug_method
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        self._can_read_container(user, account, container)
        if user != account:
            return {}
        path, node = self._lookup_container(account, container)
        return self._get_policy(node, is_account_policy=False)

    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)

        if PROJECT in policy:
            project = self._get_project(node)
            try:
                serial = self.astakosclient.issue_resource_reassignment(
                    holder=account,
                    from_source=project,
                    to_source=policy[PROJECT],
                    provisions={'pithos.diskspace': self.get_container_meta(
                        user, account, container,
                        include_user_defined=False)['bytes']})
            except BaseException, e:
                raise QuotaError(e)
            else:
                self.serials.append(serial)

        self._put_policy(node, policy, replace, is_account_policy=False,
                         default_project=account, check=True)

    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        policy = policy or {}
        self._can_write_container(user, account, container)
        try:
            path, node = self._lookup_container(account, container)
        except NameError:
            pass
        else:
            raise ContainerExists('Container already exists')
        path = '/'.join((account, container))
        node = self._put_path(
            user, self._lookup_account(account, True)[1], path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False,
                         default_project=account,
                         check=True if policy else False)

    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name."""

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)
        project = self._get_project(node)

        if until is not None:
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, project, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, project, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, project, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()

    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        if user != account and until:
            raise NotAllowedError

        objects = set()
        if shared and public:
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects = set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            objects.sort(key=lambda x: x[0])
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
        else:
            allowed = self._list_object_permissions(
                user, account, container, prefix, shared, public=False)
            if shared and not allowed:
                return []
            path, node = self._lookup_container(account, container)
            allowed = self._get_formatted_paths(allowed)
            objects = self._list_object_properties(
                node, path, prefix, delimiter, marker, limit, virtual, domain,
                keys, until, size_range, allowed, all_props)

        # apply limits
        start, limit = self._list_limits(objects, marker, limit)
        return objects[start:start + limit]

    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        public = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        paths, nodes = self._lookup_objects(public)
        path = '/'.join((account, container))
        cont_prefix = path + '/'
        paths = [x[len(cont_prefix):] for x in paths]
        objects = [(p,) + props for p, props in
                   zip(paths, self.node.version_lookup_bulk(
                       nodes, all_props=all_props, order_by_path=True))]
        return objects

    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        objects = []
        while True:
            marker = objects[-1] if objects else None
            limit = 10000
            l = self._list_objects(
                user, account, container, prefix, delimiter, marker, limit,
                virtual, domain, keys, shared, until, size_range, all_props,
                public)
            objects.extend(l)
            if not l or len(l) < limit:
                break
        return objects

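    # Illustrative note (not part of the original module): the while loop in
    # _list_objects_no_limit above pages through _list_objects() with the last
    # returned entry as the marker until a short (or empty) page arrives, e.g.:
    #
    #     page 1: marker=None        -> 10000 entries
    #     page 2: marker=<last of 1> -> 10000 entries
    #     page 3: marker=<last of 2> ->   137 entries, loop ends
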
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        allowed = []
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            allowed = self.permissions.access_list_paths(user, path)
            if not allowed:
                raise NotAllowedError
        else:
            allowed = set()
            if shared:
                allowed.update(self.permissions.access_list_shared(path))
            if public:
                allowed.update(
                    [x[0] for x in self.permissions.public_list(path)])
            allowed = sorted(allowed)
            if not allowed:
                return []
        return allowed

    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        keys = keys or []
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, False, public)

    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        keys = keys or []
        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, True, public)
        objects = []
        for p in props:
            if len(p) == 2:
                objects.append({'subdir': p[0]})
            else:
                objects.append({
                    'name': p[0],
                    'bytes': p[self.SIZE + 1],
                    'type': p[self.TYPE + 1],
                    'hash': p[self.HASH + 1],
                    'version': p[self.SERIAL + 1],
                    'version_timestamp': p[self.MTIME + 1],
                    'modified': p[self.MTIME + 1] if until is None else None,
                    'modified_by': p[self.MUSER + 1],
                    'uuid': p[self.UUID + 1],
                    'checksum': p[self.CHECKSUM + 1]})
        return objects

    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths enforcing permissions under a container."""

        return self._list_object_permissions(user, account, container, prefix,
                                             True, False)

    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        public = {}
        for path, p in self.permissions.public_list('/'.join((account,
                                                              container,
                                                              prefix))):
            public[path] = p
        return public

    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain=None,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            if domain is None:
                raise ValueError(
                    'Domain argument is obligatory for getting '
                    'user defined metadata')
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta

    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write_object(user, account, container, name)

        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        self._apply_versioning(account, container, src_version_id,
                               update_statistics_ancestors_depth=1)
        return dest_version_id

    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on the object, the path
        from which the object gets its permissions,
        along with a dictionary containing the permissions."""

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        #group_parents = access_objects['group_parents']
        nobject_permissions = {}
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            allowed = 1
            name = path[cpath_idx:]
            if user != account:
                try:
                    allowed = access_objects[path]
                except KeyError:
                    raise NotAllowedError
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        self._lookup_objects(permissions_path)
        return nobject_permissions

    @debug_method
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions,
        along with a dictionary containing the permissions."""

        allowed = 'write'
        permissions_path = self._get_permissions_path(account, container, name)
        if user != account:
            if self.permissions.access_check(permissions_path, self.WRITE,
                                             user):
                allowed = 'write'
            elif self.permissions.access_check(permissions_path, self.READ,
                                               user):
                allowed = 'read'
            else:
                raise NotAllowedError
        self._lookup_object(account, container, name)
        return (allowed,
                permissions_path,
                self.permissions.access_get(permissions_path))

    @debug_method
    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object."""

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        try:
            self.permissions.access_set(path, permissions)
        except:
            raise ValueError
        else:
            self._report_sharing_change(user, account, path, {'members':
                                        self.permissions.access_members(path)})

        # remove all the cached allowed paths
        # filtering out only those affected could be more expensive
        self._reset_allowed_paths()

    @debug_method
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read_object(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        p = self.permissions.public_get(path)
        return p

    @debug_method
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write_object(user, account, container, name)
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        if not public:
            self.permissions.public_unset(path)
        else:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet)

    @debug_method
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if props[self.HASH] is None:
            return 0, ()
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]

    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write_object(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)
        project = self._get_project(container_node)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)[QUOTA_POLICY])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)[QUOTA_POLICY])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta, project,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id

    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version."""

        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(map)
        if missing:
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = map.hash()
        hexlified = binascii.hexlify(hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        self.store.map_put(hash, map)
        return dest_version_id, hexlified

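    # Illustrative usage sketch (not part of the original module): the two-step
    # flow implied by update_object_hashmap() above. The variable names and the
    # 'pithos' domain below are placeholders.
    #
    #     try:
    #         version_id, obj_hash = b.update_object_hashmap(
    #             user, account, container, name, size,
    #             'application/octet-stream', hex_hashes, '', 'pithos')
    #     except IndexError, e:
    #         # e.data holds the hex hashes of blocks missing from the store;
    #         # upload them (e.g. via put_block(), referenced above) and then
    #         # call update_object_hashmap() again.
    #         missing = e.data
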
    @debug_method
1208
    @backend_method
1209
    def update_object_checksum(self, user, account, container, name, version,
1210
                               checksum):
1211
        """Update an object's checksum."""
1212

    
1213
        # Update objects with greater version and same hashmap
1214
        # and size (fix metadata updates).
1215
        self._can_write_object(user, account, container, name)
1216
        path, node = self._lookup_object(account, container, name,
1217
                                         lock_container=True)
1218
        props = self._get_version(node, version)
1219
        versions = self.node.node_get_versions(node)
1220
        for x in versions:
1221
            if (x[self.SERIAL] >= int(version) and
1222
                x[self.HASH] == props[self.HASH] and
1223
                    x[self.SIZE] == props[self.SIZE]):
1224
                self.node.version_put_property(
1225
                    x[self.SERIAL], 'checksum', checksum)

    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):

        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read_object(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)
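
    # _copy_object always takes the two container locks in a fixed
    # (alphabetical) order, so concurrent copies between the same pair of
    # containers cannot deadlock. When a delimiter is given, the source is
    # treated as a pseudo-folder: every object under '<src_name><delimiter>'
    # is copied (or moved) to the matching path under the destination name.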
1302

    
1303
    @debug_method
1304
    @backend_method
1305
    def copy_object(self, user, src_account, src_container, src_name,
1306
                    dest_account, dest_container, dest_name, type, domain,
1307
                    meta=None, replace_meta=False, permissions=None,
1308
                    src_version=None, delimiter=None):
1309
        """Copy an object's data and metadata."""
1310

    
1311
        meta = meta or {}
1312
        dest_version_id = self._copy_object(
1313
            user, src_account, src_container, src_name, dest_account,
1314
            dest_container, dest_name, type, domain, meta, replace_meta,
1315
            permissions, src_version, False, delimiter)
1316
        return dest_version_id
1317

    
1318
    @debug_method
1319
    @backend_method
1320
    def move_object(self, user, src_account, src_container, src_name,
1321
                    dest_account, dest_container, dest_name, type, domain,
1322
                    meta=None, replace_meta=False, permissions=None,
1323
                    delimiter=None):
1324
        """Move an object's data and metadata."""
1325

    
1326
        meta = meta or {}
1327
        if user != src_account:
1328
            raise NotAllowedError
1329
        dest_version_id = self._move_object(
1330
            user, src_account, src_container, src_name, dest_account,
1331
            dest_container, dest_name, type, domain, meta, replace_meta,
1332
            permissions, None, delimiter=delimiter)
1333
        return dest_version_id
1334

    
1335
    def _delete_object(self, user, account, container, name, until=None,
1336
                       delimiter=None, report_size_change=True):
1337
        if user != account:
1338
            raise NotAllowedError
1339

    
1340
        # lock container path
1341
        container_path, container_node = self._lookup_container(account,
1342
                                                                container)
1343
        project = self._get_project(container_node)
1344
        path, node = self._lookup_object(account, container, name)
1345

    
1346
        if until is not None:
1347
            if node is None:
1348
                return
1349
            hashes = []
1350
            size = 0
1351
            serials = []
1352
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1353
                                           update_statistics_ancestors_depth=1)
1354
            hashes += h
1355
            size += s
1356
            serials += v
1357
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1358
                                           update_statistics_ancestors_depth=1)
1359
            hashes += h
1360
            if not self.free_versioning:
1361
                size += s
1362
            serials += v
1363
            for h in hashes:
1364
                self.store.map_delete(h)
1365
            self.node.node_purge(node, until, CLUSTER_DELETED,
1366
                                 update_statistics_ancestors_depth=1)
1367
            try:
1368
                self._get_version(node)
1369
            except NameError:
1370
                self.permissions.access_clear(path)
1371
            self._report_size_change(
1372
                user, account, -size, project, {
1373
                    'action': 'object purge',
1374
                    'path': path,
1375
                    'versions': ','.join(str(i) for i in serials)
1376
                }
1377
            )
1378
            return
1379

    
1380
        if not self._exists(node):
1381
            raise ItemNotExists('Object is deleted.')
1382

    
1383
        src_version_id, dest_version_id = self._put_version_duplicate(
1384
            user, node, size=0, type='', hash=None, checksum='',
1385
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1386
        del_size = self._apply_versioning(account, container, src_version_id,
1387
                                          update_statistics_ancestors_depth=1)
1388
        if report_size_change:
1389
            self._report_size_change(
1390
                user, account, -del_size, project,
1391
                {'action': 'object delete',
1392
                 'path': path,
1393
                 'versions': ','.join([str(dest_version_id)])})
1394
        self._report_object_change(
1395
            user, account, path, details={'action': 'object delete'})
1396
        self.permissions.access_clear(path)
1397

    
1398
        if delimiter:
1399
            prefix = name + delimiter if not name.endswith(delimiter) else name
1400
            src_names = self._list_objects_no_limit(
1401
                user, account, container, prefix, delimiter=None,
1402
                virtual=False, domain=None, keys=[], shared=False, until=None,
1403
                size_range=None, all_props=True, public=False)
1404
            paths = []
1405
            for t in src_names:
1406
                path = '/'.join((account, container, t[0]))
1407
                node = t[2]
1408
                if not self._exists(node):
1409
                    continue
1410
                src_version_id, dest_version_id = self._put_version_duplicate(
1411
                    user, node, size=0, type='', hash=None, checksum='',
1412
                    cluster=CLUSTER_DELETED,
1413
                    update_statistics_ancestors_depth=1)
1414
                del_size = self._apply_versioning(
1415
                    account, container, src_version_id,
1416
                    update_statistics_ancestors_depth=1)
1417
                if report_size_change:
1418
                    self._report_size_change(
1419
                        user, account, -del_size, project,
1420
                        {'action': 'object delete',
1421
                         'path': path,
1422
                         'versions': ','.join([str(dest_version_id)])})
1423
                self._report_object_change(
1424
                    user, account, path, details={'action': 'object delete'})
1425
                paths.append(path)
1426
            self.permissions.access_clear_bulk(paths)
1427

    
1428
        # remove all the cached allowed paths
1429
        # removing the specific path could be more expensive
1430
        self._reset_allowed_paths()
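
    # As with copying, a delimiter turns the delete into a pseudo-folder
    # operation: every object under '<name><delimiter>' is also marked
    # deleted. The cached allowed-path sets are then dropped wholesale, which
    # the comment above notes is cheaper than pruning individual entries.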

    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        self._delete_object(user, account, container, name, until, delimiter)

    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
                x[self.CLUSTER] != CLUSTER_DELETED]

    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read_object(user, account, container, name)
        return (account, container, name)

    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read_object(user, account, container, name)
        return (account, container, name)

    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        block = self.store.block_get(self._unhexlify_hash(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block

    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        return binascii.hexlify(self.store.block_put(data))

    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
        return binascii.hexlify(h)
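
    # Blocks are content-addressed: put_block() returns the hex-encoded hash
    # of the stored block, which get_block() accepts to read it back. A
    # minimal usage sketch (``backend`` is assumed to be an instance of this
    # backend class; illustrative only):
    #
    #     hexdigest = backend.put_block(data)
    #     assert backend.get_block(hexdigest) == data
    #     patched_hexdigest = backend.update_block(hexdigest, patch, offset=4)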

    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

    def _put_object_node(self, path, parent, name):
        path = '/'.join((path, name))
        node = self.node.node_lookup(path)
        if node is None:
            node = self.node.node_create(parent, path)
        return path, node

    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node

    def _lookup_account(self, account, create=True):
        node = self.node.node_lookup(account)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node

    def _lookup_container(self, account, container):
        for_update = True if self.lock_container_path else False
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name, lock_container=False):
        if lock_container:
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes

    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats

    def _get_version(self, node, version=None):
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version, node=node)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node."""

        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
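
    # _put_version_duplicate is the heart of the versioning scheme: the
    # version being superseded (if any) is reclustered into CLUSTER_HISTORY,
    # a new version row is created in the requested cluster, and the
    # is_latest bookkeeping is updated so that only the new version is
    # reported as the latest one for the node.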

    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))

    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=
            update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id

    def _list_limits(self, listing, marker, limit):
        start = 0
        if marker:
            try:
                start = listing.index(marker) + 1
            except ValueError:
                pass
        if not limit or limit > 10000:
            limit = 10000
        return start, limit
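
    # Listings are paged with a marker and capped at 10000 entries. A sketch
    # of how a caller typically applies the returned window to a name listing
    # (illustrative only):
    #
    #     start, limit = self._list_limits(listing, marker, limit)
    #     page = listing[start:start + limit]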

    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects

    # Reporting functions.

    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, source, details=None):
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=source,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)
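
    # Size changes are pushed to the message queue and, when an external
    # quotaholder is configured, also issued as commissions against the
    # 'pithos.diskspace' resource; the returned commission serials are
    # collected in self.serials for later resolution.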

    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))

    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))

    # Policy functions.

    def _check_project(self, value):
        # raise ValueError('Bad quota source policy')
        pass

    def _check_policy(self, policy):
        for k, v in policy.iteritems():
            if k == QUOTA_POLICY:
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == VERSIONING_POLICY:
                if v not in ['auto', 'none']:
                    raise ValueError
            elif k == PROJECT:
                self._check_project(v)
            else:
                raise ValueError

    def _get_default_policy(self, node=None, is_account_policy=True,
                            default_project=None):
        if is_account_policy:
            default_policy = self.default_account_policy
        else:
            default_policy = self.default_container_policy
            if default_project is None and node is not None:
                # set container's account as the default quota source
                default_project = self.node.node_get_parent_path(node)
            default_policy[PROJECT] = default_project
        return default_policy

    def _put_policy(self, node, policy, replace,
                    is_account_policy=True, default_project=None,
                    check=True):
        default_policy = self._get_default_policy(node,
                                                  is_account_policy,
                                                  default_project)
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        if check:
            self._check_policy(policy)

        self.node.policy_set(node, policy)

    def _get_policy(self, node, is_account_policy=True,
                    default_project=None):
        default_policy = self._get_default_policy(node,
                                                  is_account_policy,
                                                  default_project)
        policy = default_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy

    def _get_project(self, node):
        policy = self._get_policy(node, is_account_policy=False)
        return policy[PROJECT]

    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)[VERSIONING_POLICY]
        if versioning != 'auto':
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
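
    # Versioning policy in short: with versioning other than 'auto' the
    # superseded version is removed outright and its size is returned as
    # freed; with 'auto' versioning and free_versioning enabled, the old
    # version is kept in history but its size is still reported as freed;
    # otherwise nothing is freed.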
1833

    
1834
    # Access control functions.
1835

    
1836
    def _check_groups(self, groups):
1837
        # raise ValueError('Bad characters in groups')
1838
        pass
1839

    
1840
    def _check_permissions(self, path, permissions):
1841
        # raise ValueError('Bad characters in permissions')
1842
        pass
1843

    
1844
    def _get_formatted_paths(self, paths):
1845
        formatted = []
1846
        if len(paths) == 0:
1847
            return formatted
1848
        props = self.node.get_props(paths)
1849
        if props:
1850
            for prop in props:
1851
                if prop[1].split(';', 1)[0].strip() in (
1852
                        'application/directory', 'application/folder'):
1853
                    formatted.append((prop[0].rstrip('/') + '/',
1854
                                      self.MATCH_PREFIX))
1855
                formatted.append((prop[0], self.MATCH_EXACT))
1856
        return formatted
1857

    
1858
    def _get_permissions_path(self, account, container, name):
1859
        path = '/'.join((account, container, name))
1860
        permission_paths = self.permissions.access_inherit(path)
1861
        permission_paths.sort()
1862
        permission_paths.reverse()
1863
        for p in permission_paths:
1864
            if p == path:
1865
                return p
1866
            else:
1867
                if p.count('/') < 2:
1868
                    continue
1869
                node = self.node.node_lookup(p)
1870
                props = None
1871
                if node is not None:
1872
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1873
                if props is not None:
1874
                    if props[self.TYPE].split(';', 1)[0].strip() in (
1875
                            'application/directory', 'application/folder'):
1876
                        return p
1877
        return None
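
    # Permissions are inherited from parent pseudo-folders: the candidate
    # paths returned by access_inherit() are walked from the deepest to the
    # shallowest, and a parent path only applies if it is an actual
    # 'application/directory' or 'application/folder' object.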
1878

    
1879
    def _get_permissions_path_bulk(self, account, container, names):
1880
        formatted_paths = []
1881
        for name in names:
1882
            path = '/'.join((account, container, name))
1883
            formatted_paths.append(path)
1884
        permission_paths = self.permissions.access_inherit_bulk(
1885
            formatted_paths)
1886
        permission_paths.sort()
1887
        permission_paths.reverse()
1888
        permission_paths_list = []
1889
        lookup_list = []
1890
        for p in permission_paths:
1891
            if p in formatted_paths:
1892
                permission_paths_list.append(p)
1893
            else:
1894
                if p.count('/') < 2:
1895
                    continue
1896
                lookup_list.append(p)
1897

    
1898
        if len(lookup_list) > 0:
1899
            props = self.node.get_props(lookup_list)
1900
            if props:
1901
                for prop in props:
1902
                    if prop[1].split(';', 1)[0].strip() in (
1903
                            'application/directory', 'application/folder'):
1904
                        permission_paths_list.append(prop[0])
1905

    
1906
        if len(permission_paths_list) > 0:
1907
            return permission_paths_list
1908

    
1909
        return None
1910

    
1911
    def _reset_allowed_paths(self):
1912
        self.read_allowed_paths = defaultdict(set)
1913
        self.write_allowed_paths = defaultdict(set)
1914

    
1915
    @check_allowed_paths(action=0)
1916
    def _can_read_account(self, user, account):
1917
        if user != account:
1918
            if account not in self._allowed_accounts(user):
1919
                raise NotAllowedError
1920

    
1921
    @check_allowed_paths(action=1)
1922
    def _can_write_account(self, user, account):
1923
        if user != account:
1924
            raise NotAllowedError
1925

    
1926
    @check_allowed_paths(action=0)
1927
    def _can_read_container(self, user, account, container):
1928
        if user != account:
1929
            if container not in self._allowed_containers(user, account):
1930
                raise NotAllowedError
1931

    
1932
    @check_allowed_paths(action=1)
1933
    def _can_write_container(self, user, account, container):
1934
        if user != account:
1935
            raise NotAllowedError
1936

    
1937
    @check_allowed_paths(action=0)
1938
    def _can_read_object(self, user, account, container, name):
1939
        if user == account:
1940
            return True
1941
        path = '/'.join((account, container, name))
1942
        if self.permissions.public_get(path) is not None:
1943
            return True
1944
        path = self._get_permissions_path(account, container, name)
1945
        if not path:
1946
            raise NotAllowedError
1947
        if (not self.permissions.access_check(path, self.READ, user) and not
1948
                self.permissions.access_check(path, self.WRITE, user)):
1949
            raise NotAllowedError
1950

    
1951
    @check_allowed_paths(action=1)
1952
    def _can_write_object(self, user, account, container, name):
1953
        if user == account:
1954
            return True
1955
        path = '/'.join((account, container, name))
1956
        path = self._get_permissions_path(account, container, name)
1957
        if not path:
1958
            raise NotAllowedError
1959
        if not self.permissions.access_check(path, self.WRITE, user):
1960
            raise NotAllowedError
1961

    
1962
    def _allowed_accounts(self, user):
1963
        allow = set()
1964
        for path in self.permissions.access_list_paths(user):
1965
            p = path.split('/', 1)[0]
1966
            allow.add(p)
1967
        self.read_allowed_paths[user] |= allow
1968
        return sorted(allow)
1969

    
1970
    def _allowed_containers(self, user, account):
1971
        allow = set()
1972
        for path in self.permissions.access_list_paths(user, account):
1973
            p = path.split('/', 2)[1]
1974
            allow.add(p)
1975
        self.read_allowed_paths[user] |= allow
1976
        return sorted(allow)
1977

    
1978
    # Domain functions
1979

    
1980
    @debug_method
1981
    @backend_method
1982
    def get_domain_objects(self, domain, user=None):
1983
        allowed_paths = self.permissions.access_list_paths(
1984
            user, include_owned=user is not None, include_containers=False)
1985
        if not allowed_paths:
1986
            return []
1987
        obj_list = self.node.domain_object_list(
1988
            domain, allowed_paths, CLUSTER_NORMAL)
1989
        return [(path,
1990
                 self._build_metadata(props, user_defined_meta),
1991
                 self.permissions.access_get(path)) for
1992
                path, props, user_defined_meta in obj_list]
1993

    
1994
    # util functions
1995

    
1996
    def _build_metadata(self, props, user_defined=None,
1997
                        include_user_defined=True):
1998
        meta = {'bytes': props[self.SIZE],
1999
                'type': props[self.TYPE],
2000
                'hash': props[self.HASH],
2001
                'version': props[self.SERIAL],
2002
                'version_timestamp': props[self.MTIME],
2003
                'modified_by': props[self.MUSER],
2004
                'uuid': props[self.UUID],
2005
                'checksum': props[self.CHECKSUM]}
2006
        if include_user_defined and user_defined is not None:
2007
            meta.update(user_defined)
2008
        return meta
2009

    
2010
    def _exists(self, node):
2011
        try:
2012
            self._get_version(node)
2013
        except ItemNotExists:
2014
            return False
2015
        else:
2016
            return True
2017

    
2018
    def _unhexlify_hash(self, hash):
2019
        try:
2020
            return binascii.unhexlify(hash)
2021
        except TypeError:
2022
            raise InvalidHash(hash)