snf-pithos-backend / pithos / backends / modular.py @ b20f5e4a

# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import sys
import uuid as uuidlib
import logging
import hashlib
import binascii

from collections import defaultdict
from functools import wraps, partial
from traceback import format_exc

from pithos.workers import glue
from archipelago.common import Segment, Xseg_ctx
from objpool import ObjectPool


try:
    from astakosclient import AstakosClient
except ImportError:
    AstakosClient = None

from pithos.backends.base import (
    DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
    DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
    BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
    ContainerNotEmpty, ItemNotExists, VersionNotExists,
    InvalidHash, IllegalOperationError)


class DisabledAstakosClient(object):
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        m = ("AstakosClient has been disabled, "
             "yet an attempt to access it was made")
        raise AssertionError(m)


# Stripped-down version of the HashMap class found in tools.

class HashMap(list):

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        if len(self) == 0:
            return self._hash_raw('')
        if len(self) == 1:
            return self.__getitem__(0)

        h = list(self)
        s = 2
        while s < len(h):
            s = s * 2
        h += [('\x00' * len(h[0]))] * (s - len(h))
        while len(h) > 1:
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
        return h[0]
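
# Illustrative sketch (not used by the backend itself): hashing an object
# made of two blocks. The HashMap holds the raw digest of each block; hash()
# pads the list with zero-filled strings of digest length up to a power of
# two and then folds adjacent digests pairwise into a single top-level digest.
#
#     hm = HashMap(4 * 1024 * 1024, 'sha256')
#     hm.append(hm._hash_raw('first block of data'))
#     hm.append(hm._hash_raw('second block of data'))
#     top_hash = hm.hash()  # == sha256(digest_block1 + digest_block2)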

# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16
DEFAULT_ARCHIPELAGO_CONF_FILE = '/etc/archipelago/archipelago.conf'

QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

inf = float('inf')

ULTIMATE_ANSWER = 42

DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'

logger = logging.getLogger(__name__)


def backend_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        # if we are inside a database transaction
        # just proceed with the method execution
        # otherwise manage a new transaction
        if self.in_transaction:
            return func(self, *args, **kw)

        try:
            self.pre_exec()
            result = func(self, *args, **kw)
            success_status = True
            return result
        except:
            success_status = False
            raise
        finally:
            self.post_exec(success_status)
    return wrapper

    
157

    
158
def debug_method(func):
159
    @wraps(func)
160
    def wrapper(self, *args, **kw):
161
        try:
162
            result = func(self, *args, **kw)
163
            return result
164
        except:
165
            result = format_exc()
166
            raise
167
        finally:
168
            all_args = map(repr, args)
169
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
170
            logger.debug(">>> %s(%s) <<< %s" % (
171
                func.__name__, ', '.join(all_args).rstrip(', '), result))
172
    return wrapper
173

    
174

    
175
def check_allowed_paths(action):
176
    """Decorator for backend methods checking path access granted to user.
177

178
    The 1st argument of the decorated method is expected to be a
179
    ModularBackend instance, the 2nd the user performing the request and
180
    the path join of the rest arguments is supposed to be the requested path.
181

182
    The decorator checks whether the requested path is among the user's allowed
183
    cached paths.
184
    If this is the case, the decorator returns immediately to reduce the
185
    interactions with the database.
186
    Otherwise, it proceeds with the execution of the decorated method and if
187
    the method returns successfully (no exceptions are raised), the requested
188
    path is added to the user's cached allowed paths.
189

190
    :param action: (int) 0 for reads / 1 for writes
191
    :raises NotAllowedError: the user does not have access to the path
192
    """
193
    def decorator(func):
194
        @wraps(func)
195
        def wrapper(self, *args):
196
            user = args[0]
197
            if action == self.READ:
198
                d = self.read_allowed_paths
199
            else:
200
                d = self.write_allowed_paths
201
            path = '/'.join(args[1:])
202
            if path in d.get(user, []):
203
                return  # access is already checked
204
            else:
205
                func(self, *args)   # proceed with access check
206
                d[user].add(path)  # add path in the allowed user paths
207
        return wrapper
208
    return decorator
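
# Illustrative use (sketch; the decoration shown here is hypothetical):
#
#     @check_allowed_paths(action=0)  # 0 == READ
#     def _can_read_account(self, user, account):
#         ...  # performs the actual permission check
#
# After the first successful check, 'account' is cached in
# read_allowed_paths[user], so repeated reads of the same path skip the
# database round-trip.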


def list_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        marker = kw.get('marker')
        limit = kw.get('limit')
        result = func(self, *args, **kw)
        start, limit = self._list_limits(result, marker, limit)
        return result[start:start + limit]
    return wrapper
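
# Illustrative effect (sketch): list_accounts() below is wrapped with
# list_method, so a call such as
#
#     backend.list_accounts('user', marker='alice', limit=100)
#
# fetches the full result first and then slices it with _list_limits()
# according to the given marker and limit.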


class ModularBackend(BaseBackend):
    """A modular backend.

    Uses modules for SQL functions and storage.
    """

    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None,
                 archipelago_conf_file=None,
                 xseg_pool_size=8):
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING
        archipelago_conf_file = archipelago_conf_file \
            or DEFAULT_ARCHIPELAGO_CONF_FILE

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        def load_module(m):
            __import__(m)
            return sys.modules[m]

        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        glue.WorkerGlue.setupXsegPool(ObjectPool, Segment, Xseg_ctx,
                                      cfile=archipelago_conf_file,
                                      pool_size=xseg_pool_size)
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        if not astakos_auth_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        self.serials = []
        self.messages = []

        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False

        self._reset_allowed_paths()
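
    # Illustrative construction (sketch; values are hypothetical, anything
    # left as None falls back to the DEFAULT_* settings above):
    #
    #     backend = ModularBackend(
    #         db_connection='sqlite:///backend.db',
    #         block_path='data/',
    #         astakos_auth_url=None)  # AstakosClient stays disabled
    #     try:
    #         accounts = backend.list_accounts('user')
    #     finally:
    #         backend.close()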

    def pre_exec(self, lock_container_path=False):
        self.lock_container_path = lock_container_path
        self.wrapper.execute()
        self.serials = []
        self._reset_allowed_paths()
        self.in_transaction = True

    def post_exec(self, success_status=True):
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    accept_serials=self.serials,
                    reject_serials=[])
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            if self.serials:
                r = self.astakosclient.resolve_commissions(
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(
                    r['rejected'])
            self.wrapper.rollback()
        self.in_transaction = False
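
    # Illustrative flow (sketch) of a successful request outside an open
    # transaction:
    #
    #     pre_exec()        # start transaction, reset serials/allowed paths
    #     <decorated method runs, possibly queueing messages and serials>
    #     post_exec(True)   # send messages, persist serials, commit, then
    #                       # accept the commissions in Astakos and delete
    #                       # the resolved serials
    #
    # post_exec(False) instead rejects any pending commissions and rolls the
    # database transaction back.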

    def close(self):
        self.wrapper.close()
        self.queue.close()

    @property
    def using_external_quotaholder(self):
        return not isinstance(self.astakosclient, DisabledAstakosClient)

    @debug_method
    @backend_method
    @list_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        return self._allowed_accounts(user)

    def _get_account_quotas(self, account):
        """Get account usage from astakos."""

        quotas = self.astakosclient.service_get_quotas(account)[account]
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
                                                  {})
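
    # The value returned above is the per-resource quota entry reported by
    # Astakos; an illustrative shape (numbers are hypothetical, other keys
    # may be present) is
    #
    #     {'usage': 4096, 'limit': 1073741824}
    #
    # 'usage' and 'limit' are the keys consumed by get_account_meta() and
    # get_account_policy() below.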

    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain."""

        self._can_read_account(user, account)
        path, node = self._lookup_account(account, user == account)
        if user != account:
            if until or (node is None):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                external_quota = self._get_account_quotas(account)
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta

    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        self._can_write_account(user, account)
        path, node = self._lookup_account(account, True)
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)

    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        self._can_read_account(user, account)
        if user != account:
            return {}
        self._lookup_account(account, True)
        return self.permissions.group_dict(account)

    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        self._can_write_account(user, account)
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        for k, v in groups.iteritems():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, k)
            if v:
                self.permissions.group_addmany(account, k, v)

    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy."""

        self._can_read_account(user, account)
        if user != account:
            return {}
        path, node = self._lookup_account(account, True)
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            external_quota = self._get_account_quotas(account)
            policy['quota'] = external_quota.get('limit', 0)
        return policy

    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        self._can_write_account(user, account)
        path, node = self._lookup_account(account, True)
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)

    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        policy = policy or {}
        self._can_write_account(user, account)
        node = self.node.node_lookup(account)
        if node is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True)

    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        self._can_write_account(user, account)
        node = self.node.node_lookup(account)
        if node is None:
            return
        if not self.node.node_remove(node,
                                     update_statistics_ancestors_depth=-1):
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()

    @debug_method
    @backend_method
    @list_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account."""

        self._can_read_account(user, account)
        if user != account:
            if until:
                raise NotAllowedError
            return self._allowed_containers(user, account)
        if shared or public:
            allowed = set()
            if shared:
                allowed.update([x.split('/', 2)[1] for x in
                               self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in
                               self.permissions.public_list(account)])
            return sorted(allowed)
        node = self.node.node_lookup(account)
        return [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
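
    # Illustrative note: access_list_shared() yields full object paths of the
    # form 'account/container/object', so x.split('/', 2)[1] keeps only the
    # container name, e.g. (hypothetical path)
    #
    #     'account/photos/2012/img.jpg'.split('/', 2)[1]  ->  'photos'
    #
    # public_list() yields (path, public_id) pairs, hence x[0] is split
    # instead.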

    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""

        self._can_read_container(user, account, container)
        allowed = []
        if user != account:
            if until:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = until if until is not None else inf
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)

    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain."""

        self._can_read_container(user, account, container)
        if user != account:
            if until:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta

    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is not None:
            versioning = self._get_policy(
                node, is_account_policy=False)['versioning']
            if versioning != 'auto':
                self.node.version_remove(src_version_id,
                                         update_statistics_ancestors_depth=0)

    @debug_method
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        self._can_read_container(user, account, container)
        if user != account:
            return {}
        path, node = self._lookup_container(account, container)
        return self._get_policy(node, is_account_policy=False)

    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)

    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        policy = policy or {}
        self._can_write_container(user, account, container)
        try:
            path, node = self._lookup_container(account, container)
        except NameError:
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        node = self._put_path(
            user, self._lookup_account(account, True)[1], path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)

    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name."""

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)

        if until is not None:
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()

    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        if user != account and until:
            raise NotAllowedError

        objects = []
        if shared and public:
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects = set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            objects.sort(key=lambda x: x[0])
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
        else:
            allowed = self._list_object_permissions(
                user, account, container, prefix, shared, public=False)
            if shared and not allowed:
                return []
            path, node = self._lookup_container(account, container)
            allowed = self._get_formatted_paths(allowed)
            objects = self._list_object_properties(
                node, path, prefix, delimiter, marker, limit, virtual, domain,
                keys, until, size_range, allowed, all_props)

        # apply limits
        start, limit = self._list_limits(objects, marker, limit)
        return objects[start:start + limit]

    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        public = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        paths, nodes = self._lookup_objects(public)
        path = '/'.join((account, container))
        cont_prefix = path + '/'
        paths = [x[len(cont_prefix):] for x in paths]
        objects = [(p,) + props for p, props in
                   zip(paths, self.node.version_lookup_bulk(
                       nodes, all_props=all_props, order_by_path=True))]
        return objects

    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        objects = []
        while True:
            marker = objects[-1] if objects else None
            limit = 10000
            l = self._list_objects(
                user, account, container, prefix, delimiter, marker, limit,
                virtual, domain, keys, shared, until, size_range, all_props,
                public)
            objects.extend(l)
            if not l or len(l) < limit:
                break
        return objects

    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        allowed = []
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            allowed = self.permissions.access_list_paths(user, path)
            if not allowed:
                raise NotAllowedError
        else:
            allowed = set()
            if shared:
                allowed.update(self.permissions.access_list_shared(path))
            if public:
                allowed.update(
                    [x[0] for x in self.permissions.public_list(path)])
            allowed = sorted(allowed)
            if not allowed:
                return []
        return allowed

    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        keys = keys or []
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, False, public)

    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        keys = keys or []
        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, True, public)
        objects = []
        for p in props:
            if len(p) == 2:
                objects.append({'subdir': p[0]})
            else:
                objects.append({
                    'name': p[0],
                    'bytes': p[self.SIZE + 1],
                    'type': p[self.TYPE + 1],
                    'hash': p[self.HASH + 1],
                    'version': p[self.SERIAL + 1],
                    'version_timestamp': p[self.MTIME + 1],
                    'modified': p[self.MTIME + 1] if until is None else None,
                    'modified_by': p[self.MUSER + 1],
                    'uuid': p[self.UUID + 1],
                    'checksum': p[self.CHECKSUM + 1]})
        return objects

    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths that enforce permissions under a
        container."""

        return self._list_object_permissions(user, account, container, prefix,
                                             True, False)

    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        public = {}
        for path, p in self.permissions.public_list('/'.join((account,
                                                              container,
                                                              prefix))):
            public[path] = p
        return public

    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta

    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write_object(user, account, container, name)

        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        self._apply_versioning(account, container, src_version_id,
                               update_statistics_ancestors_depth=1)
        return dest_version_id

    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on each object, the path from which
        each object gets its permissions, along with a dictionary containing
        the permissions."""

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        #group_parents = access_objects['group_parents']
        nobject_permissions = {}
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            allowed = 1
            name = path[cpath_idx:]
            if user != account:
                try:
                    allowed = access_objects[path]
                except KeyError:
                    raise NotAllowedError
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        self._lookup_objects(permissions_path)
        return nobject_permissions

    
1015
    @debug_method
1016
    @backend_method
1017
    def get_object_permissions(self, user, account, container, name):
1018
        """Return the action allowed on the object, the path
1019
        from which the object gets its permissions from,
1020
        along with a dictionary containing the permissions."""
1021

    
1022
        allowed = 'write'
1023
        permissions_path = self._get_permissions_path(account, container, name)
1024
        if user != account:
1025
            if self.permissions.access_check(permissions_path, self.WRITE,
1026
                                             user):
1027
                allowed = 'write'
1028
            elif self.permissions.access_check(permissions_path, self.READ,
1029
                                               user):
1030
                allowed = 'read'
1031
            else:
1032
                raise NotAllowedError
1033
        self._lookup_object(account, container, name)
1034
        return (allowed,
1035
                permissions_path,
1036
                self.permissions.access_get(permissions_path))
1037

    
1038
    @debug_method
1039
    @backend_method
1040
    def update_object_permissions(self, user, account, container, name,
1041
                                  permissions):
1042
        """Update the permissions associated with the object."""
1043

    
1044
        if user != account:
1045
            raise NotAllowedError
1046
        path = self._lookup_object(account, container, name,
1047
                                   lock_container=True)[0]
1048
        self._check_permissions(path, permissions)
1049
        try:
1050
            self.permissions.access_set(path, permissions)
1051
        except:
1052
            raise ValueError
1053
        else:
1054
            self._report_sharing_change(user, account, path, {'members':
1055
                                        self.permissions.access_members(path)})
1056

    
1057
        # remove all the cached allowed paths
1058
        # filtering out only those affected could be more expensive
1059
        self._reset_allowed_paths()
1060

    
1061
    @debug_method
1062
    @backend_method
1063
    def get_object_public(self, user, account, container, name):
1064
        """Return the public id of the object if applicable."""
1065

    
1066
        self._can_read_object(user, account, container, name)
1067
        path = self._lookup_object(account, container, name)[0]
1068
        p = self.permissions.public_get(path)
1069
        return p
1070

    
1071
    @debug_method
1072
    @backend_method
1073
    def update_object_public(self, user, account, container, name, public):
1074
        """Update the public status of the object."""
1075

    
1076
        self._can_write_object(user, account, container, name)
1077
        path = self._lookup_object(account, container, name,
1078
                                   lock_container=True)[0]
1079
        if not public:
1080
            self.permissions.public_unset(path)
1081
        else:
1082
            self.permissions.public_set(
1083
                path, self.public_url_security, self.public_url_alphabet)
1084

    
1085
    @debug_method
1086
    @backend_method
1087
    def get_object_hashmap(self, user, account, container, name, version=None):
1088
        """Return the object's size and a list with partial hashes."""
1089

    
1090
        self._can_read_object(user, account, container, name)
1091
        path, node = self._lookup_object(account, container, name)
1092
        props = self._get_version(node, version)
1093
        if props[self.HASH] is None:
1094
            return 0, ()
1095
        if props[self.HASH].startswith('archip:'):
1096
            hashmap = self.store.map_get_archipelago(props[self.HASH],
1097
                                                     props[self.SIZE])
1098
            return props[self.SIZE], [x for x in hashmap]
1099
        else:
1100
            hashmap = self.store.map_get(self._unhexlify_hash(
1101
                props[self.HASH]))
1102
            return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
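
    # Illustrative return values (hypothetical, digests shortened):
    #
    #     (8388608, ['3a7bd3e2...', '9f86d081...'])   # regular object
    #     (0, ())                                     # no hash registered
    #
    # For Archipelago-backed objects ('archip:' prefix) the map entries are
    # returned as-is, without hex-encoding.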

    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write_object(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id

    @debug_method
    @backend_method
    def register_object_map(self, user, account, container, name, size, type,
                            mapfile, checksum='', domain='pithos', meta=None,
                            replace_meta=False, permissions=None):
        """Register an object mapfile without providing any data.

        Lock the container path, create a node pointing to the object path,
        create a version pointing to the mapfile
        and issue the size change in the quotaholder.

        :param user: the user account which performs the action

        :param account: the account under which the object resides

        :param container: the container under which the object resides

        :param name: the object name

        :param size: the object size

        :param type: the object mimetype

        :param mapfile: the mapfile pointing to the object data

        :param checksum: the md5 checksum (optional)

        :param domain: the object domain

        :param meta: a dict with custom object metadata

        :param replace_meta: replace existing metadata or not

        :param permissions: a dict with the read and write object permissions

        :returns: the new object uuid

        :raises: ItemNotExists, NotAllowedError, QuotaError
        """

        meta = meta or {}
        try:
            self.lock_container_path = True
            self.put_container(user, account, container, policy=None)
        except ContainerExists:
            pass
        finally:
            self.lock_container_path = False
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, mapfile, checksum,
            domain, meta, replace_meta, permissions)
        return self.node.version_get_properties(dest_version_id,
                                                keys=('uuid',))[0]
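
    # Illustrative call (sketch; names and values are hypothetical):
    #
    #     uuid = backend.register_object_map(
    #         'user', 'user', 'images', 'disk.img',
    #         size=10 * 1024 ** 3, type='application/octet-stream',
    #         mapfile='archip:6a71c7...')
    #
    # No block data is written here; only the node, a version pointing to
    # the mapfile and the corresponding quota commission are created.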

    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version."""

        for h in hashmap:
            if h.startswith('archip_'):
                raise IllegalOperationError(
                    'Cannot update Archipelago Volume hashmap.')
        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(map)
        if missing:
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = map.hash()
        hexlified = binascii.hexlify(hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        self.store.map_put(hash, map)
        return dest_version_id, hexlified
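
    # Illustrative client flow (sketch): callers hash their blocks first and
    # upload only what the store is missing.
    #
    #     try:
    #         version_id, obj_hash = backend.update_object_hashmap(
    #             'user', 'user', 'files', 'file.bin', size, 'text/plain',
    #             block_hashes, checksum='', domain='pithos')
    #     except IndexError as e:
    #         # e.data lists the hex hashes of blocks not yet stored;
    #         # upload them (e.g. via put_block()) and retry the call.
    #         raise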

    @debug_method
    @backend_method
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum."""

        # Update objects with greater version and same hashmap
        # and size (fix metadata updates).
        self._can_write_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        props = self._get_version(node, version)
        versions = self.node.node_get_versions(node)
        for x in versions:
            if (x[self.SERIAL] >= int(version) and
                x[self.HASH] == props[self.HASH] and
                    x[self.SIZE] == props[self.SIZE]):
                self.node.version_put_property(
                    x[self.SERIAL], 'checksum', checksum)
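
    # Illustrative sketch (not part of the original module) of how a caller
    # might produce the checksum passed above; the helper name is made up and
    # MD5 follows the "md5 checksum" convention documented earlier in this
    # class.
    #
    #   import hashlib
    #
    #   def object_md5(data):
    #       return hashlib.md5(data).hexdigest()
    #
    #   backend.update_object_checksum(user, account, container, name,
    #                                  version, object_md5(data))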

    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):

        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read_object(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)

    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        meta = meta or {}
        dest_version_id = self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, src_version, False, delimiter)
        return dest_version_id

    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        meta = meta or {}
        if user != src_account:
            raise NotAllowedError
        dest_version_id = self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, None, delimiter=delimiter)
        return dest_version_id
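
    # Illustrative usage sketch (not part of the original module). With a
    # delimiter, copy_object()/move_object() also cover every object sharing
    # the source prefix, i.e. a whole pseudo-folder; the account, container
    # and path values below are made up.
    #
    #   backend.copy_object('alice', 'alice', 'photos', 'summer/',
    #                       'alice', 'backup', 'summer-2012/',
    #                       'application/directory', 'pithos',
    #                       delimiter='/')
    #
    # move_object() additionally requires user == src_account and deletes the
    # source versions once the destination versions have been created.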

    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except NameError:
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()

    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        self._delete_object(user, account, container, name, until, delimiter)
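
    # Illustrative sketch (not part of the original module) of the two delete
    # modes handled by _delete_object(); the names and timestamp are made up.
    #
    #   # Plain delete: a new version is written to CLUSTER_DELETED and the
    #   # container's versioning policy decides what happens to the old one.
    #   backend.delete_object('alice', 'alice', 'photos', 'old.jpg')
    #
    #   # Purge: physically remove every version up to the given timestamp
    #   # from the normal and history clusters and drop their maps.
    #   backend.delete_object('alice', 'alice', 'photos', 'old.jpg',
    #                         until=1356998400)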

    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
                x[self.CLUSTER] != CLUSTER_DELETED]

    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read_object(user, account, container, name)
        return (account, container, name)

    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read_object(user, account, container, name)
        return (account, container, name)

    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        if hash.startswith('archip_'):
            block = self.store.block_get_archipelago(hash)
        else:
            block = self.store.block_get(self._unhexlify_hash(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block

    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        return binascii.hexlify(self.store.block_put(data))

    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if hash.startswith('archip_'):
            raise IllegalOperationError(
                'Cannot update an Archipelago Volume block.')
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
        return binascii.hexlify(h)
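
    # Illustrative sketch (not part of the original module) showing how the
    # block primitives above compose; `object_hashmap` stands for a hashmap
    # obtained elsewhere and is an assumption.
    #
    #   def read_object(backend, object_hashmap):
    #       # Concatenate the data of every block referenced by the hashmap.
    #       return ''.join(backend.get_block(h) for h in object_hashmap)
    #
    # update_block() rewrites part of a block: an update that covers a whole
    # block degenerates to put_block(), otherwise the store writes the new
    # block and its hash is returned hex-encoded.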

    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

    def _put_object_node(self, path, parent, name):
        path = '/'.join((path, name))
        node = self.node.node_lookup(path)
        if node is None:
            node = self.node.node_create(parent, path)
        return path, node

    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node

    def _lookup_account(self, account, create=True):
        node = self.node.node_lookup(account)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node

    def _lookup_container(self, account, container):
        for_update = True if self.lock_container_path else False
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name, lock_container=False):
        if lock_container:
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes

    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats

    def _get_version(self, node, version=None):
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version, node=node)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node."""

        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id

    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))

    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=
            update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id

    def _list_limits(self, listing, marker, limit):
        start = 0
        if marker:
            try:
                start = listing.index(marker) + 1
            except ValueError:
                pass
        if not limit or limit > 10000:
            limit = 10000
        return start, limit
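
    # Illustrative sketch (not part of the original module) of the marker/limit
    # pagination that _list_limits() implements: the marker is the last entry
    # of the previous page and at most 10000 entries are returned per call.
    # The list_objects() call and `process()` below are assumptions used only
    # to show the loop shape.
    #
    #   marker = None
    #   while True:
    #       page = backend.list_objects(user, account, container,
    #                                   marker=marker, limit=10000)
    #       if not page:
    #           break
    #       process(page)
    #       marker = page[-1]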

    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects

    # Reporting functions.

    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)

    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))

    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))

    # Policy functions.

    def _check_policy(self, policy, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError

    def _put_policy(self, node, policy, replace, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)

    def _get_policy(self, node, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        policy = default_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy

    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
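
    # Illustrative sketch (not part of the original module) of the container
    # policy values accepted by _check_policy(): 'quota' must parse as a
    # non-negative integer and 'versioning' must be 'auto' or 'none'; anything
    # else raises ValueError. The quota figure below is made up.
    #
    #   policy = {'quota': '1073741824', 'versioning': 'auto'}
    #   backend._check_policy(policy, is_account_policy=False)
    #
    # With 'auto' versioning superseded versions stay in the history cluster;
    # with 'none', _apply_versioning() removes them and returns the freed size.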

    # Access control functions.

    def _check_groups(self, groups):
        # raise ValueError('Bad characters in groups')
        pass

    def _check_permissions(self, path, permissions):
        # raise ValueError('Bad characters in permissions')
        pass

    def _get_formatted_paths(self, paths):
        formatted = []
        if len(paths) == 0:
            return formatted
        props = self.node.get_props(paths)
        if props:
            for prop in props:
                if prop[1].split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    formatted.append((prop[0].rstrip('/') + '/',
                                      self.MATCH_PREFIX))
                formatted.append((prop[0], self.MATCH_EXACT))
        return formatted

    def _get_permissions_path(self, account, container, name):
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            else:
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None

    def _get_permissions_path_bulk(self, account, container, names):
        formatted_paths = []
        for name in names:
            path = '/'.join((account, container, name))
            formatted_paths.append(path)
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        permission_paths.sort()
        permission_paths.reverse()
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                permission_paths_list.append(p)
            else:
                if p.count('/') < 2:
                    continue
                lookup_list.append(p)

        if len(lookup_list) > 0:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        if len(permission_paths_list) > 0:
            return permission_paths_list

        return None

    def _reset_allowed_paths(self):
        self.read_allowed_paths = defaultdict(set)
        self.write_allowed_paths = defaultdict(set)

    @check_allowed_paths(action=0)
    def _can_read_account(self, user, account):
        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError

    @check_allowed_paths(action=1)
    def _can_write_account(self, user, account):
        if user != account:
            raise NotAllowedError

    @check_allowed_paths(action=0)
    def _can_read_container(self, user, account, container):
        if user != account:
            if container not in self._allowed_containers(user, account):
                raise NotAllowedError

    @check_allowed_paths(action=1)
    def _can_write_container(self, user, account, container):
        if user != account:
            raise NotAllowedError

    @check_allowed_paths(action=0)
    def _can_read_object(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if (not self.permissions.access_check(path, self.READ, user) and not
                self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError

    @check_allowed_paths(action=1)
    def _can_write_object(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError

    def _allowed_accounts(self, user):
        allow = set()
        for path in self.permissions.access_list_paths(user):
            p = path.split('/', 1)[0]
            allow.add(p)
        self.read_allowed_paths[user] |= allow
        return sorted(allow)

    def _allowed_containers(self, user, account):
        allow = set()
        for path in self.permissions.access_list_paths(user, account):
            p = path.split('/', 2)[1]
            allow.add(p)
        self.read_allowed_paths[user] |= allow
        return sorted(allow)

    # Domain functions

    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]

    # util functions

    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        meta = {'bytes': props[self.SIZE],
                'type': props[self.TYPE],
                'hash': props[self.HASH],
                'version': props[self.SERIAL],
                'version_timestamp': props[self.MTIME],
                'modified_by': props[self.MUSER],
                'uuid': props[self.UUID],
                'checksum': props[self.CHECKSUM]}
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta
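
    # Illustrative sketch (not part of the original module) of the system
    # metadata dict built above; every value shown is made up.
    #
    #   {'bytes': 1024,
    #    'type': 'image/jpeg',
    #    'hash': 'a3f5e1c2',
    #    'version': 42,
    #    'version_timestamp': 1356998400.0,
    #    'modified_by': 'alice',
    #    'uuid': '6b8f3a2e-0f59-4c1a-9d6b-000000000000',
    #    'checksum': ''}
    #
    # User-defined attributes for the requested domain are merged on top of
    # these keys unless include_user_defined is False.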

    def _exists(self, node):
        try:
            self._get_version(node)
        except ItemNotExists:
            return False
        else:
            return True

    def _unhexlify_hash(self, hash):
        try:
            return binascii.unhexlify(hash)
        except TypeError:
            raise InvalidHash(hash)
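
    # Illustrative sketch (not part of the original module): _unhexlify_hash()
    # accepts only hex-encoded digests, so Archipelago identifiers (the
    # 'archip_' prefix handled in get_block()/update_block()) must be filtered
    # out before calling it. The literals below are made up.
    #
    #   backend._unhexlify_hash('9f86d081884c7d65')   # packed bytes
    #   backend._unhexlify_hash('archip_volume1')     # raises InvalidHash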