Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ d0b67cbc

History | View | Annotate | Download (80 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42
from time import time
43

    
44
from pithos.workers import glue
45
from archipelago.common import Segment, Xseg_ctx
46
from objpool import ObjectPool
47

    
48

    
49
try:
50
    from astakosclient import AstakosClient
51
except ImportError:
52
    AstakosClient = None
53

    
54
from pithos.backends.base import (
55
    DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
56
    DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
57
    BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
58
    ContainerNotEmpty, ItemNotExists, VersionNotExists,
59
    InvalidHash, IllegalOperationError)
60

    
61

    
62
class DisabledAstakosClient(object):
    """Placeholder used when quota support through Astakos is turned off.

    The constructor arguments are kept for inspection, but any other
    attribute access raises so that accidental use is caught early.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        # Only reached for attributes not set in __init__; fail loudly.
        raise AssertionError("AstakosClient has been disabled, "
                             "yet an attempt to access it was made")
71

    
72

    
73
# Stripped-down version of the HashMap class found in tools.
74

    
75
class HashMap(list):
    """Merkle-style hash list (stripped-down version of the tools' HashMap).

    The list elements are raw block digests; hash() folds them pairwise
    into a single top hash using the configured algorithm.
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize    # block size in bytes
        self.blockhash = blockhash    # hashlib algorithm name, e.g. 'sha256'

    def _hash_raw(self, v):
        """Return the raw digest of the byte string v."""
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        """Return the top hash of the map.

        An empty map hashes the empty byte string; a single entry is
        returned as-is; otherwise entries are zero-padded up to the next
        power of two and combined pairwise.
        """
        if len(self) == 0:
            # b'' (not '') so the digest is computed correctly on both
            # Python 2 and Python 3: hashlib.update() requires bytes on 3.
            return self._hash_raw(b'')
        if len(self) == 1:
            return self[0]

        h = list(self)
        # Pad with zero-filled pseudo-digests up to the next power of two.
        s = 2
        while s < len(h):
            s = s * 2
        h += [(b'\x00' * len(h[0]))] * (s - len(h))
        # Combine adjacent pairs until a single top hash remains.
        while len(h) > 1:
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
        return h[0]
101

    
102
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
# Character set and length (in characters) for generated public URLs.
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16
DEFAULT_ARCHIPELAGO_CONF_FILE = '/etc/archipelago/archipelago.conf'

# Identifiers used when publishing backend events to the message queue.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version clusters: NORMAL holds current versions, HISTORY older ones,
# DELETED purged entries.
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

# Sentinel meaning "no upper bound" for timestamp comparisons.
inf = float('inf')

ULTIMATE_ANSWER = 42

# Astakos quota accounting identifiers (source and resource names).
DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'

DEFAULT_MAP_CHECK_INTERVAL = 300  # set to 300 secs

logger = logging.getLogger(__name__)
136

    
137

    
138
def backend_method(func):
    """Decorator managing a database transaction around a backend method.

    If a transaction is already active (``self.in_transaction``), the
    wrapped method simply runs inside it.  Otherwise a new transaction is
    opened with ``pre_exec()`` and finished with ``post_exec(success)``,
    which commits on success and rolls back on any exception.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        # If we are inside a database transaction, just proceed with the
        # method execution; otherwise manage a new transaction.
        if self.in_transaction:
            return func(self, *args, **kw)

        # Initialized before the try so post_exec() always receives a
        # defined status (previously set via a bare `except:` clause).
        success_status = False
        try:
            self.pre_exec()
            result = func(self, *args, **kw)
            success_status = True
            return result
        finally:
            self.post_exec(success_status)
    return wrapper
158

    
159

    
160
def debug_method(func):
    """Decorator logging every call with its arguments and result.

    The log line is emitted from a ``finally`` block so that failing
    calls are logged too, with the traceback text as the reported result.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            # Log the traceback in place of a result, then re-raise.
            result = format_exc()
            raise
        finally:
            # Build the argument list explicitly: the original used
            # map() for its side effect, which is a silent no-op on
            # Python 3 where map() is lazy (and iteritems() is Py2-only).
            all_args = [repr(a) for a in args]
            all_args += ['%s=%s' % (k, v) for k, v in kw.items()]
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper
175

    
176

    
177
def list_method(func):
    """Decorator applying marker/limit windowing to listing results."""
    @wraps(func)
    def wrapper(self, *args, **kw):
        result = func(self, *args, **kw)
        # Window the result according to the caller-supplied marker/limit.
        start, limit = self._list_limits(
            result, kw.get('marker'), kw.get('limit'))
        return result[start:start + limit]
    return wrapper
186

    
187

    
188
class ModularBackend(BaseBackend):
189
    """A modular backend.
190

191
    Uses modules for SQL functions and storage.
192
    """
193

    
194
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None,
                 archipelago_conf_file=None,
                 xseg_pool_size=8,
                 map_check_interval=None):
        """Initialize the backend: load the DB and block-storage modules,
        set default policies, and wire up the message queue and the
        Astakos quota client.

        Every argument left as None falls back to the corresponding
        module-level DEFAULT_* constant.
        """
        # Resolve defaults for all unset arguments.
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING
        archipelago_conf_file = archipelago_conf_file \
            or DEFAULT_ARCHIPELAGO_CONF_FILE
        map_check_interval = map_check_interval \
            or DEFAULT_MAP_CHECK_INTERVAL

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning
        self.map_check_interval = map_check_interval

        def load_module(m):
            # Import by dotted name and return the module object.
            __import__(m)
            return sys.modules[m]

        # Database layer: wrapper plus the helper tables/objects.
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        # Mirror the DB module's permission flags on the backend instance.
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        # Mirror the node layer's column/constant identifiers as well.
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT',
                  'AVAILABLE', 'MAP_CHECK_TIMESTAMP']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        # Set up the Archipelago xseg worker pool before loading the
        # block (storage) module, which depends on it.
        glue.WorkerGlue.setupXsegPool(ObjectPool, Segment, Xseg_ctx,
                                      cfile=archipelago_conf_file,
                                      pool_size=xseg_pool_size)
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            # No queue configured: use a do-nothing stand-in.
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        # Fall back to the disabled stub when quota support is off or the
        # astakosclient package could not be imported.
        if not astakos_auth_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        # Commission serials and queue messages accumulated during the
        # current transaction (flushed/resolved in post_exec).
        self.serials = []
        self.messages = []

        # Moving is copying with the source removed afterwards.
        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False
317

    
318
    def pre_exec(self, lock_container_path=False):
319
        self.lock_container_path = lock_container_path
320
        self.wrapper.execute()
321
        self.serials = []
322
        self.in_transaction = True
323

    
324
    def post_exec(self, success_status=True):
325
        if success_status:
326
            # send messages produced
327
            for m in self.messages:
328
                self.queue.send(*m)
329

    
330
            # register serials
331
            if self.serials:
332
                self.commission_serials.insert_many(
333
                    self.serials)
334

    
335
                # commit to ensure that the serials are registered
336
                # even if resolve commission fails
337
                self.wrapper.commit()
338

    
339
                # start new transaction
340
                self.wrapper.execute()
341

    
342
                r = self.astakosclient.resolve_commissions(
343
                    accept_serials=self.serials,
344
                    reject_serials=[])
345
                self.commission_serials.delete_many(
346
                    r['accepted'])
347

    
348
            self.wrapper.commit()
349
        else:
350
            if self.serials:
351
                r = self.astakosclient.resolve_commissions(
352
                    accept_serials=[],
353
                    reject_serials=self.serials)
354
                self.commission_serials.delete_many(
355
                    r['rejected'])
356
            self.wrapper.rollback()
357
        self.in_transaction = False
358

    
359
    def close(self):
360
        self.wrapper.close()
361
        self.queue.close()
362

    
363
    @property
    def using_external_quotaholder(self):
        """True when a real AstakosClient handles quota, False when the
        disabled stub is installed."""
        disabled = isinstance(self.astakosclient, DisabledAstakosClient)
        return not disabled
366

    
367
    @debug_method
    @backend_method
    @list_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access.

        Windowing by marker/limit is applied by the list_method decorator.
        """
        accounts = self._allowed_accounts(user)
        return accounts
374

    
375
    def _get_account_quotas(self, account):
        """Get account usage from astakos.

        Returns the 'pithos.diskspace' entry of the account's 'system'
        source quotas, or an empty dict when not present.
        """
        # NOTE: this method was accidentally defined twice with identical
        # bodies; the redundant duplicate definition has been removed.
        quotas = self.astakosclient.service_get_quotas(account)[account]
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
                                                  {})
388

    
389
    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain.

        Foreign users only get the account name, and only when the
        account is shared with them and no `until` is requested.
        """

        path, node = self._lookup_account(account, user == account)
        if user != account:
            if until or (node is None) or (account not
                                           in self._allowed_accounts(user)):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # No version properties found: fall back to the requested
            # timestamp.  NOTE(review): catching NameError here looks odd
            # -- presumably a lookup-miss exception (e.g. ItemNotExists)
            # was intended; confirm against _get_properties.
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Foreign users see only the account name.
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                # Astakos is authoritative for usage accounting.
                external_quota = self._get_account_quotas(account)
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta
430

    
431
    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        if user != account:
            raise NotAllowedError
        node = self._lookup_account(account, True)[1]
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)
441

    
442
    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        # Foreign account: it must be shared with the user, and then the
        # group listing is empty.
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
        return {}
453

    
454
    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        for name, members in groups.iteritems():
            if not replace:  # Not already wiped by group_destroy above.
                self.permissions.group_delete(account, name)
            if members:
                self.permissions.group_addmany(account, name, members)
470

    
471
    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy."""

        if user != account:
            # Foreign accounts expose no policy, but must be shared.
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            return {}
        node = self._lookup_account(account, True)[1]
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            # The quota limit is authoritative in Astakos, not locally.
            external_quota = self._get_account_quotas(account)
            policy['quota'] = external_quota.get('limit', 0)
        return policy
486

    
487
    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        if user != account:
            raise NotAllowedError
        node = self._lookup_account(account, True)[1]
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)
497

    
498
    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        policy = policy or {}
        if user != account:
            raise NotAllowedError
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True)
514

    
515
    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is None:
            return  # already gone; deleting is a no-op
        removed = self.node.node_remove(
            node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)
529

    
530
    @debug_method
    @backend_method
    @list_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account."""

        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            return self._allowed_containers(user, account)
        if shared or public:
            # Collect the container component of each shared/public path.
            allowed = set()
            if shared:
                allowed.update(x.split('/', 2)[1] for x in
                               self.permissions.access_list_shared(account))
            if public:
                allowed.update(x[0].split('/', 2)[1] for x in
                               self.permissions.public_list(account))
            return sorted(allowed)
        node = self.node.node_lookup(account)
        props = self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)
        return [p[0] for p in props]
553

    
554
    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""

        allowed = []
        if user != account:
            if until:
                raise NotAllowedError
            allowed = self.permissions.access_list_paths(
                user, '/'.join((account, container)))
            if not allowed:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = inf if until is None else until
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)
573

    
574
    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        Foreign users only get the container name, and only when the
        container is shared with them and no `until` is requested.
        """

        if user != account:
            if until or container not in self._allowed_containers(user,
                                                                  account):
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Foreign users see only the container name.
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta
608

    
609
    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is None:
            return
        # Drop the superseded version unless versioning keeps history.
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            self.node.version_remove(src_version_id,
                                     update_statistics_ancestors_depth=0)
627

    
628
    @debug_method
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        if user != account:
            # Foreign containers expose no policy, but must be shared.
            if container not in self._allowed_containers(user, account):
                raise NotAllowedError
            return {}
        node = self._lookup_container(account, container)[1]
        return self._get_policy(node, is_account_policy=False)
639

    
640
    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""

        if user != account:
            raise NotAllowedError
        node = self._lookup_container(account, container)[1]
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)
651

    
652
    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        policy = policy or {}
        if user != account:
            raise NotAllowedError
        try:
            self._lookup_container(account, container)
        except NameError:
            pass  # lookup failure means the name is free
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        account_node = self._lookup_account(account, True)[1]
        node = self._put_path(user, account_node, path,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)
673

    
674
    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name.

        With `until`, only history up to that time is purged and the
        container survives.  With `delimiter`, only the matching contents
        are deleted.  Otherwise the container itself is removed; it must
        be empty.
        """

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        if until is not None:
            # Purge historic versions up to `until` and free their blocks.
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                # Purged history no longer counts against the quota.
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            # Remove the container itself; it must contain no objects.
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            # NOTE(review): the `prefix` argument is not forwarded to the
            # listing call below (prefix='' is passed) -- confirm whether
            # that is intended.
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                # Mark the object deleted by writing a new version in the
                # DELETED cluster.
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)
749

    
750
    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        """List objects under a container that the user may access.

        Depending on ``shared``/``public`` the listing is restricted to
        shared objects, public objects, their union, or everything the
        user is allowed to see, then paginated via ``marker``/``limit``.

        :raises NotAllowedError: if a user other than the account owner
            asks for a listing at a past point in time (``until``)
        """
        if user != account and until:
            raise NotAllowedError

        objects = []
        if shared and public:
            # Merge shared and public listings; a set de-duplicates
            # objects that are both shared and public.
            # BUGFIX: this used to stay a plain list when there were no
            # shared paths, making the `|=` below raise TypeError.
            objects = set()
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects = set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            objects.sort(key=lambda x: x[0])
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
        else:
            allowed = self._list_object_permissions(
                user, account, container, prefix, shared, public=False)
            if shared and not allowed:
                return []
            path, node = self._lookup_container(account, container)
            allowed = self._get_formatted_paths(allowed)
            objects = self._list_object_properties(
                node, path, prefix, delimiter, marker, limit, virtual, domain,
                keys, until, size_range, allowed, all_props)

        # apply limits
        start, limit = self._list_limits(objects, marker, limit)
        return objects[start:start + limit]
    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        """Return property tuples for the public objects under a prefix."""
        # Paths of objects marked public under the given prefix.
        public_paths = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        full_paths, nodes = self._lookup_objects(public_paths)
        # Strip the leading "<account>/<container>/" part from each path.
        strip = len('/'.join((account, container))) + 1
        names = [full_path[strip:] for full_path in full_paths]
        props = self.node.version_lookup_bulk(
            nodes, all_props=all_props, order_by_path=True)
        return [(name,) + row for name, row in zip(names, props)]
    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        """Fetch the complete listing by paging through _list_objects()."""
        page_size = 10000
        results = []
        while True:
            # Resume after the last entry fetched so far (None on first page).
            last = results[-1] if results else None
            page = self._list_objects(
                user, account, container, prefix, delimiter, last, page_size,
                virtual, domain, keys, shared, until, size_range, all_props,
                public)
            results.extend(page)
            # A short (or empty) page means the listing is exhausted.
            if len(page) < page_size:
                break
        return results
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        """Return the object paths under ``prefix`` the user may list.

        For a foreign account this is the set of paths explicitly granted
        to the user (raises NotAllowedError when empty).  For the owner it
        is the sorted union of shared and/or public paths, per the flags.
        """
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            granted = self.permissions.access_list_paths(user, path)
            if not granted:
                raise NotAllowedError
            return granted
        collected = set()
        if shared:
            collected.update(self.permissions.access_list_shared(path))
        if public:
            collected.update(
                entry[0] for entry in self.permissions.public_list(path))
        return sorted(collected)
    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        # all_props=False: only names/serials are materialized.
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, False,
            public)
    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, True,
            public)
        listing = []
        for entry in props:
            if len(entry) == 2:
                # Two-element entries carry only a name: virtual subdirs.
                listing.append({'subdir': entry[0]})
                continue
            listing.append({
                'name': entry[0],
                'bytes': entry[self.SIZE + 1],
                'type': entry[self.TYPE + 1],
                'hash': entry[self.HASH + 1],
                'version': entry[self.SERIAL + 1],
                'version_timestamp': entry[self.MTIME + 1],
                'modified': entry[self.MTIME + 1] if until is None else None,
                'modified_by': entry[self.MUSER + 1],
                'uuid': entry[self.UUID + 1],
                'checksum': entry[self.CHECKSUM + 1]})
        return listing
    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths enforce permissions under a container."""

        return self._list_object_permissions(
            user, account, container, prefix, shared=True, public=False)
    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, name, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        lookup_path = '/'.join((account, container, prefix))
        # public_list() yields (path, public_id) pairs.
        return dict(self.permissions.public_list(lookup_path))
    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        :param version: a specific version serial, or None for the latest
        :param include_user_defined: also merge the user-defined attributes
            stored under ``domain``
        :raises ItemNotExists: if the object (or requested version) is gone
        """

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            # Latest version: its mtime is the overall modification time.
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                # No live version: fall back to the deletion record's mtime.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        # System properties take precedence over same-named user attributes.
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM],
                     'available': props[self.AVAILABLE],
                     'map_check_timestamp': props[self.MAP_CHECK_TIMESTAMP]})
        return meta
    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write(user, account, container, name)
        # Lock the container path while the object is being mutated.
        _, node = self._lookup_object(account, container, name,
                                      lock_container=True)
        old_version, new_version = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        self._apply_versioning(account, container, old_version,
                               update_statistics_ancestors_depth=1)
        return new_version
    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        nobject_permissions = {}
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            allowed = 1
            # Strip the "<account>/<container>/" prefix to get the bare name.
            name = path[cpath_idx:]
            if user != account:
                # A missing entry means the user has no access to this path.
                try:
                    allowed = access_objects[path]
                except KeyError:
                    raise NotAllowedError
            # NOTE(review): `allowed` is unconditionally overwritten here,
            # so the assignments above only enforce key presence.
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        # Verify the objects actually exist.
        self._lookup_objects(permissions_path)
        return nobject_permissions
    @debug_method
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        perm_path = self._get_permissions_path(account, container, name)
        allowed = 'write'  # the account owner can always write
        if user != account:
            check = self.permissions.access_check
            if check(perm_path, self.WRITE, user):
                allowed = 'write'
            elif check(perm_path, self.READ, user):
                allowed = 'read'
            else:
                raise NotAllowedError
        # Verify the object exists before answering.
        self._lookup_object(account, container, name)
        return (allowed, perm_path, self.permissions.access_get(perm_path))
    @debug_method
    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object.

        Only the account owner may change permissions.

        :raises NotAllowedError: if ``user`` is not the account owner
        :raises ValueError: if the permissions could not be stored
        """

        if user != account:
            raise NotAllowedError
        # Lock the container path while changing permissions.
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        try:
            self.permissions.access_set(path, permissions)
        except Exception:
            # Was a bare `except:`, which also swallowed SystemExit and
            # KeyboardInterrupt and re-raised them as ValueError.
            raise ValueError
        else:
            self._report_sharing_change(user, account, path, {'members':
                                        self.permissions.access_members(path)})
    @debug_method
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read(user, account, container, name)
        object_path, _ = self._lookup_object(account, container, name)
        return self.permissions.public_get(object_path)
    @debug_method
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write(user, account, container, name)
        # Lock the container path while toggling the public flag.
        object_path, _ = self._lookup_object(account, container, name,
                                             lock_container=True)
        if public:
            self.permissions.public_set(
                object_path, self.public_url_security,
                self.public_url_alphabet)
        else:
            self.permissions.public_unset(object_path)
    def _update_available(self, props):
        """Checks if the object map exists and updates the database.

        Repeated checks for an unavailable map are rate-limited via the
        version's ``map_check_timestamp`` and ``self.map_check_interval``.

        :returns: the Archipelago hashmap, if the map exists
        :raises NotAllowedError: if a re-check is attempted too soon
        :raises IllegalOperationError: if the map cannot be retrieved
        """

        if not props[self.AVAILABLE]:
            if props[self.MAP_CHECK_TIMESTAMP]:
                elapsed_time = time() - float(props[self.MAP_CHECK_TIMESTAMP])
                if elapsed_time < self.map_check_interval:
                    raise NotAllowedError(
                        'Consequent map checks are limited: retry later.')
        try:
            hashmap = self.store.map_get_archipelago(props[self.HASH],
                                                     props[self.SIZE])
        except:  # map does not exist
            # NOTE(review): bare except — narrowing to the store's actual
            # error type would avoid masking unrelated failures.
            # Raising an exception results in db transaction rollback
            # However we have to force the update of the database
            self.wrapper.rollback()  # rollback existing transaction
            self.wrapper.execute()  # start new transaction
            # Persist the check timestamp so the rate limit above applies
            # even though the surrounding transaction is aborted.
            self.node.version_put_property(props[self.SERIAL],
                                           'map_check_timestamp', time())
            self.wrapper.commit()  # commit transaction
            self.wrapper.execute()  # start new transaction
            raise IllegalOperationError(
                'Unable to retrieve Archipelago Volume hashmap.')
        else:  # map exists
            self.node.version_put_property(props[self.SERIAL],
                                           'available', True)
            self.node.version_put_property(props[self.SERIAL],
                                           'map_check_timestamp', time())
            return hashmap
    @debug_method
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read(user, account, container, name)
        _, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        obj_hash = props[self.HASH]
        if obj_hash is None:
            return 0, ()
        size = props[self.SIZE]
        if obj_hash.startswith('archip:'):
            # Archipelago-backed object: confirm the map is available first.
            return size, list(self._update_available(props))
        blocks = self.store.map_get(self._unhexlify_hash(obj_hash))
        return size, [binascii.hexlify(block) for block in blocks]
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True,
                            available=True):
        """Create a new object version for the given hash and return its id.

        Creates the object node if needed, duplicates the version (and its
        metadata), applies the versioning policy, enforces account and
        container quotas, and reports size/sharing/object changes.

        :raises NotAllowedError: if permissions are given by a non-owner,
            or the user cannot write the object
        :raises QuotaError: if the account or container quota is exceeded
        """
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1, available=available)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        # Versioning may free the previous version's size.
        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id
    @debug_method
    @backend_method
    def register_object_map(self, user, account, container, name, size, type,
                            mapfile, checksum='', domain='pithos', meta=None,
                            replace_meta=False, permissions=None):
        """Register an object mapfile without providing any data.

        Lock the container path, create a node pointing to the object path,
        create a version pointing to the mapfile
        and issue the size change in the quotaholder.

        :param user: the user account which performs the action

        :param account: the account under which the object resides

        :param container: the container under which the object resides

        :param name: the object name

        :param size: the object size

        :param type: the object mimetype

        :param mapfile: the mapfile pointing to the object data

        :param checksum: the md5 checksum (optional)

        :param domain: the object domain

        :param meta: a dict with custom object metadata

        :param replace_meta: replace existing metadata or not

        :param permissions: a dict with the read and write object permissions

        :returns: the new object uuid

        :raises: ItemNotExists, NotAllowedError, QuotaError
        """

        meta = meta or {}
        # Create the container on demand; the flag makes put_container()
        # lock the container path, and is always reset afterwards.
        try:
            self.lock_container_path = True
            self.put_container(user, account, container, policy=None)
        except ContainerExists:
            pass
        finally:
            self.lock_container_path = False
        # available=False: data has not been verified to exist yet.
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, mapfile, checksum,
            domain, meta, replace_meta, permissions, available=False)
        return self.node.version_get_properties(dest_version_id,
                                                keys=('uuid',))[0]
    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version.

        :raises IllegalOperationError: if the hashmap belongs to an
            Archipelago volume
        :raises IndexError: with a ``data`` attribute listing the hex
            hashes of blocks that are missing from the store
        """

        for h in hashmap:
            # NOTE(review): elsewhere Archipelago hashes are detected with
            # the 'archip:' prefix (see get_object_hashmap); confirm that
            # 'archip_' is intentional here.
            if h.startswith('archip_'):
                raise IllegalOperationError(
                    'Cannot update Archipelago Volume hashmap.')
        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(map)
        if missing:
            # Report the missing blocks to the caller via IndexError.data.
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = map.hash()
        hexlified = binascii.hexlify(hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        self.store.map_put(hash, map)
        return dest_version_id, hexlified
    @debug_method
    @backend_method
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum."""

        # Update objects with greater version and same hashmap
        # and size (fix metadata updates).
        self._can_write(user, account, container, name)
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        props = self._get_version(node, version)
        versions = self.node.node_get_versions(node)
        for x in versions:
            # Propagate the checksum to every later version that shares the
            # same data (identical hash and size), i.e. metadata-only
            # updates of the given version.
            if (x[self.SERIAL] >= int(version) and
                x[self.HASH] == props[self.HASH] and
                    x[self.SIZE] == props[self.SIZE]):
                self.node.version_put_property(
                    x[self.SERIAL], 'checksum', checksum)
    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):
        """Copy (or, with ``is_move``, move) an object to a new location.

        With ``delimiter`` set, also copies/moves every object whose name
        extends ``src_name`` by the delimiter (pseudo-directory semantics).

        :returns: the new version id, or a list of version ids when the
            delimiter recursion produced more than one
        """

        # For a move the quota usage is unchanged, so skip size reporting.
        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        # (consistent ordering avoids deadlocks between concurrent copies).
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        # A cross-path copy gets a new uuid; a move (or same-path copy)
        # keeps the source's.
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            # Recurse into the pseudo-directory: everything under
            # "<src_name><delimiter>".
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                # NOTE(review): this condition compares the *top-level*
                # source/destination, not the per-item paths — presumably
                # intentional (same check as above); confirm.
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)
    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, src_version, is_move=False, delimiter=delimiter)
    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        # Only the source owner may move an object.
        if user != src_account:
            raise NotAllowedError
        return self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, None, delimiter=delimiter)
    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        """Delete an object, or purge its history when `until` is given.

        With `until`: physically remove every version up to that timestamp
        from the normal and history clusters, delete their blocks, and
        report the freed size.  Without `until`: mark the object deleted
        by appending a new version in CLUSTER_DELETED.  With `delimiter`:
        additionally delete every object under the name+delimiter prefix
        (folder-style recursive delete).  Only the account owner may
        delete.
        """
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            # Purge mode: physically remove versions and their blocks.
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            if not self.free_versioning:
                # History versions count against quota only when
                # versioning is charged.
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except NameError:
                # No live version remains, so drop sharing/public grants.
                # NOTE(review): _get_version raises ItemNotExists —
                # presumably a NameError subclass in this backend; confirm
                # against the exceptions module.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        # Normal delete: append a CLUSTER_DELETED version on top and let
        # the versioning policy decide whether the old version is kept.
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            # Recursive delete of the pseudo-folder contents.
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                # Same delete sequence as above, per contained object.
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)
1480

    
1481
    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object.

        NOTE(review): `prefix` is accepted but not forwarded — presumably
        kept for API compatibility; confirm against the base backend.
        """

        self._delete_object(user, account, container, name, until=until,
                            delimiter=delimiter)
1488

    
1489
    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        result = []
        for props in self.node.node_get_versions(node):
            # Deleted versions are not part of the visible history.
            if props[self.CLUSTER] == CLUSTER_DELETED:
                continue
            result.append([props[self.SERIAL], props[self.MTIME]])
        return result
1499

    
1500
    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        # info is (path, serial); only the path is needed here.
        account, container, name = info[0].split('/', 2)
        if check_permissions:
            self._can_read(user, account, container, name)
        return account, container, name
1513

    
1514
    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        # Public ids still go through the regular read permission check.
        self._can_read(user, account, container, name)
        return account, container, name
1525

    
1526
    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        if hash.startswith('archip_'):
            # Archipelago hashes are passed to the store verbatim.
            block = self.store.block_get_archipelago(hash)
        else:
            block = self.store.block_get(self._unhexlify_hash(hash))
        if block:
            return block
        raise ItemNotExists('Block does not exist')
1537

    
1538
    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        digest = self.store.block_put(data)
        return binascii.hexlify(digest)
1543

    
1544
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if hash.startswith('archip_'):
            raise IllegalOperationError(
                'Cannot update an Archipelago Volume block.')
        # A full-block overwrite is equivalent to storing a fresh block.
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        updated = self.store.block_update(self._unhexlify_hash(hash), offset,
                                          data)
        return binascii.hexlify(updated)
1555

    
1556
    # Path functions.
1557

    
1558
    def _generate_uuid(self):
1559
        return str(uuidlib.uuid4())
1560

    
1561
    def _put_object_node(self, path, parent, name):
        """Return (path, node) for name under path, creating if missing."""
        full_path = '/'.join((path, name))
        node = self.node.node_lookup(full_path)
        if node is None:
            node = self.node.node_create(parent, full_path)
        return full_path, node
1567

    
1568
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        """Create a node at path with an initial empty version."""
        new_uuid = self._generate_uuid()
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 new_uuid, '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node
1575

    
1576
    def _lookup_account(self, account, create=True):
        """Return (account, node), optionally creating the account node."""
        node = self.node.node_lookup(account)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node
1583

    
1584
    def _lookup_container(self, account, container):
        """Return (path, node) for a container; raise if missing."""
        # Optionally take a row lock on the container path.
        for_update = bool(self.lock_container_path)
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node
1591

    
1592
    def _lookup_object(self, account, container, name, lock_container=False):
        """Return (path, node) for an object; raise ItemNotExists."""
        if lock_container:
            # The container lookup locks the container path as a side
            # effect (see _lookup_container).
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node
1601

    
1602
    def _lookup_objects(self, paths):
        """Bulk node lookup; return (paths, nodes)."""
        return paths, self.node.node_lookup_bulk(paths)
1605

    
1606
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = inf if until is None else until
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            # Older snapshots may only survive in the history cluster.
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props
1616

    
1617
    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            # Recompute from live versions instead of the cached row.
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        return (0, 0, 0) if stats is None else stats
1630

    
1631
    def _get_version(self, node, version=None):
        """Return version properties; the latest live one when version
        is None."""
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
            return props
        try:
            version = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(version, node=node)
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props
1645

    
1646
    def _get_versions(self, nodes):
        """Bulk-fetch the latest live version for each node."""
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1648

    
1649
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None,
                               available=True):
        """Create a new version of the node.

        Properties default from the latest live version of src_node (or
        of node itself when src_node is None): any of size/type/checksum
        left as None keeps the source value.  The superseded version of
        node, if any, is moved to CLUSTER_HISTORY.  Returns
        (pre_version_id, dest_version_id), where pre_version_id is the
        superseded version of node or None.
        """

        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            # No source version: start from empty defaults.
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        # A copy or a brand-new path gets a fresh UUID; otherwise the
        # object identity (UUID) carries over from the source version.
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            # Copying from another node: the version being superseded is
            # the latest live version of *node*, not of src_node.
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth,
            available=available)

        # Ensure only the newly created version is flagged as latest.
        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
1700

    
1701
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        """Attach metadata to the new version, seeding it from the old one.

        With replace, the domain's attributes are wiped and rewritten;
        otherwise empty-valued keys delete attributes and the rest are
        upserted.
        """
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if replace:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))
        else:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
1714

    
1715
    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        depth = update_statistics_ancestors_depth
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, update_statistics_ancestors_depth=depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id
1726

    
1727
    def _list_limits(self, listing, marker, limit):
1728
        start = 0
1729
        if marker:
1730
            try:
1731
                start = listing.index(marker) + 1
1732
            except ValueError:
1733
                pass
1734
        if not limit or limit > 10000:
1735
            limit = 10000
1736
        return start, limit
1737

    
1738
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """List (name, ...) property tuples for objects under a container.

        Names are returned relative to the container path; with virtual,
        delimiter prefixes are merged in as (name, None) entries.
        """
        cont_prefix = path + '/'
        # Attribute filters only apply within a metadata domain.
        filterq = (keys or []) if domain else []
        objects, prefixes = self.node.latest_version_list(
            parent, cont_prefix + prefix, delimiter,
            cont_prefix + marker if marker else None, limit,
            until if until is not None else inf, CLUSTER_DELETED,
            allowed or [], domain, filterq, size_range, all_props)
        if virtual:
            objects.extend((p, None) for p in prefixes)
        objects.sort(key=lambda x: x[0])
        # Strip the container prefix from each full path.
        return [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1759

    
1760
    # Reporting functions.
1761

    
1762
    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        """Queue a diskspace message and issue a quota commission.

        size is the signed change in bytes for account; details are
        attached to the queued message.  Raises QuotaError when the
        external quotaholder rejects the commission.
        """
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        # Recompute the account total so the message carries fresh usage.
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            # Remember the commission serial for later accept/reject.
            self.serials.append(serial)
1791

    
1792
    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        """Queue an 'object' change notification message."""
        details = details or {}
        details['user'] = user
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))
1800

    
1801
    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a 'sharing' change notification message."""
        details = details or {}
        details['user'] = user
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))
1809

    
1810
    # Policy functions.
1811

    
1812
    def _check_policy(self, policy, is_account_policy=True):
        """Validate policy keys/values in place; raise ValueError if bad.

        Empty values are replaced with the corresponding defaults before
        validation.
        """
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                if int(v) < 0:  # int() may raise ValueError.
                    raise ValueError
            elif k == 'versioning':
                if v not in ('auto', 'none'):
                    raise ValueError
            else:
                # Unknown policy key.
                raise ValueError
1828

    
1829
    def _put_policy(self, node, policy, replace, is_account_policy=True):
        """Store policy on node; with replace, missing keys get defaults."""
        if replace:
            default_policy = (self.default_account_policy if is_account_policy
                              else self.default_container_policy)
            for k, v in default_policy.iteritems():
                policy.setdefault(k, v)
        self.node.policy_set(node, policy)
1837

    
1838
    def _get_policy(self, node, is_account_policy=True):
        """Return the node's policy merged over the defaults."""
        default_policy = (self.default_account_policy if is_account_policy
                          else self.default_container_policy)
        policy = default_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy
1844

    
1845
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        policy = self._get_policy(node, is_account_policy=False)
        if policy['versioning'] != 'auto':
            # Versioning disabled: physically drop the old version.
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        if self.free_versioning:
            # Version kept for free: report its size as reclaimed.
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
1865

    
1866
    # Access control functions.
1867

    
1868
    def _check_groups(self, groups):
        """Validate group names (currently a no-op placeholder)."""
        # raise ValueError('Bad characters in groups')
        pass
1871

    
1872
    def _check_permissions(self, path, permissions):
        """Validate permission values (currently a no-op placeholder)."""
        # raise ValueError('Bad characters in permissions')
        pass
1875

    
1876
    def _get_formatted_paths(self, paths):
        """Return (path, match-mode) pairs for the given paths.

        Directory/folder objects additionally get a prefix-match entry
        with a trailing slash.
        """
        formatted = []
        if not paths:
            return formatted
        props = self.node.get_props(paths)
        if props:
            for prop in props:
                obj_path, mimetype = prop[0], prop[1]
                if mimetype.split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    formatted.append((obj_path.rstrip('/') + '/',
                                      self.MATCH_PREFIX))
                formatted.append((obj_path, self.MATCH_EXACT))
        return formatted
1889

    
1890
    def _get_permissions_path(self, account, container, name):
        """Return the path whose grants govern this object, or None.

        An exact grant wins; otherwise the nearest directory/folder
        ancestor with grants applies.
        """
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort(reverse=True)
        for p in permission_paths:
            if p == path:
                return p
            # Only object-level (account/container/name) ancestors count.
            if p.count('/') < 2:
                continue
            node = self.node.node_lookup(p)
            if node is None:
                continue
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                continue
            if props[self.TYPE].split(';', 1)[0].strip() in (
                    'application/directory', 'application/folder'):
                return p
        return None
1910

    
1911
    def _get_permissions_path_bulk(self, account, container, names):
        """Bulk variant of _get_permissions_path.

        Returns the list of governing permission paths for the given
        object names, or None when there are none.
        """
        formatted_paths = ['/'.join((account, container, name))
                           for name in names]
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        permission_paths.sort(reverse=True)
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                # Exact grant on the object itself.
                permission_paths_list.append(p)
            elif p.count('/') >= 2:
                # Candidate folder ancestor: verify its type below.
                lookup_list.append(p)

        if lookup_list:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        return permission_paths_list if permission_paths_list else None
1942

    
1943
    def _can_read(self, user, account, container, name):
        """Raise NotAllowedError unless user may read the object."""
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            # Public objects are readable by anyone.
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        allowed = (self.permissions.access_check(path, self.READ, user) or
                   self.permissions.access_check(path, self.WRITE, user))
        if not allowed:
            raise NotAllowedError
1955

    
1956
    def _can_write(self, user, account, container, name):
1957
        if user == account:
1958
            return True
1959
        path = '/'.join((account, container, name))
1960
        path = self._get_permissions_path(account, container, name)
1961
        if not path:
1962
            raise NotAllowedError
1963
        if not self.permissions.access_check(path, self.WRITE, user):
1964
            raise NotAllowedError
1965

    
1966
    def _allowed_accounts(self, user):
        """Return the sorted accounts with paths shared to user."""
        return sorted(set(
            path.split('/', 1)[0]
            for path in self.permissions.access_list_paths(user)))
1971

    
1972
    def _allowed_containers(self, user, account):
        """Return the sorted containers of account shared to user."""
        return sorted(set(
            path.split('/', 2)[1]
            for path in self.permissions.access_list_paths(user, account)))
1977

    
1978
    # Domain functions
1979

    
1980
    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        """Return (path, metadata, permissions) for objects in a domain,
        restricted to the paths accessible by user."""
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        result = []
        for path, props, user_defined_meta in obj_list:
            result.append((path,
                           self._build_metadata(props, user_defined_meta),
                           self.permissions.access_get(path)))
        return result
1993

    
1994
    # util functions
1995

    
1996
    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        """Assemble an object metadata dict from version properties,
        optionally merged with user-defined attributes."""
        meta = {
            'bytes': props[self.SIZE],
            'type': props[self.TYPE],
            'hash': props[self.HASH],
            'version': props[self.SERIAL],
            'version_timestamp': props[self.MTIME],
            'modified_by': props[self.MUSER],
            'uuid': props[self.UUID],
            'checksum': props[self.CHECKSUM],
        }
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta
2009

    
2010
    def _exists(self, node):
        """Return True if the node has a live (non-deleted) version."""
        try:
            self._get_version(node)
            return True
        except ItemNotExists:
            return False
2017

    
2018
    def _unhexlify_hash(self, hash):
2019
        try:
2020
            return binascii.unhexlify(hash)
2021
        except TypeError:
2022
            raise InvalidHash(hash)