Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 809b482a

History | View | Annotate | Download (80.1 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from collections import defaultdict
41
from functools import wraps, partial
42
from traceback import format_exc
43

    
44
try:
45
    from astakosclient import AstakosClient
46
except ImportError:
47
    AstakosClient = None
48

    
49
from pithos.backends.base import (
50
    DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
51
    DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
52
    BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
53
    ContainerNotEmpty, ItemNotExists, VersionNotExists,
54
    InvalidHash, IllegalOperationError)
55

    
56

    
57
class DisabledAstakosClient(object):
    """Stand-in used when the real AstakosClient is unavailable/disabled.

    The constructor arguments are kept for inspection; any other
    attribute access raises AssertionError so that accidental use of a
    disabled client fails loudly instead of silently.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        # Only reached for attributes not set in __init__.
        raise AssertionError("AstakosClient has been disabled, "
                             "yet an attempt to access it was made")
66

    
67

    
68
# Stripped-down version of the HashMap class found in tools.
69

    
70
class HashMap(list):
    """A list of block hashes with Merkle-style top-hash computation.

    Stripped-down version of the HashMap class found in tools.
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        # Digest of v using the configured hash algorithm.
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        """Return the top hash of the map (hash of the empty string,
        the single element, or the pairwise Merkle reduction)."""
        if not self:
            return self._hash_raw('')
        if len(self) == 1:
            return self[0]

        # Pad with zero-filled pseudo-digests up to the next power of
        # two, then hash adjacent pairs until one digest remains.
        level = list(self)
        size = 2
        while size < len(level):
            size = size * 2
        level += [('\x00' * len(level[0]))] * (size - len(level))
        while len(level) > 1:
            level = [self._hash_raw(level[i] + level[i + 1])
                     for i in range(0, len(level), 2)]
        return level[0]
96

    
97
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
# Alphabet and length used when generating public-URL tokens.
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16

QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version clusters: 0 = current, 1 = superseded history, 2 = deleted.
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

# Used as an "unbounded" timestamp in before/until comparisons.
inf = float('inf')

ULTIMATE_ANSWER = 42

# Quotaholder source/resource names for disk-space accounting.
DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'

logger = logging.getLogger(__name__)
128

    
129

    
130
def backend_method(func):
    """Run a backend method inside a database transaction.

    If a transaction is already active (``self.in_transaction``), the
    call simply proceeds within it.  Otherwise a fresh transaction is
    opened via ``pre_exec()`` and finished via ``post_exec(ok)``, where
    ``ok`` reflects whether the wrapped call completed without raising.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        if self.in_transaction:
            # Nested call: reuse the already-open transaction.
            return func(self, *args, **kw)

        ok = False
        try:
            self.pre_exec()
            result = func(self, *args, **kw)
            ok = True
            return result
        finally:
            # Commits on success, rolls back on failure.
            self.post_exec(ok)
    return wrapper
150

    
151

    
152
def debug_method(func):
    """Log every call as ``>>> name(args) <<< result`` at DEBUG level.

    If the wrapped call raises, the formatted traceback is logged in
    place of the result and the exception is re-raised unchanged.

    Fixed: the original used ``kw.iteritems()`` and a side-effecting
    ``map(all_args.append, ...)`` -- both Python-2-only idioms; under
    Python 3 the (eagerly evaluated) ``iteritems`` lookup made every
    call raise AttributeError.  ``items()``/``extend`` behave the same
    on both versions.  The stray ``.rstrip(', ')`` was also dropped: it
    could strip meaningful trailing characters from the last argument's
    repr in the log line.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            # Log the traceback text as the "result"; re-raise as-is.
            result = format_exc()
            raise
        finally:
            all_args = [repr(a) for a in args]
            all_args.extend('%s=%s' % (k, v) for k, v in kw.items())
            logging.getLogger(__name__).debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args), result))
    return wrapper
167

    
168

    
169
def check_allowed_paths(action):
    """Decorator for backend methods checking path access granted to user.

    The decorated method receives the backend instance, then the user,
    then path components; the components joined with '/' form the
    requested path.  If that path is already in the user's cached
    allowed paths for the given action, the wrapper returns at once and
    skips the database check.  Otherwise the wrapped method performs
    the check (raising on denial) and, on success, the path is cached.

    :param action: (int) 0 for reads / 1 for writes
    :raises NotAllowedError: the user does not have access to the path
    """
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args):
            user = args[0]
            cache = (self.read_allowed_paths if action == self.READ
                     else self.write_allowed_paths)
            path = '/'.join(args[1:])
            if path in cache.get(user, []):
                return  # access is already checked
            func(self, *args)   # proceed with access check
            cache[user].add(path)  # remember the granted path
        return wrapper
    return decorator
203

    
204

    
205
def list_method(func):
    """Slice a listing result according to 'marker'/'limit' kwargs.

    The wrapped method returns the full listing; ``self._list_limits``
    translates marker/limit into a start index and effective limit.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        result = func(self, *args, **kw)
        start, limit = self._list_limits(
            result, kw.get('marker'), kw.get('limit'))
        return result[start:start + limit]
    return wrapper
214

    
215

    
216
class ModularBackend(BaseBackend):
217
    """A modular backend.
218

219
    Uses modules for SQL functions and storage.
220
    """
221

    
222
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        """Wire up the DB, block-store, queue and astakos components.

        Every argument is optional; falsy values fall back to the
        module-level DEFAULT_* constants.  Modules are addressed by
        dotted import path and loaded dynamically.
        """
        # Fill in defaults for anything not explicitly configured.
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        def load_module(m):
            # Import by dotted path and return the module object.
            __import__(m)
            return sys.modules[m]

        # Database layer: wrapper plus the table-access helpers.
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        # Mirror the db module's symbolic constants onto the backend
        # instance (access flags and node/version property indices).
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        # Block storage layer.
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        # Message queue: real queue if configured, no-op stub otherwise.
        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        # Use the disabled stub when astakos is not configured or the
        # astakosclient package is not importable.
        if not astakos_auth_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        # Per-transaction state (see pre_exec/post_exec).
        self.serials = []
        self.messages = []

        # Move is implemented as a copy with is_move=True.
        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False

        self._reset_allowed_paths()
335

    
336
    def pre_exec(self, lock_container_path=False):
        """Start a database transaction before running a backend method.

        Resets the per-transaction state (commission serials, cached
        allowed paths) and marks the backend as inside a transaction.

        :param lock_container_path: stored on the instance;
            presumably consulted by container lookup helpers to decide
            whether to lock paths -- confirm against the rest of the file.
        """
        self.lock_container_path = lock_container_path
        self.wrapper.execute()
        self.serials = []
        self._reset_allowed_paths()
        self.in_transaction = True
342

    
343
    def post_exec(self, success_status=True):
        """Finish the transaction opened by pre_exec().

        On success: flush queued messages, persist and resolve (accept)
        any quotaholder commission serials, and commit.  On failure:
        reject the pending serials and roll back.  Always clears the
        in_transaction flag.
        """
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    accept_serials=self.serials,
                    reject_serials=[])
                # Drop only the serials astakos actually accepted.
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            if self.serials:
                # Failure path: reject the pending commissions.
                r = self.astakosclient.resolve_commissions(
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(
                    r['rejected'])
            self.wrapper.rollback()
        self.in_transaction = False
377

    
378
    def close(self):
        """Release the database connection and the message queue."""
        for component in (self.wrapper, self.queue):
            component.close()
381

    
382
    @property
    def using_external_quotaholder(self):
        """True when a real AstakosClient (not the disabled stub) is in use."""
        return not isinstance(self.astakosclient, DisabledAstakosClient)
385

    
386
    @debug_method
    @backend_method
    @list_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access.

        marker/limit slicing is applied by the list_method decorator.
        """

        return self._allowed_accounts(user)
393

    
394
    def _get_account_quotas(self, account):
        """Get account usage from astakos.

        Returns the account's quota entry for the 'system' source and
        the 'pithos.diskspace' resource, or {} when absent.

        Fixed: this method was defined twice, byte-for-byte identical;
        the redundant second definition (which silently overrode the
        first) has been removed.
        """
        quotas = self.astakosclient.service_get_quotas(account)[account]
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
                                                  {})
407

    
408
    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain.

        Non-owners get only {'name', 'modified'} and may not request a
        past version (until).  Owners additionally get count/bytes,
        domain attributes and, with an external quotaholder, the usage
        reported by astakos in 'bytes'.
        """

        self._can_read_account(user, account)
        path, node = self._lookup_account(account, user == account)
        if user != account:
            if until or (node is None):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # NOTE(review): _get_properties appears to signal "no such
            # version" by raising NameError -- confirm in its definition.
            props = None
            mtime = until
        # 'bytes' shadows the builtin; kept for file consistency.
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                # Astakos' usage figure overrides the locally computed one.
                external_quota = self._get_account_quotas(account)
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta
449

    
450
    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain.

        :param replace: when True existing metadata is replaced instead
            of merged.
        """
        self._can_write_account(user, account)
        account_node = self._lookup_account(account, True)[1]
        self._put_metadata(user, account_node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)
459

    
460
    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""
        self._can_read_account(user, account)
        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        # Group definitions are visible to the owner only.
        return {}
470

    
471
    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account.

        :param groups: mapping of group name -> iterable of members
        :param replace: when True all existing groups are destroyed first

        Fixed: ``groups.iteritems()`` is Python-2-only; ``items()``
        behaves identically on Python 2 and also works on Python 3.
        """
        self._can_write_account(user, account)
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        for k, v in groups.items():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, k)
            if v:
                self.permissions.group_addmany(account, k, v)
486

    
487
    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy.

        Non-owners get {}.  With an external quotaholder, 'quota' is
        replaced by the limit reported by astakos.
        """
        self._can_read_account(user, account)
        if user != account:
            return {}
        node = self._lookup_account(account, True)[1]
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            external_quota = self._get_account_quotas(account)
            policy['quota'] = external_quota.get('limit', 0)
        return policy
501

    
502
    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""
        self._can_write_account(user, account)
        account_node = self._lookup_account(account, True)[1]
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(account_node, policy, replace, is_account_policy=True)
511

    
512
    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name.

        :raises AccountExists: an account node already exists
        """
        policy = policy or {}
        self._can_write_account(user, account)
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True)
527

    
528
    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name.

        A missing account is a no-op; a non-empty one raises
        AccountNotEmpty.
        """
        self._can_write_account(user, account)
        node = self.node.node_lookup(account)
        if node is None:
            return  # nothing to delete
        removed = self.node.node_remove(
            node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
545

    
546
    @debug_method
    @backend_method
    @list_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account.

        Non-owners only see containers they are allowed to access (and
        may not use 'until').  With shared/public set, container names
        are derived from the shared/public object paths instead of the
        node listing.  marker/limit slicing is applied by list_method.
        """

        self._can_read_account(user, account)
        if user != account:
            if until:
                raise NotAllowedError
            return self._allowed_containers(user, account)
        if shared or public:
            allowed = set()
            if shared:
                # path component [1] of 'account/container/...' is the
                # container name.
                allowed.update([x.split('/', 2)[1] for x in
                               self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in
                               self.permissions.public_list(account)])
            return sorted(allowed)
        node = self.node.node_lookup(account)
        return [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
570

    
571
    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain.

        Non-owners may not use 'until'.  NOTE(review): 'allowed' stays
        an empty list for every caller here -- presumably an empty list
        means "no path restriction" to latest_attribute_keys; confirm.
        """

        self._can_read_container(user, account, container)
        allowed = []
        if user != account:
            if until:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = until if until is not None else inf
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)
587

    
588
    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        Non-owners get only {'name', 'modified'} and may not request a
        past version (until).  Owners also get count/bytes and the
        container's domain attributes.
        """

        self._can_read_container(user, account, container)
        if user != account:
            if until:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        # 'bytes' shadows the builtin; kept for file consistency.
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta
622

    
623
    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain.

        When the update supersedes an old version and the container's
        versioning policy is not 'auto', the superseded version is
        removed rather than kept as history.
        """

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is not None:
            versioning = self._get_policy(
                node, is_account_policy=False)['versioning']
            if versioning != 'auto':
                self.node.version_remove(src_version_id,
                                         update_statistics_ancestors_depth=0)
640

    
641
    @debug_method
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""
        self._can_read_container(user, account, container)
        if user == account:
            node = self._lookup_container(account, container)[1]
            return self._get_policy(node, is_account_policy=False)
        # Policy details are visible to the owner only.
        return {}
651

    
652
    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""
        self._can_write_container(user, account, container)
        container_node = self._lookup_container(account, container)[1]
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(container_node, policy, replace,
                         is_account_policy=False)
662

    
663
    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name.

        :raises ContainerExists: the container is already present
        """
        policy = policy or {}
        self._can_write_container(user, account, container)
        exists = True
        try:
            self._lookup_container(account, container)
        except NameError:
            # Lookup signals a missing container by raising NameError.
            exists = False
        if exists:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        account_node = self._lookup_account(account, True)[1]
        node = self._put_path(user, account_node, path,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)
683

    
684
    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name.

        Three modes:
        - until is not None: purge history/deleted versions up to that
          timestamp, keep the container;
        - no delimiter: remove the (empty) container entirely;
        - delimiter given: delete only the contained objects, keeping
          the container itself.
        Size changes are reported to the quotaholder unless versioning
        is free.
        """

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)

        if until is not None:
            # Purge old versions only; the container survives.
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            # Full container removal; refuse when objects remain.
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                # Mark the object deleted by writing a zero-size version
                # in the DELETED cluster.
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
762

    
763
    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        """Return the (marker/limit trimmed) object listing for a container.

        Depending on ``shared``/``public`` the listing is restricted to
        shared objects, public objects, or the merged (deduplicated,
        path-sorted) union of both.

        Raises NotAllowedError if a non-owner requests a listing at a past
        point in time (``until``).
        """
        if user != account and until:
            raise NotAllowedError

        if shared and public:
            # Merge shared and public listings; a set drops duplicates.
            # NOTE: the previous code applied ``|=`` to a plain list when
            # there were no shared paths, which raised TypeError; starting
            # from an empty set fixes that.
            merged = set()
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                merged = set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # Add the public objects and order the union by object name.
            merged |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = sorted(merged, key=lambda x: x[0])
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
        else:
            allowed = self._list_object_permissions(
                user, account, container, prefix, shared, public=False)
            if shared and not allowed:
                return []
            path, node = self._lookup_container(account, container)
            allowed = self._get_formatted_paths(allowed)
            objects = self._list_object_properties(
                node, path, prefix, delimiter, marker, limit, virtual, domain,
                keys, until, size_range, allowed, all_props)

        # Apply marker/limit trimming on the collected listing.
        start, limit = self._list_limits(objects, marker, limit)
        return objects[start:start + limit]
    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        """Return (name, properties...) tuples for the container's public
        objects that match the given prefix."""
        public_paths = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        full_paths, nodes = self._lookup_objects(public_paths)
        container_prefix = '/'.join((account, container)) + '/'
        # Strip the leading "account/container/" part from every path.
        names = [full_path[len(container_prefix):] for full_path in full_paths]
        props = self.node.version_lookup_bulk(
            nodes, all_props=all_props, order_by_path=True)
        return [(name,) + prop for name, prop in zip(names, props)]
    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        """Return the complete object listing by fetching page by page."""
        page_size = 10000
        listing = []
        while True:
            # Resume right after the last element fetched so far.
            marker = listing[-1] if listing else None
            page = self._list_objects(
                user, account, container, prefix, delimiter, marker, page_size,
                virtual, domain, keys, shared, until, size_range, all_props,
                public)
            listing.extend(page)
            # A short (or empty) page means we have reached the end.
            if not page or len(page) < page_size:
                break
        return listing
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        """Return the paths under the prefix the user may list, optionally
        restricted to shared and/or public objects.

        Raises NotAllowedError when a non-owner has no access at all.
        """
        lookup_path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            # Non-owners only see paths explicitly granted to them.
            granted = self.permissions.access_list_paths(user, lookup_path)
            if not granted:
                raise NotAllowedError
            return granted
        collected = set()
        if shared:
            collected.update(self.permissions.access_list_shared(lookup_path))
        if public:
            collected.update(
                [x[0] for x in self.permissions.public_list(lookup_path)])
        if not collected:
            return []
        return sorted(collected)
    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        # Thin wrapper: delegate with all_props=False (names + versions only).
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, False,
            public)
    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, True,
            public)
        listing = []
        for entry in props:
            if len(entry) == 2:
                # Two-element entries are virtual directory markers.
                listing.append({'subdir': entry[0]})
                continue
            # Property tuples are (name,) + version properties, hence the
            # +1 offset on every property index.
            listing.append({
                'name': entry[0],
                'bytes': entry[self.SIZE + 1],
                'type': entry[self.TYPE + 1],
                'hash': entry[self.HASH + 1],
                'version': entry[self.SERIAL + 1],
                'version_timestamp': entry[self.MTIME + 1],
                'modified': entry[self.MTIME + 1] if until is None else None,
                'modified_by': entry[self.MUSER + 1],
                'uuid': entry[self.UUID + 1],
                'checksum': entry[self.CHECKSUM + 1]})
        return listing
    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths enforce permissions under a container."""

        # Shared objects only; public objects are handled elsewhere.
        return self._list_object_permissions(
            user, account, container, prefix, True, False)
    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        lookup_path = '/'.join((account, container, prefix))
        # public_list() yields (path, public_id) pairs.
        return dict(self.permissions.public_list(lookup_path))
    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        When a specific ``version`` is requested, 'modified' reflects the
        object's overall last modification (or its deletion time if the
        object no longer exists), not the requested version's timestamp.

        Raises NotAllowedError if the user may not read the object and
        ItemNotExists if the object (or version) cannot be found.
        """

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                # Fall back to the latest deleted version's timestamp.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            # Domain-scoped user-defined attributes first, so the fixed
            # system keys below always win on conflict.
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta
    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write_object(user, account, container, name)

        # Lock the container path while the new version is created.
        obj_path, obj_node = self._lookup_object(account, container, name,
                                                 lock_container=True)
        prev_version_id, new_version_id = self._put_metadata(
            user, obj_node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        self._apply_versioning(account, container, prev_version_id,
                               update_statistics_ancestors_depth=1)
        return new_version_id
    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions.

        Raises NotAllowedError when a non-owner has no access entry for
        one of the requested paths.
        """

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        nobject_permissions = {}
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            name = path[cpath_idx:]
            # Non-owners need an explicit access entry for the path.  (The
            # previous code stored the entry in a dead local that was
            # immediately overwritten below; only this check matters.)
            if user != account and path not in access_objects:
                raise NotAllowedError
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        # Ensure the objects exist (raises otherwise).
        self._lookup_objects(permissions_path)
        return nobject_permissions
    @debug_method
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        permissions_path = self._get_permissions_path(account, container, name)
        if user == account:
            # Owners always have write access.
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.WRITE,
                                           user):
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.READ,
                                           user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # Ensure the object exists (raises otherwise).
        self._lookup_object(account, container, name)
        return (allowed,
                permissions_path,
                self.permissions.access_get(permissions_path))
    @debug_method
    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object.

        Only the account owner may change permissions.  Raises
        NotAllowedError for any other user and ValueError if the
        permissions could not be stored.
        """

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        try:
            self.permissions.access_set(path, permissions)
        except Exception:
            # Narrowed from a bare ``except:`` so system-exiting exceptions
            # (KeyboardInterrupt, SystemExit) propagate; callers still see
            # ValueError on failure.
            raise ValueError
        else:
            self._report_sharing_change(user, account, path, {'members':
                                        self.permissions.access_members(path)})

        # remove all the cached allowed paths
        # filtering out only those affected could be more expensive
        self._reset_allowed_paths()
    @debug_method
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read_object(user, account, container, name)
        obj_path, _node = self._lookup_object(account, container, name)
        return self.permissions.public_get(obj_path)
    @debug_method
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write_object(user, account, container, name)
        obj_path = self._lookup_object(account, container, name,
                                       lock_container=True)[0]
        if public:
            self.permissions.public_set(
                obj_path, self.public_url_security, self.public_url_alphabet)
        else:
            self.permissions.public_unset(obj_path)
    @debug_method
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes.

        For Archipelago-backed objects (hash prefixed with 'archip:') the
        mapfile entries are returned as-is; otherwise the stored binary
        hashes are hexlified before being returned.
        """

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if props[self.HASH] is None:
            # No stored data for this version.
            return 0, ()
        if props[self.HASH].startswith('archip:'):
            # NOTE(review): update_object_hashmap() rejects hashes starting
            # with 'archip_' (underscore) while this checks 'archip:'
            # (colon) -- confirm which prefix is canonical.
            hashmap = self.store.map_get_archipelago(props[self.HASH],
                                                     props[self.SIZE])
            return props[self.SIZE], [x for x in hashmap]
        else:
            hashmap = self.store.map_get(self._unhexlify_hash(
                props[self.HASH]))
            return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        """Create a new version for the object pointing to the given hash.

        Handles metadata duplication, versioning, account/container quota
        checks, quotaholder size reporting and permission updates.
        Returns the new version id.

        Raises NotAllowedError if a non-owner supplies permissions or may
        not write the object, and QuotaError if a quota would be exceeded.
        """
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write_object(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        # Size freed by versioning policy (0 if versions are kept).
        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id
    @debug_method
    @backend_method
    def register_object_map(self, user, account, container, name, size, type,
                            mapfile, checksum='', domain='pithos', meta=None,
                            replace_meta=False, permissions=None):
        """Register an object mapfile without providing any data.

        Lock the container path, create a node pointing to the object path,
        create a version pointing to the mapfile
        and issue the size change in the quotaholder.

        :param user: the user account which performs the action

        :param account: the account under which the object resides

        :param container: the container under which the object resides

        :param name: the object name

        :param size: the object size

        :param type: the object mimetype

        :param mapfile: the mapfile pointing to the object data

        :param checksum: the md5 checksum (optional)

        :param domain: the object domain

        :param meta: a dict with custom object metadata

        :param replace_meta: replace existing metadata or not

        :param permissions: a dict with the read and write object permissions

        :returns: the new object uuid

        :raises: ItemNotExists, NotAllowedError, QuotaError
        """

        meta = meta or {}
        try:
            # Create the container if missing; the flag makes
            # put_container() lock the container path for us.
            self.lock_container_path = True
            self.put_container(user, account, container, policy=None)
        except ContainerExists:
            pass
        finally:
            # Always restore the flag, even if put_container() raised.
            self.lock_container_path = False
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, mapfile, checksum,
            domain, meta, replace_meta, permissions)
        return self.node.version_get_properties(dest_version_id,
                                                keys=('uuid',))[0]
    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version."""

        # Archipelago-backed volumes cannot be updated through a hashmap.
        for block_hash in hashmap:
            if block_hash.startswith('archip_'):
                raise IllegalOperationError(
                    'Cannot update Archipelago Volume hashmap.')
        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        hmap = HashMap(self.block_size, self.hash_algorithm)
        hmap.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(hmap)
        if missing:
            # Report the missing blocks back to the caller.
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        map_hash = hmap.hash()
        hexlified = binascii.hexlify(map_hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        self.store.map_put(map_hash, hmap)
        return dest_version_id, hexlified
    @debug_method
    @backend_method
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum."""

        # Propagate the checksum to every later version sharing the same
        # hashmap and size (fix metadata updates).
        self._can_write_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        props = self._get_version(node, version)
        target_hash = props[self.HASH]
        target_size = props[self.SIZE]
        for candidate in self.node.node_get_versions(node):
            if (candidate[self.SERIAL] >= int(version) and
                    candidate[self.HASH] == target_hash and
                    candidate[self.SIZE] == target_size):
                self.node.version_put_property(
                    candidate[self.SERIAL], 'checksum', checksum)
    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):
        """Copy (or move, when ``is_move``) an object, optionally together
        with everything under it when ``delimiter`` is given.

        Returns the new version id, or a list of version ids when a
        delimiter copy produced several versions.

        Raises NotAllowedError/ItemNotExists via the helpers it calls.
        """

        # For moves the net size is unchanged, so skip quotaholder updates.
        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read_object(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            # Recursively copy/move every object under the source prefix.
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                # Rebase the object path under the destination prefix.
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)
    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        # Thin wrapper: is_move=False selects copy semantics.
        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, src_version, False, delimiter)
    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        # Only the source owner may move the object.
        if user != src_account:
            raise NotAllowedError
        return self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, None, delimiter=delimiter)
    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        """Delete an object, or purge its history up to `until`.

        Only the owner may delete. With `until`, versions up to that
        timestamp are physically purged; otherwise a new version is created
        in the deleted cluster. With `delimiter`, all objects under the
        matching prefix are deleted as well.
        """
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            # Purge mode: physically remove versions up to `until`.
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            # History versions count against quota only when versioning
            # is not free.
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except NameError:
                # NOTE(review): this relies on ItemNotExists subclassing
                # NameError in the backend exceptions — confirm in base.
                # If no live version remains, drop the sharing entries.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        # Normal delete: record a zero-size version in the deleted cluster.
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            # Recursive delete of every object under the prefix.
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
1469

    
1470
    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object.

        NOTE(review): `prefix` is accepted for API compatibility but is not
        forwarded to _delete_object; only `delimiter` drives bulk deletion.
        """

        self._delete_object(user, account, container, name, until, delimiter)
1477

    
1478
    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        result = []
        for props in self.node.node_get_versions(node):
            # Versions already in the deleted cluster are not listed.
            if props[self.CLUSTER] == CLUSTER_DELETED:
                continue
            result.append([props[self.SERIAL], props[self.MTIME]])
        return result
1488

    
1489
    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        lookup = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if lookup is None:
            raise NameError
        path = lookup[0]
        account, container, name = path.split('/', 2)
        if check_permissions:
            # Enforce read access unless the caller opted out.
            self._can_read_object(user, account, container, name)
        return (account, container, name)
1502

    
1503
    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        # Public objects still honour the standard read checks.
        self._can_read_object(user, account, container, name)
        return (account, container, name)
1514

    
1515
    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        # Archipelago blocks are addressed by their prefixed name; plain
        # blocks by the binary form of their hex hash.
        if hash.startswith('archip_'):
            block = self.store.block_get_archipelago(hash)
        else:
            block = self.store.block_get(self._unhexlify_hash(hash))
        if block:
            return block
        raise ItemNotExists('Block does not exist')
1526

    
1527
    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        digest = self.store.block_put(data)
        return binascii.hexlify(digest)
1532

    
1533
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if hash.startswith('archip_'):
            raise IllegalOperationError(
                'Cannot update an Archipelago Volume block.')
        # Rewriting a whole block from offset 0 is just storing a new one.
        whole_block = (offset == 0 and len(data) == self.block_size)
        if whole_block:
            return self.put_block(data)
        updated = self.store.block_update(self._unhexlify_hash(hash), offset,
                                          data)
        return binascii.hexlify(updated)
1544

    
1545
    # Path functions.
1546

    
1547
    def _generate_uuid(self):
1548
        return str(uuidlib.uuid4())
1549

    
1550
    def _put_object_node(self, path, parent, name):
        """Return (path, node) for path/name, creating the node if absent."""
        full_path = '/'.join((path, name))
        node = self.node.node_lookup(full_path)
        if node is None:
            node = self.node.node_create(parent, full_path)
        return full_path, node
1556

    
1557
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        """Create a node at `path` with an initial empty version; return it."""
        created = self.node.node_create(parent, path)
        self.node.version_create(created, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return created
1564

    
1565
    def _lookup_account(self, account, create=True):
        """Return (account, node), optionally creating the account node."""
        node = self.node.node_lookup(account)
        if create and node is None:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node
1572

    
1573
    def _lookup_container(self, account, container):
        """Return (path, node) for the container; raise if missing."""
        # Lock the container row when the backend is configured to do so.
        for_update = bool(self.lock_container_path)
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node
1580

    
1581
    def _lookup_object(self, account, container, name, lock_container=False):
        """Return (path, node) for the object; raise if missing."""
        if lock_container:
            # Looking up the container also locks its path when configured.
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        obj_node = self.node.node_lookup(path)
        if obj_node is None:
            raise ItemNotExists('Object does not exist')
        return path, obj_node
1590

    
1591
    def _lookup_objects(self, paths):
        """Return (paths, nodes) via a single bulk lookup."""
        return paths, self.node.node_lookup_bulk(paths)
1594

    
1595
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = inf if until is None else until
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            # Point-in-time lookups may hit a version already in history.
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props
1605

    
1606
    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(
                node, except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        # Missing statistics count as empty.
        return stats if stats is not None else (0, 0, 0)
1619

    
1620
    def _get_version(self, node, version=None):
        """Return version properties; the latest normal one when unspecified."""
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
            return props
        try:
            version = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(version, node=node)
        # Deleted-cluster versions are treated as nonexistent.
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props
1634

    
1635
    def _get_versions(self, nodes):
        """Bulk-fetch the latest normal version for each of `nodes`."""
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1637

    
1638
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node.

        Properties not given explicitly are inherited from the latest
        normal version of `src_node` (or of `node` itself when src_node is
        None). The superseded version, if any, is moved to the history
        cluster. Returns (pre_version_id, dest_version_id).
        """

        # Source of the inherited properties: the node itself, or an
        # explicit src_node when duplicating from another path (copy/move).
        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            # No previous version: start from empty defaults.
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        # Copies get a fresh UUID; in-place updates keep the object's UUID.
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            # Cross-node duplication: the version to supersede is the
            # current latest version of the destination node, if any.
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            # Demote the superseded version to the history cluster.
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        # Ensure only the newly created version is flagged as latest.
        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
1687

    
1688
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        """Copy attributes from src to dest version, then apply `meta`.

        With replace=False, keys whose value is '' are deleted and the
        remaining keys are upserted; with replace=True the domain's
        attributes are wiped and fully replaced by `meta`.
        """
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            # An empty-string value marks the key for deletion.
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))
1701

    
1702
    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        depth = update_statistics_ancestors_depth
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, update_statistics_ancestors_depth=depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id
1713

    
1714
    def _list_limits(self, listing, marker, limit):
1715
        start = 0
1716
        if marker:
1717
            try:
1718
                start = listing.index(marker) + 1
1719
            except ValueError:
1720
                pass
1721
        if not limit or limit > 10000:
1722
            limit = 10000
1723
        return start, limit
1724

    
1725
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """Return sorted object property tuples under a container node.

        Names in the result are relative to the container path. With
        `virtual`, delimiter-collapsed prefixes are included as
        (name, None) entries.
        """
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        # Both the prefix and the paging marker are container-relative.
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        # Attribute filters only make sense within a metadata domain.
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        # Strip the container prefix so callers see bare object names.
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects
1746

    
1747
    # Reporting functions.
1748

    
1749
    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        """Queue a diskspace message and commission the quota change.

        Appends a 'resource.diskspace' message (with the account's new
        total) and, when an external quotaholder is configured, issues a
        commission for the size delta; failures raise QuotaError.
        """
        details = details or {}

        # Nothing to report for a zero delta.
        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            # Any failure to commission is surfaced as a quota error.
            raise QuotaError(e)
        else:
            # Remember the serial so the commission can be resolved later.
            self.serials.append(serial)
1778

    
1779
    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        """Queue an 'object' notification message for `path`."""
        details = details or {}
        # Annotate in place; callers may have passed a shared dict.
        details['user'] = user
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                   account, QUEUE_INSTANCE_ID, 'object', path, details)
        self.messages.append(message)
1787

    
1788
    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a 'sharing' notification message for `path`."""
        details = details or {}
        # Annotate in place; callers may have passed a shared dict.
        details['user'] = user
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                   account, QUEUE_INSTANCE_ID, 'sharing', path, details)
        self.messages.append(message)
1796

    
1797
    # Policy functions.
1798

    
1799
    def _check_policy(self, policy, is_account_policy=True):
        """Validate a policy dict in place; raise ValueError when invalid.

        Blank values are replaced by the corresponding defaults before
        validation. Only 'quota' and 'versioning' keys are accepted.
        """
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        # Blank values fall back to the defaults (mutates the caller's dict).
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                if int(v) < 0:  # int() may raise ValueError.
                    raise ValueError
            elif k == 'versioning':
                if v not in ('auto', 'none'):
                    raise ValueError
            else:
                raise ValueError
1815

    
1816
    def _put_policy(self, node, policy, replace, is_account_policy=True):
        """Store `policy` on node; with replace, fill gaps from defaults."""
        defaults = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            # Missing keys revert to their default values.
            for key, value in defaults.iteritems():
                policy.setdefault(key, value)
        self.node.policy_set(node, policy)
1824

    
1825
    def _get_policy(self, node, is_account_policy=True):
        """Return the node's policy merged over the defaults."""
        defaults = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        merged = defaults.copy()
        merged.update(self.node.policy_get(node))
        return merged
1831

    
1832
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        policy = self._get_policy(node, is_account_policy=False)
        if policy['versioning'] != 'auto':
            # No history kept: remove the version and its map outright.
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        if self.free_versioning:
            # Version is kept but its size does not count against quota.
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
1852

    
1853
    # Access control functions.
1854

    
1855
    def _check_groups(self, groups):
        """Validate group names; currently a no-op placeholder."""
        # raise ValueError('Bad characters in groups')
        pass
1858

    
1859
    def _check_permissions(self, path, permissions):
        """Validate permission values; currently a no-op placeholder."""
        # raise ValueError('Bad characters in permissions')
        pass
1862

    
1863
    def _get_formatted_paths(self, paths):
        """Return (path, match-mode) pairs for the given paths.

        Directory-typed objects additionally match by prefix (with a
        trailing '/'); every path matches exactly.
        """
        formatted = []
        if not paths:
            return formatted
        props = self.node.get_props(paths)
        for prop in (props or []):
            content_type = prop[1].split(';', 1)[0].strip()
            if content_type in ('application/directory',
                                'application/folder'):
                formatted.append((prop[0].rstrip('/') + '/',
                                  self.MATCH_PREFIX))
            formatted.append((prop[0], self.MATCH_EXACT))
        return formatted
1876

    
1877
    def _get_permissions_path(self, account, container, name):
        """Return the path whose permissions govern the object, or None.

        Inherited candidates are scanned from the most specific down; a
        parent only governs when it is a directory-typed object.
        """
        path = '/'.join((account, container, name))
        candidates = self.permissions.access_inherit(path)
        candidates.sort(reverse=True)
        for candidate in candidates:
            if candidate == path:
                return candidate
            if candidate.count('/') < 2:
                # Account- or container-level entries cannot be objects.
                continue
            node = self.node.node_lookup(candidate)
            if node is None:
                continue
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                continue
            if props[self.TYPE].split(';', 1)[0].strip() in (
                    'application/directory', 'application/folder'):
                return candidate
        return None
1897

    
1898
    def _get_permissions_path_bulk(self, account, container, names):
        """Bulk variant of _get_permissions_path for many object names.

        Returns the list of governing permission paths (exact paths plus
        directory-typed ancestors), or None when there are none.
        """
        formatted_paths = []
        for name in names:
            path = '/'.join((account, container, name))
            formatted_paths.append(path)
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        permission_paths.sort()
        permission_paths.reverse()
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                # Exact object path: governs directly.
                permission_paths_list.append(p)
            else:
                if p.count('/') < 2:
                    # Account/container level entries cannot be objects.
                    continue
                lookup_list.append(p)

        if len(lookup_list) > 0:
            # Ancestors govern only when they are directory-typed objects.
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        if len(permission_paths_list) > 0:
            return permission_paths_list

        return None
1929

    
1930
    def _reset_allowed_paths(self):
1931
        self.read_allowed_paths = defaultdict(set)
1932
        self.write_allowed_paths = defaultdict(set)
1933

    
1934
    @check_allowed_paths(action=0)
    def _can_read_account(self, user, account):
        """Raise NotAllowedError unless `user` may read `account`."""
        if user == account:
            return
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
1939

    
1940
    @check_allowed_paths(action=1)
    def _can_write_account(self, user, account):
        """Only the account owner may write to it."""
        if user == account:
            return
        raise NotAllowedError
1944

    
1945
    @check_allowed_paths(action=0)
    def _can_read_container(self, user, account, container):
        """Raise NotAllowedError unless `user` may read the container."""
        if user == account:
            return
        if container not in self._allowed_containers(user, account):
            raise NotAllowedError
1950

    
1951
    @check_allowed_paths(action=1)
    def _can_write_container(self, user, account, container):
        """Only the account owner may write to its containers."""
        if user == account:
            return
        raise NotAllowedError
1955

    
1956
    @check_allowed_paths(action=0)
    def _can_read_object(self, user, account, container, name):
        """Allow the owner, public objects, or explicitly shared paths."""
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        # Either READ or WRITE access suffices for reading (checked in
        # that order, short-circuiting as the original did).
        if self.permissions.access_check(path, self.READ, user):
            return
        if self.permissions.access_check(path, self.WRITE, user):
            return
        raise NotAllowedError
1969

    
1970
    @check_allowed_paths(action=1)
    def _can_write_object(self, user, account, container, name):
        """Raise NotAllowedError unless `user` may write the object.

        The owner may always write; otherwise the governing permissions
        path must grant WRITE access to the user.
        """
        if user == account:
            return True
        # Fix: removed a dead '/'.join(...) assignment that was immediately
        # overwritten by the _get_permissions_path() result below.
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError
1980

    
1981
    def _allowed_accounts(self, user):
        """Return sorted accounts with paths shared to `user`; cache them."""
        allow = set(path.split('/', 1)[0]
                    for path in self.permissions.access_list_paths(user))
        # Keep the read-path cache in sync with what we just computed.
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
1988

    
1989
    def _allowed_containers(self, user, account):
        """Return sorted containers of `account` shared to `user`; cache them."""
        shared = self.permissions.access_list_paths(user, account)
        allow = set(path.split('/', 2)[1] for path in shared)
        # Keep the read-path cache in sync with what we just computed.
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
1996

    
1997
    # Domain functions
1998

    
1999
    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        """Return (path, metadata, permissions) for objects in `domain`."""
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        result = []
        for path, props, user_defined_meta in obj_list:
            result.append((path,
                           self._build_metadata(props, user_defined_meta),
                           self.permissions.access_get(path)))
        return result
2012

    
2013
    # util functions
2014

    
2015
    def _build_metadata(self, props, user_defined=None,
2016
                        include_user_defined=True):
2017
        meta = {'bytes': props[self.SIZE],
2018
                'type': props[self.TYPE],
2019
                'hash': props[self.HASH],
2020
                'version': props[self.SERIAL],
2021
                'version_timestamp': props[self.MTIME],
2022
                'modified_by': props[self.MUSER],
2023
                'uuid': props[self.UUID],
2024
                'checksum': props[self.CHECKSUM]}
2025
        if include_user_defined and user_defined is not None:
2026
            meta.update(user_defined)
2027
        return meta
2028

    
2029
    def _exists(self, node):
        """Return True when the node has a live (non-deleted) version."""
        try:
            self._get_version(node)
        except ItemNotExists:
            return False
        return True
2036

    
2037
    def _unhexlify_hash(self, hash):
2038
        try:
2039
            return binascii.unhexlify(hash)
2040
        except TypeError:
2041
            raise InvalidHash(hash)