Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 63092950

History | View | Annotate | Download (78.4 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from collections import defaultdict
41
from functools import wraps, partial
42
from traceback import format_exc
43

    
44
try:
45
    from astakosclient import AstakosClient
46
except ImportError:
47
    AstakosClient = None
48

    
49
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
50
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
51
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
52
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
53
                  InvalidHash)
54

    
55

    
56
class DisabledAstakosClient(object):
    """Stand-in for AstakosClient when quota support is turned off.

    The constructor arguments are kept for inspection, but any attribute
    access fails loudly, so accidental use of the disabled client is
    caught immediately.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        message = ("AstakosClient has been disabled, "
                   "yet an attempt to access it was made")
        raise AssertionError(message)
65

    
66

    
67
# Stripped-down version of the HashMap class found in tools.
68

    
69
class HashMap(list):
    """Stripped-down Merkle hash map (full version lives in tools).

    Subclasses list; each element is expected to be a raw block digest.
    hash() folds the digests into a single top hash by pairwise hashing
    over a tree padded to a power-of-two width with zero digests.
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize   # block size in bytes
        self.blockhash = blockhash   # hashlib algorithm name, e.g. 'sha256'

    def _hash_raw(self, v):
        """Return the raw digest of v using the configured algorithm."""
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        """Return the top hash of the Merkle tree over the block digests."""
        if len(self) == 0:
            # Empty map: digest of the empty string.  b'' is identical to
            # '' under Python 2 and stays correct for bytes digests on 3.
            return self._hash_raw(b'')
        if len(self) == 1:
            # Single block: its digest is the top hash.
            return self[0]

        h = list(self)
        # Pad with zero digests up to the next power of two.
        s = 2
        while s < len(h):
            s = s * 2
        h += [(b'\x00' * len(h[0]))] * (s - len(h))
        # Pairwise-combine adjacent digests until one remains.
        while len(h) > 1:
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
        return h[0]
95

    
96
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
# Alphabet and length (in characters) used for generated public URLs.
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16

# Identifiers used when publishing messages to the queue.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version clusters: 0 = current, 1 = superseded history, 2 = deleted.
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

# Sentinel "no upper bound" timestamp used in until/before comparisons.
inf = float('inf')

ULTIMATE_ANSWER = 42

# Quota source and resource names used with the Astakos quotaholder.
DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'

logger = logging.getLogger(__name__)
127

    
128

    
129
def backend_method(func):
    """Decorator wrapping a backend call in a database transaction.

    If a transaction is already open, the method runs inside it as-is;
    otherwise pre_exec() opens one and post_exec() closes it, receiving
    a flag that tells whether the method completed without raising.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        # Already inside a transaction: just run the method.
        if self.in_transaction:
            return func(self, *args, **kw)

        succeeded = False
        try:
            self.pre_exec()
            ret = func(self, *args, **kw)
            succeeded = True
            return ret
        finally:
            # Commit or roll back depending on how the call ended.
            self.post_exec(succeeded)
    return wrapper
149

    
150

    
151
def debug_method(func):
    """Decorator logging every call's arguments and result at DEBUG level.

    On exception the traceback text is logged in place of the result and
    the exception is re-raised unchanged.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            # Log the traceback text instead of a return value.
            result = format_exc()
            raise
        finally:
            # Build the argument list eagerly.  The original used
            # map(all_args.append, ...) for its side effect, which is a
            # silent no-op under a lazy (Python 3) map; items() behaves
            # identically to iteritems() here under Python 2.
            all_args = [repr(arg) for arg in args]
            all_args.extend('%s=%s' % (k, v) for k, v in kw.items())
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper
166

    
167

    
168
def check_allowed_paths(action):
    """Decorator factory checking path access granted to a user.

    The decorated method's first argument must be a ModularBackend
    instance, the second the requesting user; the remaining arguments,
    joined with '/', form the requested path.

    If the path is already in the user's cached allowed paths, the
    wrapper returns immediately, skipping the database check.  Otherwise
    the decorated method runs (raising NotAllowedError on denial) and,
    on success, the path is cached for the user.

    :param action: (int) 0 for reads / 1 for writes
    :raises NotAllowedError: the user does not have access to the path
    """
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args):
            user = args[0]
            cache = (self.read_allowed_paths if action == self.READ
                     else self.write_allowed_paths)
            path = '/'.join(args[1:])
            if path in cache.get(user, ()):
                return  # access already verified for this user
            func(self, *args)   # perform the actual access check
            cache[user].add(path)  # remember the granted path
        return wrapper
    return decorator
202

    
203

    
204
def list_method(func):
    """Decorator applying marker/limit windowing to a listing result.

    Reads 'marker' and 'limit' from the keyword arguments, asks
    self._list_limits for the slice bounds, and returns that window of
    the wrapped method's result.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        marker, limit = kw.get('marker'), kw.get('limit')
        result = func(self, *args, **kw)
        start, count = self._list_limits(result, marker, limit)
        return result[start:start + count]
    return wrapper
213

    
214

    
215
class ModularBackend(BaseBackend):
216
    """A modular backend.
217

218
    Uses modules for SQL functions and storage.
219
    """
220

    
221
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        """Wire up the backend: DB module, block store, queue, Astakos.

        Every argument left as None falls back to the corresponding
        module-level DEFAULT_* constant; the queue and the Astakos
        client stay disabled unless explicitly configured.
        """
        # Fill in defaults for anything the caller did not specify.
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        # Policies applied to newly created accounts/containers.
        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        # Import a module by dotted name and return the module object.
        def load_module(m):
            __import__(m)
            return sys.modules[m]

        # Database layer: wrapper plus the tables/views it exposes.
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        # Mirror the db module's access-mode constants onto the backend.
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        # Mirror the db module's node/version property indexes too.
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        # Block storage layer.
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        # Message queue: real queue if configured, otherwise a no-op stub.
        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        # Use the disabled stub when no auth URL is given or the
        # astakosclient package is not importable.
        if not astakos_auth_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        # Per-transaction state: pending commission serials and queue
        # messages, flushed/resolved by post_exec().
        self.serials = []
        self.messages = []

        # Move is implemented as a copy flagged as a move.
        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False

        self._reset_allowed_paths()
334

    
335
    def pre_exec(self, lock_container_path=False):
        """Open a fresh transaction and reset the per-request state."""
        self.lock_container_path = lock_container_path
        self.serials = []
        self.wrapper.execute()
        self._reset_allowed_paths()
        self.in_transaction = True
341

    
342
    def post_exec(self, success_status=True):
        """Close the transaction opened by pre_exec().

        On success: flush queued messages, persist any pending quota
        commission serials, resolve them against the quotaholder, and
        commit.  On failure: reject pending serials and roll back.
        """
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                # Accept all pending commissions; drop the serials the
                # quotaholder acknowledged from the local table.
                r = self.astakosclient.resolve_commissions(
                    accept_serials=self.serials,
                    reject_serials=[])
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            # Failure path: reject pending commissions, then roll back.
            if self.serials:
                r = self.astakosclient.resolve_commissions(
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(
                    r['rejected'])
            self.wrapper.rollback()
        self.in_transaction = False
376

    
377
    def close(self):
        """Release backend resources: the DB wrapper, then the queue."""
        for resource in (self.wrapper, self.queue):
            resource.close()
380

    
381
    @property
    def using_external_quotaholder(self):
        """True when a real (non-disabled) Astakos client is configured."""
        disabled = isinstance(self.astakosclient, DisabledAstakosClient)
        return not disabled
384

    
385
    @debug_method
    @backend_method
    @list_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""
        paths = self._allowed_accounts(user)
        lookup = self.node.node_lookup_bulk(paths)
        if not lookup:
            return ()
        nodes = lookup.keys()
        props = self.node.version_lookup_bulk(nodes, cluster=CLUSTER_NORMAL)
        mtimes = dict((p[self.NODE], p[self.MTIME]) for p in props)
        stats = self.node.statistics_latest_bulk(
            nodes, except_cluster=CLUSTER_DELETED)
        # The count/size columns of the statistics rows are not needed here.
        return [{'name': lookup[node], 'modified': max(mtimes[node], mtime)}
                for node, count, size, mtime in stats]
406

    
407
    def _get_account_quotas(self, account):
        """Fetch the account's diskspace quota dict from Astakos."""
        all_quotas = self.astakosclient.service_get_quotas(account)[account]
        source_quotas = all_quotas.get(DEFAULT_SOURCE, {})
        return source_quotas.get(DEFAULT_DISKSPACE_RESOURCE, {})
413

    
414
    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain.

        Non-owners only get the account name; owners additionally get
        counts, sizes, timestamps and the domain's user-defined metadata.
        """
        self._can_read_account(user, account)
        path, node = self._lookup_account(account, user == account)
        if user != account:
            # Non-owners may not ask for historical state or a missing node.
            if until or (node is None):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # NameError presumably signals "no matching version" from
            # _get_properties — TODO confirm against its definition.
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Minimal view for non-owners.
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                # The external quotaholder is authoritative for usage.
                external_quota = self._get_account_quotas(account)
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta
455

    
456
    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the account's metadata for the given domain."""
        self._can_write_account(user, account)
        node = self._lookup_account(account, True)[1]
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)
465

    
466
    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return the user groups defined for the account."""
        self._can_read_account(user, account)
        if user != account:
            # Only the owner may see the group definitions.
            return {}
        self._lookup_account(account, True)
        return self.permissions.group_dict(account)
476

    
477
    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account.

        With replace=True all existing groups are destroyed first;
        otherwise each named group is deleted before being re-added.
        Groups with an empty member list end up removed.
        """
        self._can_write_account(user, account)
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        # items() behaves like iteritems() under Python 2 and stays
        # correct under Python 3.
        for name, members in groups.items():
            if not replace:  # If not already deleted by group_destroy.
                self.permissions.group_delete(account, name)
            if members:
                self.permissions.group_addmany(account, name, members)
492

    
493
    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy."""
        self._can_read_account(user, account)
        if user != account:
            # Policy is visible to the owner only.
            return {}
        node = self._lookup_account(account, True)[1]
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            # The external quotaholder is authoritative for the quota limit.
            external_quota = self._get_account_quotas(account)
            policy['quota'] = external_quota.get('limit', 0)
        return policy
507

    
508
    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""
        self._can_write_account(user, account)
        node = self._lookup_account(account, True)[1]
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)
517

    
518
    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""
        policy = policy or {}
        self._can_write_account(user, account)
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True)
533

    
534
    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""
        self._can_write_account(user, account)
        node = self.node.node_lookup(account)
        if node is None:
            return  # nothing to delete
        removed = self.node.node_remove(
            node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

        # Drop the whole allowed-paths cache; evicting only this
        # account's entries would be more expensive than rebuilding.
        self._reset_allowed_paths()
551

    
552
    @debug_method
    @backend_method
    @list_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account."""
        self._can_read_account(user, account)
        if user != account:
            if until:
                raise NotAllowedError
            return self._allowed_containers(user, account)
        if shared or public:
            # Collect the container component of every shared/public path.
            names = set()
            if shared:
                shared_paths = self.permissions.access_list_shared(account)
                names.update(p.split('/', 2)[1] for p in shared_paths)
            if public:
                public_entries = self.permissions.public_list(account)
                names.update(e[0].split('/', 2)[1] for e in public_entries)
            return sorted(names)
        node = self.node.node_lookup(account)
        rows = self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)
        return [row[0] for row in rows]
576

    
577
    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""
        self._can_read_container(user, account, container)
        if user != account and until:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = inf if until is None else until
        allowed = self._get_formatted_paths([])
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)
593

    
594
    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain."""
        self._can_read_container(user, account, container)
        if user != account and until:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            # Overall last modification, regardless of the snapshot time.
            modified = max(self._get_statistics(node)[2], mtime)

        if user != account:
            # Minimal view for non-owners.
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta['until_timestamp'] = tstamp
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta['modified'] = modified
        return meta
628

    
629
    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""
        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is None:
            return
        # Without 'auto' versioning the superseded version is dropped.
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            self.node.version_remove(src_version_id,
                                     update_statistics_ancestors_depth=0)
646

    
647
    @debug_method
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""
        self._can_read_container(user, account, container)
        if user != account:
            # Policy is visible to the owner only.
            return {}
        node = self._lookup_container(account, container)[1]
        return self._get_policy(node, is_account_policy=False)
657

    
658
    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""
        self._can_write_container(user, account, container)
        node = self._lookup_container(account, container)[1]
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)
668

    
669
    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""
        policy = policy or {}
        self._can_write_container(user, account, container)
        try:
            self._lookup_container(account, container)
        except NameError:
            pass  # lookup miss: the container may be created
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        account_node = self._lookup_account(account, True)[1]
        path = '/'.join((account, container))
        node = self._put_path(user, account_node, path,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)
689

    
690
    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name.

        Three modes:
        - until is set: purge history/deleted versions up to that time;
        - no delimiter: delete the (empty) container entirely;
        - delimiter set: delete only the contained objects.
        """
        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)

        if until is not None:
            # Purge-only mode: drop historical and deleted versions up
            # to 'until', freeing their block maps.
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                # Versions count against quota: report the freed space.
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            # Full delete: only allowed for an empty container.
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                # Mark each object deleted by writing a zero-size
                # version in the DELETED cluster.
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
768

    
769
    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        """Collect object listings under a container, honouring the
        shared/public filters, then window the result by marker/limit."""

        # Only the account owner may list past versions.
        if user != account and until:
            raise NotAllowedError

        if shared and public:
            # Gather the shared objects first ...
            found = set()
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            if shared_paths:
                path, node = self._lookup_container(account, container)
                formatted = self._get_formatted_paths(shared_paths)
                found = set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, formatted, all_props))

            # ... then merge in the public ones and order by object name.
            found |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props, until))
            objects = sorted(found, key=lambda x: x[0])
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props, until)
        else:
            allowed = self._list_object_permissions(
                user, account, container, prefix, shared, public=False)
            if shared and not allowed:
                return []
            path, node = self._lookup_container(account, container)
            allowed = self._get_formatted_paths(allowed)
            objects = self._list_object_properties(
                node, path, prefix, delimiter, marker, limit, virtual, domain,
                keys, until, size_range, allowed, all_props)

        # Apply the marker/limit window on the collected listing.
        start, limit = self._list_limits(objects, marker, limit)
        return objects[start:start + limit]
    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props, until):
        """Return version property tuples for the public objects under a
        container, each tuple prefixed with the bare object name."""

        public = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        path_by_node = self.node.node_lookup_bulk(public)
        cont_prefix = '/'.join((account, container)) + '/'
        before = inf if until is None else until

        def object_name(full_path):
            # Strip the leading "account/container/" part.
            return full_path[len(cont_prefix):]

        versions = self.node.version_lookup_bulk(path_by_node.keys(),
                                                 before=before,
                                                 all_props=True,
                                                 order_by_path=True)
        if all_props:
            return [(object_name(path_by_node[v[self.NODE]]),) + tuple(v)
                    for v in versions]
        # Otherwise keep only the first property of each version row.
        return [(object_name(path_by_node[v[self.NODE]]),) + tuple(v)[:1]
                for v in versions]
    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        """Page through _list_objects() exhaustively and return everything."""

        batch_size = 10000
        collected = []
        while True:
            # The last element gathered so far is the next page marker.
            marker = collected[-1] if collected else None
            batch = self._list_objects(
                user, account, container, prefix, delimiter, marker,
                batch_size, virtual, domain, keys, shared, until, size_range,
                all_props, public)
            collected.extend(batch)
            if len(batch) < batch_size:
                break
        return collected
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        """Return the paths under the prefix the user may list.

        For a foreign account this is whatever has been shared with the
        user (NotAllowedError when nothing is); for the owner it is the
        union of the shared and/or public paths requested.
        """

        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            allowed = self.permissions.access_list_paths(user, path)
            if not allowed:
                raise NotAllowedError
            return allowed

        paths = set()
        if shared:
            paths.update(self.permissions.access_list_shared(path))
        if public:
            paths.update(x[0] for x in self.permissions.public_list(path))
        return sorted(paths) if paths else []
    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        # all_props=False: name/serial pairs only.
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, False,
            public)
    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, True,
            public)
        listing = []
        for p in props:
            if len(p) == 2:
                # Two-element tuples denote virtual directory entries.
                listing.append({'subdir': p[0]})
                continue
            # p[0] is the object name; version properties follow, hence +1.
            listing.append({
                'name': p[0],
                'bytes': p[self.SIZE + 1],
                'type': p[self.TYPE + 1],
                'hash': p[self.HASH + 1],
                'version': p[self.SERIAL + 1],
                'version_timestamp': p[self.MTIME + 1],
                'modified': p[self.MTIME + 1] if until is None else None,
                'modified_by': p[self.MUSER + 1],
                'uuid': p[self.UUID + 1],
                'checksum': p[self.CHECKSUM + 1]})
        return listing
    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths enforce permissions under a container."""

        return self._list_object_permissions(user, account, container, prefix,
                                             shared=True, public=False)
    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        # public_list() yields (path, public_id) pairs.
        prefix_path = '/'.join((account, container, prefix))
        return dict(self.permissions.public_list(prefix_path))
    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                # Overall last modification, regardless of version asked.
                modified = self._get_version(node)[self.MTIME]
            except NameError:
                # No live version; the deletion marker (if any) supplies
                # the modification timestamp.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            # User-defined attributes for the requested domain come first,
            # then the fixed system keys below override on collision.
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta
    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write_object(user, account, container, name)

        # Lock the container path while the object is being touched.
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        self._apply_versioning(account, container, src_version_id,
                               update_statistics_ancestors_depth=1)
        return dest_version_id
    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        nobject_permissions = {}
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            name = path[cpath_idx:]
            # A foreign user with no access entry at all is rejected.
            if user != account and path not in access_objects:
                raise NotAllowedError
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        return nobject_permissions
    @debug_method
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        permissions_path = self._get_permissions_path(account, container, name)
        if user == account:
            # The owner always has write access.
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.WRITE,
                                           user):
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.READ,
                                           user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # The lookup's return value is not needed here.
        self._lookup_object(account, container, name)
        return (allowed,
                permissions_path,
                self.permissions.access_get(permissions_path))
    @debug_method
    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object.

        Only the account owner may change permissions.  Invalid
        permission input surfaces as ValueError (historical contract).
        """

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        try:
            self.permissions.access_set(path, permissions)
        except Exception:
            # Keep surfacing failures as ValueError for callers, but do
            # not swallow SystemExit/KeyboardInterrupt the way the
            # previous bare ``except:`` did.
            raise ValueError('Invalid permissions: %r' % (permissions,))
        else:
            self._report_sharing_change(user, account, path, {'members':
                                        self.permissions.access_members(path)})

        # remove all the cached allowed paths
        # filtering out only those affected could be more expensive
        self._reset_allowed_paths()
    @debug_method
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read_object(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        return self.permissions.public_get(path)
    @debug_method
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write_object(user, account, container, name)
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        if public:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet)
        else:
            self.permissions.public_unset(path)
    @debug_method
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if props[self.HASH] is None:
            # No hashmap stored for this version.
            return 0, ()
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
        return props[self.SIZE], [binascii.hexlify(h) for h in hashmap]
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        """Create a new version for an object, enforce quotas and report
        the changes.  Returns the new version id."""

        # Only the owner may attach permissions.
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write_object(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta: copy from the explicit source version when given,
        # otherwise from the version that was just superseded.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota, unless delegated to an external
            # quotaholder.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id
    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version."""

        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(map)
        if missing:
            # Report the missing blocks back to the caller through the
            # exception's ``data`` attribute.
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = map.hash()
        hexlified = binascii.hexlify(hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        self.store.map_put(hash, map)
        return dest_version_id, hexlified
    @debug_method
    @backend_method
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum."""

        # Propagate the checksum to every later version with the same
        # hashmap and size (covers metadata-only updates).
        self._can_write_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        props = self._get_version(node, version)
        for v in self.node.node_get_versions(node):
            if (v[self.SERIAL] >= int(version) and
                v[self.HASH] == props[self.HASH] and
                    v[self.SIZE] == props[self.SIZE]):
                self.node.version_put_property(
                    v[self.SERIAL], 'checksum', checksum)
    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):
        """Copy (or move) an object; with a delimiter also recurse into
        the source's "directory" contents.  Returns the new version id,
        or a list of ids when a delimiter produced multiple copies."""

        # A move does not change total usage, so skip size reporting.
        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read_object(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        # Copying to a different object yields a new uuid.
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                # Rebase the child path under the destination name.
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)
    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, src_version, False, delimiter)
    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        # Only the source account owner may move objects away from it.
        if user != src_account:
            raise NotAllowedError
        return self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, None, delimiter=delimiter)
    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        """Mark an object as deleted, or purge its history up to ``until``.

        With a delimiter, also delete the object's "directory" contents.
        """

        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            # Purge mode: physically drop versions up to the timestamp.
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            # History versions count against usage only when versioning
            # is not free.
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except NameError:
                # No version left at all: drop the permissions too.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        # Plain delete: record a deletion marker version.
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            # Recursive delete of everything under "name<delimiter>".
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object.

        Thin public wrapper around _delete_object.  NOTE(review): the
        `prefix` parameter is accepted but never forwarded; it is kept
        for interface compatibility with callers — confirm before use.
        """

        self._delete_object(user, account, container, name, until, delimiter)
    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        result = []
        for props in self.node.node_get_versions(node):
            # Purged versions live in CLUSTER_DELETED and are not listed.
            if props[self.CLUSTER] == CLUSTER_DELETED:
                continue
            result.append([props[self.SERIAL], props[self.MTIME]])
        return result
    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            # No live version carries this UUID.
            raise NameError
        path = info[0]
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read_object(user, account, container, name)
        return (account, container, name)
    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            # Unknown public id.
            raise NameError
        parts = path.split('/', 2)
        account, container, name = parts
        self._can_read_object(user, account, container, name)
        return (account, container, name)
    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        data = self.store.block_get(self._unhexlify_hash(hash))
        if not data:
            raise ItemNotExists('Block does not exist')
        return data
    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        digest = self.store.block_put(data)
        return binascii.hexlify(digest)
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        is_full_overwrite = offset == 0 and len(data) == self.block_size
        if is_full_overwrite:
            # Rewriting the whole block is just storing a fresh one.
            return self.put_block(data)
        new_hash = self.store.block_update(self._unhexlify_hash(hash),
                                           offset, data)
        return binascii.hexlify(new_hash)
1485
    # Path functions.
1486

    
1487
    def _generate_uuid(self):
1488
        return str(uuidlib.uuid4())
1489

    
1490
    def _put_object_node(self, path, parent, name):
        """Return (path, node) for *name* under *path*, creating the
        node if it does not already exist."""
        full_path = '/'.join((path, name))
        node = self.node.node_lookup(full_path)
        if node is None:
            node = self.node.node_create(parent, full_path)
        return full_path, node
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        """Create a node for *path* under *parent* with an initial,
        empty version owned by *user*; return the new node."""
        node = self.node.node_create(parent, path)
        uuid = self._generate_uuid()
        self.node.version_create(node, None, 0, '', None, user, uuid, '',
                                 CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node
    def _lookup_account(self, account, create=True):
        """Return (account, node), optionally creating the account node."""
        node = self.node.node_lookup(account)
        if create and node is None:
            # User is account: propagate statistics to all ancestors.
            node = self._put_path(account, self.ROOTNODE, account,
                                  update_statistics_ancestors_depth=-1)
        return account, node
    def _lookup_container(self, account, container):
        """Return (path, node) for the container; raise if missing."""
        # Take a row lock on lookup when container locking is enabled.
        for_update = bool(self.lock_container_path)
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node
    def _lookup_object(self, account, container, name, lock_container=False):
        """Return (path, node) for the object; raise if missing."""
        if lock_container:
            # The container lookup also acquires the row lock if enabled.
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node
    def _lookup_objects(self, paths):
        # Bulk variant of _lookup_object: resolve many paths to nodes
        # with a single backend query (no existence check per path).
        return self.node.node_lookup_bulk(paths)
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = inf if until is None else until
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            # Older snapshots may only survive in the history cluster.
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props
    def _get_properties_bulk(self, nodes):
        """Return properties for the specified nodes."""

        # Only current (CLUSTER_NORMAL) versions; no history fallback.
        return self.node.version_lookup(nodes, cluster=CLUSTER_NORMAL)
    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            # Historical view at a given timestamp.
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            # Recompute from live versions instead of the cached row.
            stats = self.node.statistics_latest(
                node, except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        return (0, 0, 0) if stats is None else stats
    def _get_version(self, node, version=None):
        """Return the properties of the given (or latest) version of node.

        Raises ItemNotExists when the node has no live version and
        VersionNotExists for an invalid, missing or deleted version id.
        """
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
            return props

        try:
            version = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(version, node=node)
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props
    def _get_versions(self, nodes):
        # Latest CLUSTER_NORMAL version properties for each node, in bulk.
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node.

        Properties default to the latest version of src_node (or of node
        itself when src_node is None).  The previous current version of
        node, if any, is moved to CLUSTER_HISTORY.  Returns the tuple
        (pre_version_id, dest_version_id) where pre_version_id is the
        superseded version of node (None if there was none).
        """

        # Source of the inherited properties: node itself for in-place
        # updates, src_node for copy/move operations.
        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        # Copies and brand-new objects get a fresh UUID; plain updates
        # keep the source object's UUID.
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        # Determine which version of *node* (not src_node) is being
        # superseded, so it can be reclustered into history.
        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        # Ensure only the new version is flagged as latest.
        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        """Attach domain metadata to dest_version_id.

        Attributes are first copied from src_version_id (if any).  When
        replace is False, empty-string values in *meta* delete the
        corresponding keys and non-empty values upsert them; when
        replace is True, all previous domain attributes are dropped and
        *meta* is set verbatim.
        """
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            # Empty value means "remove this key".
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))
    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        depth = update_statistics_ancestors_depth
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, update_statistics_ancestors_depth=depth)
        self._put_metadata_duplicate(src_version_id, dest_version_id,
                                     domain, node, meta, replace)
        return src_version_id, dest_version_id
    def _list_limits(self, listing, marker, limit):
1659
        start = 0
1660
        if marker:
1661
            try:
1662
                start = listing.index(marker) + 1
1663
            except ValueError:
1664
                pass
1665
        if not limit or limit > 10000:
1666
            limit = 10000
1667
        return start, limit
1668

    
1669
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """List object properties under the container *path*.

        Returns (name, ...) tuples with the container prefix stripped
        from the names.  When virtual is True, delimiter-derived
        pseudo-directories are merged in as (name, None) entries.
        """
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        # All backend paths are absolute; prepend the container prefix.
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        # Attribute filtering only makes sense within a metadata domain.
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        # Strip the container prefix so callers see relative names.
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects
    # Reporting functions.
1692

    
1693
    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        """Queue a diskspace message and commit the quota change.

        No-op for size == 0.  When an external quotaholder is in use,
        issues a commission for the (possibly negative) *size* delta and
        records its serial for later acceptance.  Raises QuotaError if
        the commission fails.
        """
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        # Current total is recomputed (not read from the cached row).
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            # Any failure talking to the quotaholder becomes a QuotaError.
            raise QuotaError(e)
        else:
            self.serials.append(serial)
    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        """Queue an 'object' change notification for *path*."""
        details = details or {}
        details.update({'user': user})
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('object',), account,
                   QUEUE_INSTANCE_ID, 'object', path, details)
        self.messages.append(message)
    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a 'sharing' change notification for *path*."""
        details = details or {}
        details.update({'user': user})
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account,
                   QUEUE_INSTANCE_ID, 'sharing', path, details)
        self.messages.append(message)
    # Policy functions.
1742

    
1743
    def _check_policy(self, policy, is_account_policy=True):
        """Validate *policy* in place, raising ValueError when invalid.

        Empty-string values are replaced by the corresponding defaults.
        Only 'quota' (non-negative int) and 'versioning' ('auto'/'none')
        keys are accepted; anything else raises ValueError.
        """
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                # Empty value means "reset to default".
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError
    def _put_policy(self, node, policy, replace, is_account_policy=True):
        """Store *policy* for node.

        When replace is True, missing keys are filled in from the
        defaults first so the stored policy is complete.
        """
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)
    def _get_policy(self, node, is_account_policy=True):
        """Return the node's policy merged over the proper defaults."""
        if is_account_policy:
            defaults = self.default_account_policy
        else:
            defaults = self.default_container_policy
        policy = defaults.copy()
        # Stored values win over defaults.
        policy.update(self.node.policy_get(node))
        return policy
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            # No versioning: drop the version and its data map for real.
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            # Version is kept, but its size does not count against quota.
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
    # Access control functions.
1798

    
1799
    def _check_groups(self, groups):
        """Validate group names; currently a deliberate no-op stub."""
        # raise ValueError('Bad characters in groups')
        pass
    def _check_permissions(self, path, permissions):
        """Validate permissions; currently a deliberate no-op stub."""
        # raise ValueError('Bad characters in permissions')
        pass
    def _get_formatted_paths(self, paths):
        """Return (path, match_mode) pairs for the given object paths.

        Every path matches exactly; directory-like objects additionally
        match by prefix (with a trailing slash).
        """
        formatted = []
        if not paths:
            return formatted
        props = self.node.get_props(paths)
        for prop in props or []:
            mimetype = prop[1].split(';', 1)[0].strip()
            if mimetype in ('application/directory', 'application/folder'):
                formatted.append((prop[0].rstrip('/') + '/',
                                  self.MATCH_PREFIX))
            formatted.append((prop[0], self.MATCH_EXACT))
        return formatted
    def _get_permissions_path(self, account, container, name):
        """Return the path whose permissions govern the given object.

        Candidates come from access_inherit (longest first).  The object
        path itself wins outright; otherwise an ancestor qualifies only
        if it is an object (not account/container level) whose content
        type marks it as a directory.  Returns None when nothing governs
        the object.
        """
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        # Longest (most specific) candidate first.
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            else:
                if p.count('/') < 2:
                    # Account or container level: not an object path.
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None
    def _get_permissions_path_bulk(self, account, container, names):
        """Bulk variant of _get_permissions_path for many object names.

        Returns the list of governing paths (exact object paths plus any
        directory-typed ancestors), or None when no candidate qualifies.
        """
        formatted_paths = []
        for name in names:
            path = '/'.join((account, container, name))
            formatted_paths.append(path)
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        # Longest (most specific) candidates first.
        permission_paths.sort()
        permission_paths.reverse()
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                # Exact object path: qualifies immediately.
                permission_paths_list.append(p)
            else:
                if p.count('/') < 2:
                    # Account or container level: not an object path.
                    continue
                lookup_list.append(p)

        if len(lookup_list) > 0:
            # Ancestors qualify only if typed as directories.
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        if len(permission_paths_list) > 0:
            return permission_paths_list

        return None
    def _reset_allowed_paths(self):
        """Drop the cached per-user allowed-path sets."""
        for attr in ('read_allowed_paths', 'write_allowed_paths'):
            setattr(self, attr, defaultdict(set))
    @check_allowed_paths(action=0)
    def _can_read_account(self, user, account):
        """Raise NotAllowedError unless user may read the account."""
        if user == account:
            return
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
    @check_allowed_paths(action=1)
    def _can_write_account(self, user, account):
        """Only the owner may write an account."""
        if user == account:
            return
        raise NotAllowedError
    @check_allowed_paths(action=0)
    def _can_read_container(self, user, account, container):
        """Raise NotAllowedError unless user may read the container."""
        if user == account:
            return
        if container not in self._allowed_containers(user, account):
            raise NotAllowedError
    @check_allowed_paths(action=1)
    def _can_write_container(self, user, account, container):
        """Only the owner may write a container."""
        if user == account:
            return
        raise NotAllowedError
    @check_allowed_paths(action=0)
    def _can_read_object(self, user, account, container, name):
        """Check read access to an object.

        Owners and public objects always pass; otherwise the governing
        permissions path must grant READ or WRITE to the user.
        """
        if user == account:
            return True
        full_path = '/'.join((account, container, name))
        if self.permissions.public_get(full_path) is not None:
            return True
        perm_path = self._get_permissions_path(account, container, name)
        if not perm_path:
            raise NotAllowedError
        # De Morgan of the original check; same short-circuit order.
        if not (self.permissions.access_check(perm_path, self.READ, user) or
                self.permissions.access_check(perm_path, self.WRITE, user)):
            raise NotAllowedError
    @check_allowed_paths(action=1)
    def _can_write_object(self, user, account, container, name):
        """Check write access to an object.

        Owners always pass; otherwise the governing permissions path
        must grant WRITE to the user.  Raises NotAllowedError.
        """
        if user == account:
            return True
        # Fix: removed a dead "path = '/'.join(...)" assignment that was
        # immediately overwritten by the next line.
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError
    def _allowed_accounts(self, user):
        """Return the sorted accounts that user has been granted access to,
        caching them in read_allowed_paths."""
        allow = set(path.split('/', 1)[0]
                    for path in self.permissions.access_list_paths(user))
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
    def _allowed_containers(self, user, account):
        """Return the sorted containers of account shared with user,
        caching them in read_allowed_paths."""
        shared = self.permissions.access_list_paths(user, account)
        allow = set(path.split('/', 2)[1] for path in shared)
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
    # Domain functions
1942

    
1943
    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        """Return (path, metadata, permissions) tuples for the domain's
        objects that are visible to user."""
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        result = []
        for path, props, user_defined_meta in obj_list:
            meta = self._build_metadata(props, user_defined_meta)
            result.append((path, meta, self.permissions.access_get(path)))
        return result
    # util functions
1958

    
1959
    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        """Assemble the standard metadata dict from version properties,
        optionally merging in user-defined metadata."""
        meta = {}
        meta['bytes'] = props[self.SIZE]
        meta['type'] = props[self.TYPE]
        meta['hash'] = props[self.HASH]
        meta['version'] = props[self.SERIAL]
        meta['version_timestamp'] = props[self.MTIME]
        meta['modified_by'] = props[self.MUSER]
        meta['uuid'] = props[self.UUID]
        meta['checksum'] = props[self.CHECKSUM]
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta
    def _exists(self, node):
        """Return whether node has a live (non-deleted) version."""
        try:
            self._get_version(node)
        except ItemNotExists:
            return False
        return True
    def _unhexlify_hash(self, hash):
1982
        try:
1983
            return binascii.unhexlify(hash)
1984
        except TypeError:
1985
            raise InvalidHash(hash)