Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 1e47e49d

History | View | Annotate | Download (78.3 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from collections import defaultdict
41
from functools import wraps, partial
42
from traceback import format_exc
43

    
44
try:
45
    from astakosclient import AstakosClient
46
except ImportError:
47
    AstakosClient = None
48

    
49
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
50
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
51
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
52
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
53
                  InvalidHash, IllegalOperationError)
54

    
55

    
56
class DisabledAstakosClient(object):
    """Placeholder standing in for AstakosClient when none is configured.

    Construction arguments are recorded, but any attribute access raises,
    so accidental use of the disabled client fails loudly.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        message = ("AstakosClient has been disabled, "
                   "yet an attempt to access it was made")
        raise AssertionError(message)
65

    
66

    
67
# Stripped-down version of the HashMap class found in tools.
68

    
69
class HashMap(list):
    """List of block hashes that can compute a binary-tree top hash.

    Stripped-down version of the HashMap class found in tools.
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        # Digest a single byte string with the configured algorithm.
        digest = hashlib.new(self.blockhash)
        digest.update(v)
        return digest.digest()

    def hash(self):
        """Return the root hash of the pairwise hash tree over the entries."""
        if not self:
            return self._hash_raw('')
        if len(self) == 1:
            return self[0]

        # Pad the leaf level with zero-filled entries up to the next
        # power of two so the tree is complete.
        leaves = list(self)
        size = 2
        while size < len(leaves):
            size *= 2
        padding = '\x00' * len(leaves[0])
        leaves.extend([padding] * (size - len(leaves)))
        # Combine adjacent pairs until a single root remains.
        while len(leaves) > 1:
            leaves = [self._hash_raw(leaves[i] + leaves[i + 1])
                      for i in range(0, len(leaves), 2)]
        return leaves[0]
95

    
96
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
# umask applied by the block store when creating files/directories
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
# Parameters for generated public URLs (character set and security factor).
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16

# Identifiers used when publishing messages to the queue.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version clusters: current data, superseded history, deleted.
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

inf = float('inf')

ULTIMATE_ANSWER = 42

# Defaults for quotaholder (astakos) accounting lookups.
DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'

logger = logging.getLogger(__name__)
127

    
128

    
129
def backend_method(func):
    """Wrap a backend method in database transaction management.

    If a transaction is already open, the method simply runs inside it;
    otherwise a new one is opened with pre_exec() and closed with
    post_exec(), which is told whether the method completed successfully.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        # Inside an existing transaction: just run the method.
        if self.in_transaction:
            return func(self, *args, **kw)

        success = False
        try:
            self.pre_exec()
            result = func(self, *args, **kw)
            success = True
            return result
        finally:
            # Runs on both the success and the exception path; any raised
            # exception keeps propagating after post_exec().
            self.post_exec(success)
    return wrapper
149

    
150

    
151
def debug_method(func):
    """Log every call of *func* with its arguments and its result.

    On exception, the formatted traceback is logged in place of the
    result and the exception is re-raised unchanged.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            result = format_exc()
            raise
        finally:
            all_args = [repr(x) for x in args]
            # Append kwarg representations eagerly.  The original used
            # map(all_args.append, ...) for its side effect, which is a
            # silent no-op on Python 3 where map() is lazy; items() is
            # equivalent to iteritems() here and works on both versions.
            all_args.extend('%s=%s' % (k, v) for k, v in kw.items())
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper
166

    
167

    
168
def check_allowed_paths(action):
    """Decorator factory guarding backend methods with per-user path caches.

    The decorated method receives a ModularBackend instance first, the
    requesting user second, and the remaining positional arguments joined
    with '/' form the requested path.

    If the path is already in the user's cached allowed paths the wrapper
    returns immediately, skipping the database round-trip.  Otherwise the
    wrapped access check runs; if it returns without raising, the path is
    recorded in the cache.

    :param action: (int) 0 for reads / 1 for writes
    :raises NotAllowedError: the user does not have access to the path
    """
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args):
            user = args[0]
            cache = (self.read_allowed_paths if action == self.READ
                     else self.write_allowed_paths)
            path = '/'.join(args[1:])
            if path in cache.get(user, []):
                # Access was already verified for this user/path pair.
                return
            func(self, *args)   # proceed with the actual access check
            cache[user].add(path)
        return wrapper
    return decorator
202

    
203

    
204
def list_method(func):
    """Apply marker/limit windowing to a listing method's result.

    The wrapped method is called unchanged; its result is then sliced
    according to self._list_limits and the 'marker'/'limit' kwargs.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        listing = func(self, *args, **kw)
        start, limit = self._list_limits(
            listing, kw.get('marker'), kw.get('limit'))
        return listing[start:start + limit]
    return wrapper
213

    
214

    
215
class ModularBackend(BaseBackend):
216
    """A modular backend.
217

218
    Uses modules for SQL functions and storage.
219
    """
220

    
221
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        """Initialize the backend: DB, block store, queue and astakos client.

        All arguments are optional; missing values fall back to the
        module-level DEFAULT_* constants.  The db/block/queue modules are
        loaded dynamically by dotted module name so deployments can swap
        implementations via configuration.
        """
        # Fill in defaults for anything the caller did not configure.
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        # Import a module by dotted name and return the module object.
        def load_module(m):
            __import__(m)
            return sys.modules[m]

        # --- database layer -------------------------------------------
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        # Mirror the db module's constants as instance attributes.
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        # --- block storage layer --------------------------------------
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        # --- message queue (no-op stub when not configured) -----------
        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        # --- astakos (quota/auth) client ------------------------------
        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        # Fall back to the failing stub when no auth URL is configured or
        # the astakosclient package is not importable.
        if not astakos_auth_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        # Per-call state managed by pre_exec()/post_exec().
        self.serials = []
        self.messages = []

        # move == copy with is_move=True
        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False

        self._reset_allowed_paths()
334

    
335
    def pre_exec(self, lock_container_path=False):
        """Begin a database transaction for a backend method call.

        Per-call state (commission serials, cached allowed paths) is
        reset so nothing leaks in from a previous call.

        :param lock_container_path: whether container paths should be
            locked for the duration of this transaction
        """
        self.lock_container_path = lock_container_path
        self.wrapper.execute()
        self.serials = []
        self._reset_allowed_paths()
        self.in_transaction = True
341

    
342
    def post_exec(self, success_status=True):
        """Finish the transaction opened by pre_exec().

        On success: flush queued messages, persist pending commission
        serials and resolve (accept) them against astakos.  The serials
        are committed *before* resolution so they remain recorded even if
        the resolve call fails.  On failure: reject pending commissions
        and roll the transaction back.

        :param success_status: True if the wrapped method completed
            without raising
        """
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    accept_serials=self.serials,
                    reject_serials=[])
                # drop serials astakos acknowledged as accepted
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            if self.serials:
                r = self.astakosclient.resolve_commissions(
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(
                    r['rejected'])
            self.wrapper.rollback()
        self.in_transaction = False
376

    
377
    def close(self):
        """Release the database connection and the queue client."""
        self.wrapper.close()
        self.queue.close()
380

    
381
    @property
    def using_external_quotaholder(self):
        """True when a real astakos client (not the disabled stub) is set."""
        disabled = isinstance(self.astakosclient, DisabledAstakosClient)
        return not disabled
384

    
385
    @debug_method
    @backend_method
    @list_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access.

        The marker/limit window is applied by the list_method decorator.
        """

        return self._allowed_accounts(user)
392

    
393
    def _get_account_quotas(self, account):
        """Get account usage from astakos.

        Returns the quota entry (a dict, possibly empty) for the
        'pithos.diskspace' resource under the 'system' source.

        NOTE: this method was previously defined twice, verbatim, back to
        back; the second definition silently shadowed the first.  The
        duplicate has been removed.
        """

        quotas = self.astakosclient.service_get_quotas(account)[account]
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
                                                  {})
406

    
407
    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain.

        :param until: if given, report counts/bytes as of that timestamp
        :param include_user_defined: include the domain's user-defined
            attributes (account owner only)
        :raises NotAllowedError: a foreign user asked for history, or the
            account node does not exist
        """

        self._can_read_account(user, account)
        path, node = self._lookup_account(account, user == account)
        if user != account:
            if until or (node is None):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # NOTE(review): _get_properties apparently raises NameError
            # when no version exists for the node — confirm in the helper.
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Foreign users only learn the account name and mtime.
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                # The quotaholder's usage figure overrides local statistics.
                external_quota = self._get_account_quotas(account)
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta
448

    
449
    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        self._can_write_account(user, account)
        node = self._lookup_account(account, True)[1]
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)
458

    
459
    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        self._can_read_account(user, account)
        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        # Group definitions are private to the account owner.
        return {}
469

    
470
    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        self._can_write_account(user, account)
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            # Drop every existing group before re-adding.
            self.permissions.group_destroy(account)
        for name, members in groups.iteritems():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, name)
            if members:
                self.permissions.group_addmany(account, name, members)
485

    
486
    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy."""

        self._can_read_account(user, account)
        if user != account:
            # Policy is only visible to the account owner.
            return {}
        node = self._lookup_account(account, True)[1]
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            # The quotaholder is authoritative for the quota limit.
            quota_info = self._get_account_quotas(account)
            policy['quota'] = quota_info.get('limit', 0)
        return policy
500

    
501
    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        self._can_write_account(user, account)
        node = self._lookup_account(account, True)[1]
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)
510

    
511
    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        policy = policy or {}
        self._can_write_account(user, account)
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True)
526

    
527
    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        self._can_write_account(user, account)
        node = self.node.node_lookup(account)
        if node is None:
            # Deleting a missing account is a no-op.
            return
        removed = self.node.node_remove(
            node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

        # Drop the whole allowed-paths cache; evicting only this
        # account's entries would cost more than rebuilding.
        self._reset_allowed_paths()
544

    
545
    @debug_method
    @backend_method
    @list_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account.

        :param shared: restrict to containers holding objects shared
            with others
        :param public: restrict to containers holding public objects
        :raises NotAllowedError: a foreign user requested history
        """

        self._can_read_account(user, account)
        if user != account:
            if until:
                raise NotAllowedError
            return self._allowed_containers(user, account)
        if shared or public:
            allowed = set()
            if shared:
                # Paths are 'account/container/...'; keep the container part.
                allowed.update([x.split('/', 2)[1] for x in
                               self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in
                               self.permissions.public_list(account)])
            return sorted(allowed)
        node = self.node.node_lookup(account)
        return [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
569

    
570
    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain.

        :raises NotAllowedError: a foreign user requested history
        """

        self._can_read_container(user, account, container)
        allowed = []
        if user != account:
            if until:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = until if until is not None else inf
        # NOTE(review): `allowed` is always [] at this point, so no path
        # filtering is applied — presumably fine because access was
        # checked above, but worth confirming.
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)
586

    
587
    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        :param until: if given, report counts/bytes as of that timestamp
        :param include_user_defined: include the domain's user-defined
            attributes (account owner only)
        :raises NotAllowedError: a foreign user requested history
        """

        self._can_read_container(user, account, container)
        if user != account:
            if until:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Foreign users only learn the container name and mtime.
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta
621

    
622
    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is None:
            return
        # Unless versioning is automatic, drop the superseded version.
        policy = self._get_policy(node, is_account_policy=False)
        if policy['versioning'] != 'auto':
            self.node.version_remove(src_version_id,
                                     update_statistics_ancestors_depth=0)
639

    
640
    @debug_method
641
    @backend_method
642
    def get_container_policy(self, user, account, container):
643
        """Return a dictionary with the container policy."""
644

    
645
        self._can_read_container(user, account, container)
646
        if user != account:
647
            return {}
648
        path, node = self._lookup_container(account, container)
649
        return self._get_policy(node, is_account_policy=False)
650

    
651
    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""

        self._can_write_container(user, account, container)
        node = self._lookup_container(account, container)[1]
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)
661

    
662
    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        policy = policy or {}
        self._can_write_container(user, account, container)
        container_exists = True
        try:
            # Lookup raising NameError means the container is free.
            self._lookup_container(account, container)
        except NameError:
            container_exists = False
        if container_exists:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        account_node = self._lookup_account(account, True)[1]
        node = self._put_path(
            user, account_node, path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)
682

    
683
    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name.

        Three modes, in priority order:
        - `until` given: purge history/deleted versions up to that
          timestamp, keep the container itself;
        - no `delimiter`: delete the (empty) container and purge all
          history;
        - `delimiter` given: mark all contained objects deleted, keep the
          container.

        :raises ContainerNotEmpty: deleting a non-empty container
        """

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)

        if until is not None:
            # Purge-only mode: drop old versions, keep the container.
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                # Versioned space is billed: report it as released.
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                # Record a zero-sized CLUSTER_DELETED version per object.
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
761

    
762
    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        """Collect object entries under a container and window them.

        Depending on the ``shared``/``public`` flags, combines the
        user's shared listing with the public listing, then applies
        marker/limit pagination via _list_limits().

        Raises:
            NotAllowedError: a non-owner asked for a past listing
                (``until`` set and ``user != account``).
        """
        if user != account and until:
            raise NotAllowedError

        objects = []
        if shared and public:
            # Get shared first.  Accumulate into a set so it can be
            # unioned with the public listing without duplicates.
            shared_objects = set()
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                shared_objects = set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # Get public.  Previously, when `shared_paths` was empty,
            # `objects` was still a list and `objects |= set(...)`
            # raised TypeError; starting from a set fixes that.
            objects = shared_objects | set(
                self._list_public_object_properties(
                    user, account, container, prefix, all_props))
            objects = sorted(objects, key=lambda x: x[0])
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
        else:
            allowed = self._list_object_permissions(
                user, account, container, prefix, shared, public=False)
            if shared and not allowed:
                return []
            path, node = self._lookup_container(account, container)
            allowed = self._get_formatted_paths(allowed)
            objects = self._list_object_properties(
                node, path, prefix, delimiter, marker, limit, virtual, domain,
                keys, until, size_range, allowed, all_props)

        # Apply marker/limit windowing.
        start, limit = self._list_limits(objects, marker, limit)
        return objects[start:start + limit]
    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        """Return (name,) + properties tuples for public objects.

        Object names are relative to the container (the leading
        "account/container/" part is stripped).
        """
        public_paths = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        paths, nodes = self._lookup_objects(public_paths)
        cont_prefix = '/'.join((account, container)) + '/'
        names = [full_path[len(cont_prefix):] for full_path in paths]
        props = self.node.version_lookup_bulk(nodes, all_props=all_props,
                                              order_by_path=True)
        return [(name,) + prop for name, prop in zip(names, props)]
    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        """Exhaustively page through _list_objects and return all entries."""
        batch_size = 10000
        collected = []
        while True:
            # The last entry returned so far acts as the next marker.
            marker = collected[-1] if collected else None
            batch = self._list_objects(
                user, account, container, prefix, delimiter, marker,
                batch_size, virtual, domain, keys, shared, until, size_range,
                all_props, public)
            collected.extend(batch)
            if len(batch) < batch_size:
                # A short (or empty) page means we reached the end.
                break
        return collected
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        """Return the paths under ``prefix`` the user may list.

        For a foreign account: the paths explicitly granted to the user
        (raises NotAllowedError when there are none).  For the owner:
        the sorted union of shared and/or public paths, or [] when
        neither filter matches anything.
        """
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            granted = self.permissions.access_list_paths(user, path)
            if not granted:
                raise NotAllowedError
            return granted
        granted = set()
        if shared:
            granted.update(self.permissions.access_list_shared(path))
        if public:
            granted.update(x[0] for x in self.permissions.public_list(path))
        if not granted:
            return []
        return sorted(granted)
    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        # Plain listing: all_props=False.
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range,
            False, public)
    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range,
            True, public)
        result = []
        for entry in props:
            if len(entry) == 2:
                # Two-element entries are virtual directory markers.
                result.append({'subdir': entry[0]})
                continue
            # entry[0] is the name; the version properties follow,
            # hence the +1 offsets into the props tuple.
            result.append({
                'name': entry[0],
                'bytes': entry[self.SIZE + 1],
                'type': entry[self.TYPE + 1],
                'hash': entry[self.HASH + 1],
                'version': entry[self.SERIAL + 1],
                'version_timestamp': entry[self.MTIME + 1],
                'modified': entry[self.MTIME + 1] if until is None else None,
                'modified_by': entry[self.MUSER + 1],
                'uuid': entry[self.UUID + 1],
                'checksum': entry[self.CHECKSUM + 1]})
        return result
    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths enforce permissions under a container."""

        return self._list_object_permissions(
            user, account, container, prefix, shared=True, public=False)
    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        # public_list yields (path, public_id) pairs; build the mapping
        # directly from them.
        lookup = '/'.join((account, container, prefix))
        return dict(self.permissions.public_list(lookup))
    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        When ``version`` is given, 'modified' reports the object's
        overall last modification (latest live version), not that of the
        requested version; 'version_timestamp' carries the requested
        version's mtime.

        Raises:
            ItemNotExists: the object has no live and no deleted version.
        """

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                # No live version: fall back to the latest DELETED-cluster
                # version for the modification time.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            # User-defined attributes for the requested domain come first;
            # system keys below intentionally override any clashes.
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta
    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write_object(user, account, container, name)

        # Locking the container serializes concurrent updates.
        node = self._lookup_object(account, container, name,
                                   lock_container=True)[1]
        src_version, dest_version = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        self._apply_versioning(account, container, src_version,
                               update_statistics_ancestors_depth=1)
        return dest_version
    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions.

        Result maps each object name (relative to the container) to a
        (allowed_action, permissions_path, access_dict) tuple.

        Raises:
            NotAllowedError: a foreign user has no access entry for one
                of the requested paths.
        """

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        #group_parents = access_objects['group_parents']
        nobject_permissions = {}
        # Common container prefix used to strip "account/container/".
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            allowed = 1
            name = path[cpath_idx:]
            if user != account:
                try:
                    allowed = access_objects[path]
                except KeyError:
                    raise NotAllowedError
            # NOTE(review): the `allowed` value fetched above is
            # immediately overwritten below; the lookup's only effect is
            # raising NotAllowedError for paths missing from
            # access_objects — confirm this is intended.
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        # presumably validates existence of the objects; result unused —
        # TODO confirm side effects of _lookup_objects here.
        self._lookup_objects(permissions_path)
        return nobject_permissions
    @debug_method
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        permissions_path = self._get_permissions_path(account, container, name)
        if user == account:
            # The owner always has write access.
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.WRITE,
                                           user):
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.READ,
                                           user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # Validate that the object exists before reporting permissions.
        self._lookup_object(account, container, name)
        return (allowed,
                permissions_path,
                self.permissions.access_get(permissions_path))
    @debug_method
    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object.

        Only the owning account may change permissions.

        Raises:
            NotAllowedError: ``user`` is not the account owner.
            ValueError: storing the permissions failed.
        """

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        try:
            self.permissions.access_set(path, permissions)
        except Exception:
            # Narrowed from a bare `except:`, which also caught
            # SystemExit/KeyboardInterrupt and masked them as ValueError.
            raise ValueError
        else:
            self._report_sharing_change(user, account, path, {'members':
                                        self.permissions.access_members(path)})

        # remove all the cached allowed paths
        # filtering out only those affected could be more expensive
        self._reset_allowed_paths()
    @debug_method
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read_object(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        return self.permissions.public_get(path)
    @debug_method
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write_object(user, account, container, name)
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        if public:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet)
        else:
            self.permissions.public_unset(path)
    @debug_method
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes.

        Returns (0, ()) when the version has no hash at all.  Archipelago
        maps (hash prefixed with 'archip:') are returned as-is; regular
        maps are stored as raw bytes and returned hex-encoded.
        """

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if props[self.HASH] is None:
            return 0, ()
        if props[self.HASH].startswith('archip:'):
            # Archipelago-backed map: entries are already textual ids.
            hashmap = self.store.map_get_archipelago(props[self.HASH],
                                                     props[self.SIZE])
            return props[self.SIZE], [x for x in hashmap]
        else:
            # Regular map: stored under the raw (unhexlified) hash and
            # returned as hex strings.
            hashmap = self.store.map_get(self._unhexlify_hash(
                props[self.HASH]))
            return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        """Create a new object version with the given hash and metadata.

        Core of object create/copy/move: writes the version, applies
        metadata and versioning policy, enforces quotas, and emits
        size/sharing/object-change reports.  Returns the new version id.

        Raises:
            NotAllowedError: permissions given by a non-owner, or the
                user may not write the object.
            QuotaError: account or container quota would be exceeded.
        """
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write_object(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        # pre_version_id: the version being superseded (if any);
        # dest_version_id: the newly created version.
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        # del_size: bytes freed by the versioning policy for the old
        # version; the net usage change is size - del_size.
        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id
    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version.

        Returns (dest_version_id, hexlified_map_hash).  Raises an
        IndexError carrying the missing block hashes in its ``data``
        attribute when some blocks are not yet stored.
        """

        # Archipelago volume hashmaps are read-only.
        if any(h.startswith('archip_') for h in hashmap):
            raise IllegalOperationError(
                'Cannot update Archipelago Volume hashmap.')
        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        # `hmap` avoids shadowing the builtin `map`.
        hmap = HashMap(self.block_size, self.hash_algorithm)
        hmap.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(hmap)
        if missing:
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = hmap.hash()
        hexlified = binascii.hexlify(hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        self.store.map_put(hash, hmap)
        return dest_version_id, hexlified
    @debug_method
    @backend_method
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum.

        The checksum is propagated to every version at or after
        ``version`` that shares the same hash and size, so that pure
        metadata updates (which duplicate the data) stay consistent.
        """

        # Update objects with greater version and same hashmap
        # and size (fix metadata updates).
        self._can_write_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        props = self._get_version(node, version)
        versions = self.node.node_get_versions(node)
        for x in versions:
            # Same content (hash + size) at an equal or later serial:
            # these are metadata-only duplicates of the given version.
            if (x[self.SERIAL] >= int(version) and
                x[self.HASH] == props[self.HASH] and
                    x[self.SIZE] == props[self.SIZE]):
                self.node.version_put_property(
                    x[self.SERIAL], 'checksum', checksum)
    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):
        """Copy (or move, when is_move=True) an object, optionally with
        everything under it when ``delimiter`` is given.

        Returns the new version id, or a list of version ids when a
        delimiter fan-out created several versions.
        """

        # Moves transfer usage with the object, so size changes are not
        # re-reported.
        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read_object(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        # (consistent ordering avoids deadlocks between concurrent copies).
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        # A copy to a different location gets a new uuid; a move or a
        # copy onto itself keeps it.
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            # Fan out to every object under the source "directory".
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                # NOTE(review): `is_copy` here reuses the value computed
                # for the top-level object rather than being recomputed
                # per child path — confirm this is intended.
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)
    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, src_version, is_move=False, delimiter=delimiter)
    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        # Only the owner of the source account may move its objects.
        if user != src_account:
            raise NotAllowedError
        return self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, None, delimiter=delimiter)
    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        """Delete an object, or purge its history up to ``until``.

        With ``until`` set, old versions are physically purged and their
        blocks removed from the store.  Otherwise a new version in the
        DELETED cluster marks the object as deleted.  With ``delimiter``,
        every object under the matching prefix is deleted as well.

        Raises:
            NotAllowedError: ``user`` is not the account owner.
            ItemNotExists: the object is already deleted.
        """
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            # Purge mode: physically remove old versions and their blocks.
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            # Historical versions count against usage only when
            # versioning is billed.
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except NameError:
                # No live version remains: drop the permissions too.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        # Regular delete: record a zero-sized version in the DELETED
        # cluster instead of removing data.
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            # Recursive delete of everything under the prefix.
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object.

        Delegates to _delete_object; with `delimiter` set, objects under
        the derived prefix are deleted as well (see _delete_object).
        NOTE(review): `prefix` is accepted but never forwarded to
        _delete_object -- confirm whether it belongs to a wider API
        contract or is simply dead.
        """

        self._delete_object(user, account, container, name, until, delimiter)
1422

    
1423
    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        result = []
        for props in self.node.node_get_versions(node):
            # Versions moved to the deleted cluster are not listed.
            if props[self.CLUSTER] == CLUSTER_DELETED:
                continue
            result.append([props[self.SERIAL], props[self.MTIME]])
        return result
1433

    
1434
    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given.

        Raises NameError when no live version carries this UUID; when
        check_permissions is True, read permission for `user` is also
        enforced (NotAllowedError).
        """

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        # Paths are stored as 'account/container/object'.
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read_object(user, account, container, name)
        return (account, container, name)
1447

    
1448
    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is not None:
            account, container, name = path.split('/', 2)
            self._can_read_object(user, account, container, name)
            return (account, container, name)
        raise NameError
1459

    
1460
    def get_block(self, hash):
        """Return a block's data.

        Hashes prefixed 'archip_' address Archipelago-backed blocks;
        other hashes are hex strings decoded before lookup. Raises
        ItemNotExists when the block cannot be found.
        """

        logger.debug("get_block: %s", hash)
        if hash.startswith('archip_'):
            block = self.store.block_get_archipelago(hash)
        else:
            block = self.store.block_get(self._unhexlify_hash(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block
1471

    
1472
    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        digest = self.store.block_put(data)
        return binascii.hexlify(digest)
1477

    
1478
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash.

        Archipelago blocks are immutable here and raise
        IllegalOperationError. A whole-block write at offset 0 is stored
        as a new block via put_block instead of a partial update.
        """

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if hash.startswith('archip_'):
            raise IllegalOperationError(
                'Cannot update an Archipelago Volume block.')
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
        return binascii.hexlify(h)
1489

    
1490
    # Path functions.
1491

    
1492
    def _generate_uuid(self):
1493
        return str(uuidlib.uuid4())
1494

    
1495
    def _put_object_node(self, path, parent, name):
        """Ensure a node exists for path/name; return (full_path, node)."""
        full_path = '/'.join((path, name))
        existing = self.node.node_lookup(full_path)
        if existing is not None:
            return full_path, existing
        return full_path, self.node.node_create(parent, full_path)
1501

    
1502
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        """Create a node at `path` with an initial empty version; return it."""
        node = self.node.node_create(parent, path)
        # Seed the node with a zero-size version owned by `user`.
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node
1509

    
1510
    def _lookup_account(self, account, create=True):
        """Return (account, node), creating the account node if allowed.

        `node` is None when the account does not exist and create=False.
        """
        node = self.node.node_lookup(account)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node
1517

    
1518
    def _lookup_container(self, account, container):
        """Return (path, node) for a container; raise if it is missing."""
        # Lock the row when the backend is configured to serialize on
        # container paths.
        for_update = bool(self.lock_container_path)
        cpath = '/'.join((account, container))
        cnode = self.node.node_lookup(cpath, for_update)
        if cnode is None:
            raise ItemNotExists('Container does not exist')
        return cpath, cnode
1525

    
1526
    def _lookup_object(self, account, container, name, lock_container=False):
        """Return (path, node) for an object; raise ItemNotExists if absent.

        With lock_container=True the container is looked up first, which
        may take the container row lock (see _lookup_container).
        """
        if lock_container:
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node
1535

    
1536
    def _lookup_objects(self, paths):
        """Bulk-resolve object paths to nodes; return (paths, nodes)."""
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes
1539

    
1540
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given.

        Without `until`, only the normal (live) cluster is consulted.
        Historic queries fall back to the history cluster.
        """

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props
1550

    
1551
    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node.

        With `until`, historic statistics are read; with compute=True the
        totals are aggregated over all non-deleted clusters; otherwise the
        stored normal-cluster statistics are returned. Missing statistics
        yield (0, 0, 0).
        """

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats
1564

    
1565
    def _get_version(self, node, version=None):
        """Return version properties for a node.

        With version=None the latest live version is returned
        (ItemNotExists if there is none). An explicit version must parse
        as an int, exist on the node, and not be in the deleted cluster
        (VersionNotExists otherwise).
        """
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version, node=node)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props
1579

    
1580
    def _get_versions(self, nodes):
        """Return the latest live version properties for each node."""
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1582

    
1583
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node.

        Properties not given (size/type/checksum None) are inherited from
        the latest live version of `src_node` (or `node` itself). The
        previous live version of `node`, if any, is moved to the history
        cluster. Returns (pre_version_id, dest_version_id), where
        pre_version_id is the superseded version (or None).
        """

        # Source of inherited properties: src_node if given, else node.
        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        # Copies and brand-new objects get a fresh UUID; plain new
        # versions keep the source object's UUID.
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            # Cross-node copy: find node's own live version to retire.
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        # Only the new version keeps the is_latest flag.
        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
1632

    
1633
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        """Copy attributes to the new version and apply `meta` changes.

        With replace=False, an empty value in `meta` deletes that key and
        non-empty values are upserted; with replace=True the whole domain
        is cleared and rewritten from `meta`.
        """
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))
1646

    
1647
    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata.

        Returns (src_version_id, dest_version_id) from the underlying
        version duplication.
        """

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=
            update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id
1658

    
1659
    def _list_limits(self, listing, marker, limit):
1660
        start = 0
1661
        if marker:
1662
            try:
1663
                start = listing.index(marker) + 1
1664
            except ValueError:
1665
                pass
1666
        if not limit or limit > 10000:
1667
            limit = 10000
1668
        return start, limit
1669

    
1670
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """List object properties under a container, sorted by name.

        Returns tuples whose first element is the object name relative to
        the container. With virtual=True, shared prefixes (delimiter
        groups) are interleaved as (prefix, None) entries.
        """
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        # Attribute filtering only applies within a metadata domain.
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        # Strip the 'account/container/' prefix from each full path.
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects
1691

    
1692
    # Reporting functions.
1693

    
1694
    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        """Queue a diskspace message and commission the quota change.

        No-op for size == 0. When an external quotaholder is configured,
        a commission for `size` bytes is issued via astakosclient and its
        serial recorded for later acceptance; failures are wrapped in
        QuotaError.
        """
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        # Python 2 except syntax; any failure becomes a QuotaError.
        except BaseException, e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)
1723

    
1724
    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        """Queue an 'object' notification message for the account."""
        details = details or {}
        # Mutate in place: callers may rely on seeing the added user key.
        details['user'] = user
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                   account, QUEUE_INSTANCE_ID, 'object', path, details)
        self.messages.append(message)
1732

    
1733
    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a 'sharing' notification message for the account."""
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))
1741

    
1742
    # Policy functions.
1743

    
1744
    def _check_policy(self, policy, is_account_policy=True):
1745
        default_policy = self.default_account_policy \
1746
            if is_account_policy else self.default_container_policy
1747
        for k in policy.keys():
1748
            if policy[k] == '':
1749
                policy[k] = default_policy.get(k)
1750
        for k, v in policy.iteritems():
1751
            if k == 'quota':
1752
                q = int(v)  # May raise ValueError.
1753
                if q < 0:
1754
                    raise ValueError
1755
            elif k == 'versioning':
1756
                if v not in ['auto', 'none']:
1757
                    raise ValueError
1758
            else:
1759
                raise ValueError
1760

    
1761
    def _put_policy(self, node, policy, replace, is_account_policy=True):
        """Store `policy` on the node.

        With replace=True, keys missing from `policy` are filled in from
        the defaults before storing.
        """
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)
1769

    
1770
    def _get_policy(self, node, is_account_policy=True):
1771
        default_policy = self.default_account_policy \
1772
            if is_account_policy else self.default_container_policy
1773
        policy = default_policy.copy()
1774
        policy.update(self.node.policy_get(node))
1775
        return policy
1776

    
1777
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            # Versioning disabled: drop the old version and its data map.
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            # Version is kept but does not count against quota.
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
1797

    
1798
    # Access control functions.
1799

    
1800
    def _check_groups(self, groups):
1801
        # raise ValueError('Bad characters in groups')
1802
        pass
1803

    
1804
    def _check_permissions(self, path, permissions):
1805
        # raise ValueError('Bad characters in permissions')
1806
        pass
1807

    
1808
    def _get_formatted_paths(self, paths):
        """Return (path, match_kind) pairs for permission matching.

        Every path yields an exact match entry; paths whose content type
        marks them as folders additionally yield a trailing-slash prefix
        match entry.
        """
        formatted = []
        if len(paths) == 0:
            return formatted
        props = self.node.get_props(paths)
        if props:
            for prop in props:
                # prop[1] is the content type; parameters after ';' are
                # ignored for the folder check.
                if prop[1].split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    formatted.append((prop[0].rstrip('/') + '/',
                                      self.MATCH_PREFIX))
                formatted.append((prop[0], self.MATCH_EXACT))
        return formatted
1821

    
1822
    def _get_permissions_path(self, account, container, name):
        """Return the path whose permissions govern this object, or None.

        Candidates come from access_inherit; the object's own path wins,
        otherwise the deepest ancestor that is a folder-typed object
        (checked longest-first via sort+reverse) provides the
        permissions.
        """
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            else:
                # Skip account- and container-level candidates.
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None
1842

    
1843
    def _get_permissions_path_bulk(self, account, container, names):
        """Bulk variant of _get_permissions_path for many object names.

        Returns the list of governing permission paths (exact object
        paths plus folder-typed ancestors), or None when there are none.
        """
        formatted_paths = []
        for name in names:
            path = '/'.join((account, container, name))
            formatted_paths.append(path)
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        permission_paths.sort()
        permission_paths.reverse()
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                permission_paths_list.append(p)
            else:
                # Skip account- and container-level candidates.
                if p.count('/') < 2:
                    continue
                lookup_list.append(p)

        if len(lookup_list) > 0:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    # Only folder-typed ancestors grant inherited access.
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        if len(permission_paths_list) > 0:
            return permission_paths_list

        return None
1874

    
1875
    def _reset_allowed_paths(self):
1876
        self.read_allowed_paths = defaultdict(set)
1877
        self.write_allowed_paths = defaultdict(set)
1878

    
1879
    @check_allowed_paths(action=0)
    def _can_read_account(self, user, account):
        """Raise NotAllowedError unless `user` may read the account."""
        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
1884

    
1885
    @check_allowed_paths(action=1)
    def _can_write_account(self, user, account):
        """Raise NotAllowedError unless `user` owns the account."""
        if user == account:
            return
        raise NotAllowedError
1889

    
1890
    @check_allowed_paths(action=0)
    def _can_read_container(self, user, account, container):
        """Raise NotAllowedError unless `user` may read the container."""
        if user != account:
            if container not in self._allowed_containers(user, account):
                raise NotAllowedError
1895

    
1896
    @check_allowed_paths(action=1)
    def _can_write_container(self, user, account, container):
        """Raise NotAllowedError unless `user` owns the account."""
        if user != account:
            raise NotAllowedError
1900

    
1901
    @check_allowed_paths(action=0)
    def _can_read_object(self, user, account, container, name):
        """Allow reads by the owner, public access, or granted READ/WRITE.

        Raises NotAllowedError when no grant applies.
        """
        if user == account:
            return True
        path = '/'.join((account, container, name))
        # Publicly shared objects are readable by anyone.
        if self.permissions.public_get(path) is not None:
            return True
        # Permissions may be inherited from a folder-typed ancestor.
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if (not self.permissions.access_check(path, self.READ, user) and not
                self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError
1914

    
1915
    @check_allowed_paths(action=1)
    def _can_write_object(self, user, account, container, name):
        """Allow writes by the owner or users with a WRITE grant.

        Raises NotAllowedError when no grant applies.
        """
        if user == account:
            return True
        # The governing permissions path may be a folder-typed ancestor
        # rather than the object itself. (A dead assignment of the plain
        # joined path that was immediately overwritten has been removed.)
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError
1925

    
1926
    def _allowed_accounts(self, user):
        """Return sorted names of accounts with paths shared to `user`.

        The names are also folded into the read_allowed_paths cache.
        """
        allow = set()
        for path in self.permissions.access_list_paths(user):
            # The leading path component is the account name.
            p = path.split('/', 1)[0]
            allow.add(p)
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
1933

    
1934
    def _allowed_containers(self, user, account):
        """Return sorted container names in `account` shared to `user`.

        The names are also folded into the read_allowed_paths cache.
        """
        allow = set()
        for path in self.permissions.access_list_paths(user, account):
            # The second path component is the container name.
            p = path.split('/', 2)[1]
            allow.add(p)
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
1941

    
1942
    # Domain functions
1943

    
1944
    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        """Return (path, metadata, permissions) for objects in a domain.

        Only paths accessible to `user` (including owned ones when a user
        is given) are listed; an empty access list short-circuits to [].
        """
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]
1957

    
1958
    # Utility functions.
1959

    
1960
    def _build_metadata(self, props, user_defined=None,
1961
                        include_user_defined=True):
1962
        meta = {'bytes': props[self.SIZE],
1963
                'type': props[self.TYPE],
1964
                'hash': props[self.HASH],
1965
                'version': props[self.SERIAL],
1966
                'version_timestamp': props[self.MTIME],
1967
                'modified_by': props[self.MUSER],
1968
                'uuid': props[self.UUID],
1969
                'checksum': props[self.CHECKSUM]}
1970
        if include_user_defined and user_defined is not None:
1971
            meta.update(user_defined)
1972
        return meta
1973

    
1974
    def _exists(self, node):
        """Return True if `node` has a live (non-deleted) version."""
        try:
            self._get_version(node)
            return True
        except ItemNotExists:
            return False
1981

    
1982
    def _unhexlify_hash(self, hash):
1983
        try:
1984
            return binascii.unhexlify(hash)
1985
        except TypeError:
1986
            raise InvalidHash(hash)