Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 5f9426d9

History | View | Annotate | Download (80.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42
from time import time
43

    
44
from pithos.workers import glue
45
from archipelago.common import Segment, Xseg_ctx
46
from objpool import ObjectPool
47

    
48

    
49
try:
50
    from astakosclient import AstakosClient
51
except ImportError:
52
    AstakosClient = None
53

    
54
from pithos.backends.base import (
55
    DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
56
    DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
57
    BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
58
    ContainerNotEmpty, ItemNotExists, VersionNotExists,
59
    InvalidHash, IllegalOperationError)
60

    
61

    
62
class DisabledAstakosClient(object):
    """Placeholder standing in for AstakosClient when it is disabled.

    Construction succeeds and records its arguments for debugging, but
    any attribute access raises AssertionError so that accidental use
    of the disabled client is flagged immediately.
    """

    def __init__(self, *args, **kwargs):
        # Keep the constructor arguments around for easier debugging.
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        raise AssertionError("AstakosClient has been disabled, "
                             "yet an attempt to access it was made")
71

    
72

    
73
# Stripped-down version of the HashMap class found in tools.

class HashMap(list):
    """A list of block digests with Merkle-style top-hash computation."""

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        # Digest a single value with the configured algorithm.
        hasher = hashlib.new(self.blockhash)
        hasher.update(v)
        return hasher.digest()

    def hash(self):
        """Return the top hash over the contained block digests."""
        if not self:
            return self._hash_raw('')
        if len(self) == 1:
            return self[0]

        # Pad the leaf level with zero-fill entries up to a power of
        # two, then fold pairwise until a single digest remains.
        leaves = list(self)
        width = 2
        while width < len(leaves):
            width *= 2
        leaves.extend([('\x00' * len(leaves[0]))] * (width - len(leaves)))
        while len(leaves) > 1:
            leaves = [self._hash_raw(leaves[i] + leaves[i + 1])
                      for i in range(0, len(leaves), 2)]
        return leaves[0]
101

    
102
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
# Alphabet and length (in characters) used for generated public URLs.
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16
DEFAULT_ARCHIPELAGO_CONF_FILE = '/etc/archipelago/archipelago.conf'

# Identifiers used when publishing backend events on the queue.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version clusters a node version can live in (normal/history/deleted).
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

# Sentinel meaning "no upper time bound" in history lookups.
inf = float('inf')

ULTIMATE_ANSWER = 42

# Quota source and resource names used with the external quotaholder.
DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'

DEFAULT_MAP_CHECK_INTERVAL = 300  # set to 300 secs

logger = logging.getLogger(__name__)
136

    
137

    
138
def backend_method(func):
    """Run a backend method inside a managed database transaction.

    If the backend is already inside a transaction the wrapped method
    executes as-is; otherwise a transaction is opened via pre_exec()
    and finished via post_exec() with the call's success status.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        if self.in_transaction:
            # Already inside a transaction: just proceed with the call.
            return func(self, *args, **kw)

        success = False
        try:
            self.pre_exec()
            result = func(self, *args, **kw)
            success = True
            return result
        finally:
            # Runs on both normal return and any raised exception;
            # `success` is still False whenever the call did not finish.
            self.post_exec(success)
    return wrapper
158

    
159

    
160
def debug_method(func):
    """Log each call of *func* (arguments and result) at DEBUG level.

    On an exception the traceback is logged in place of the result and
    the exception is re-raised unchanged.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            # Log the traceback instead of a result, then propagate.
            result = format_exc()
            raise
        finally:
            # Build the rendered argument list eagerly.  kw.items()
            # (instead of the Python-2-only iteritems()) works on both
            # Python 2 and 3, and the explicit extend() replaces the old
            # side-effecting map(all_args.append, ...) call, which was a
            # silent no-op under Python 3's lazy map().
            all_args = [repr(a) for a in args]
            all_args.extend('%s=%s' % (k, v) for k, v in kw.items())
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args), result))
    return wrapper
175

    
176

    
177
def list_method(func):
    """Apply marker/limit windowing to the list returned by *func*."""
    @wraps(func)
    def wrapper(self, *args, **kw):
        full_result = func(self, *args, **kw)
        # _list_limits translates marker/limit into a slice window.
        start, limit = self._list_limits(
            full_result, kw.get('marker'), kw.get('limit'))
        return full_result[start:start + limit]
    return wrapper
186

    
187

    
188
class ModularBackend(BaseBackend):
189
    """A modular backend.
190

191
    Uses modules for SQL functions and storage.
192
    """
193

    
194
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None,
                 archipelago_conf_file=None,
                 xseg_pool_size=8,
                 map_check_interval=None):
        """Initialize the backend: load the DB and block modules, set up
        the xseg pool, the message queue and the astakos client.

        Every argument is optional; missing values fall back to the
        module-level DEFAULT_* constants.
        """
        # NOTE(review): these `or` fallbacks treat any falsy value (0,
        # '', {}) as "not provided" -- e.g. an explicit block_umask of 0
        # would silently become DEFAULT_BLOCK_UMASK.  Confirm intended.
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING
        archipelago_conf_file = archipelago_conf_file \
            or DEFAULT_ARCHIPELAGO_CONF_FILE
        map_check_interval = map_check_interval \
            or DEFAULT_MAP_CHECK_INTERVAL

        # Policy defaults applied to newly created accounts/containers.
        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning
        self.map_check_interval = map_check_interval

        def load_module(m):
            # Import by dotted name and return the module object.
            __import__(m)
            return sys.modules[m]

        # Database layer: all access objects share one DB wrapper.
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        # Mirror the db module's column/flag constants as attributes.
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT',
                  'AVAILABLE', 'MAP_CHECK_TIMESTAMP']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        # Block storage layer backed by the archipelago xseg pool.
        glue.WorkerGlue.setupXsegPool(ObjectPool, Segment, Xseg_ctx,
                                      cfile=archipelago_conf_file,
                                      pool_size=xseg_pool_size)
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        # Message queue: a real queue only when both module and hosts
        # are configured; otherwise a no-op stand-in.
        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        # Astakos client: a disabled stub unless an auth URL is given
        # and the astakosclient library is importable.
        if not astakos_auth_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        # Per-transaction bookkeeping (see pre_exec/post_exec).
        self.serials = []
        self.messages = []

        # Object move is a copy with is_move=True.
        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False
317

    
318
    def pre_exec(self, lock_container_path=False):
        """Begin a database transaction and reset per-transaction state."""
        self.lock_container_path = lock_container_path
        self.wrapper.execute()
        self.serials = []
        self.in_transaction = True
323

    
324
    def post_exec(self, success_status=True):
        """Finish the transaction opened by pre_exec.

        On success: send the queued messages, register any quota
        commission serials, resolve them as accepted and commit.  On
        failure: reject the pending commissions and roll back.
        """
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    accept_serials=self.serials,
                    reject_serials=[])
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            # Reject whatever commissions are pending, then undo the
            # transaction's database work.
            if self.serials:
                r = self.astakosclient.resolve_commissions(
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(
                    r['rejected'])
            self.wrapper.rollback()
        self.in_transaction = False
358

    
359
    def close(self):
        """Release the database wrapper and the message queue."""
        for resource in (self.wrapper, self.queue):
            resource.close()
362

    
363
    @property
    def using_external_quotaholder(self):
        """Whether quota handling is delegated to a real astakos client."""
        client_disabled = isinstance(self.astakosclient,
                                     DisabledAstakosClient)
        return not client_disabled
366

    
367
    @debug_method
    @backend_method
    @list_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access.

        The marker/limit window is applied by the list_method decorator.
        """
        accessible = self._allowed_accounts(user)
        return accessible
374

    
375
    def _get_account_quotas(self, account):
        """Get account usage from astakos.

        Returns the account's quota dict (possibly empty) for the
        'system' source / 'pithos.diskspace' resource, from which
        callers read keys such as 'usage' and 'limit'.
        """
        # This method was previously defined twice verbatim; the second
        # definition silently shadowed the first.  Keep a single copy.
        quotas = self.astakosclient.service_get_quotas(account)[account]
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
                                                  {})
388

    
389
    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain."""

        path, node = self._lookup_account(account, user == account)
        if user != account:
            # Foreign users may not look at history and must be allowed
            # to access the account at all.
            if until or (node is None) or (account not
                                           in self._allowed_accounts(user)):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # NOTE(review): NameError appears to be how _get_properties
            # signals a missing version (same idiom as put_container's
            # _lookup_container) -- confirm.  Fall back to `until`.
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Non-owners only learn the account name (plus mtime below).
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                # The external quotaholder's usage figure overrides the
                # locally computed byte count.
                external_quota = self._get_account_quotas(account)
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta
430

    
431
    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain.

        Only the account owner may modify account metadata.
        """
        if user != account:
            raise NotAllowedError
        account_node = self._lookup_account(account, True)[1]
        self._put_metadata(user, account_node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)
441

    
442
    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        # Non-owners get an empty mapping, but only for accounts they
        # are allowed to see at all.
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
        return {}
453

    
454
    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account.

        Only the owner may modify groups.  With replace=True all
        existing groups are destroyed first; otherwise each listed
        group is deleted and re-created from its new member list.
        """
        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        # items() instead of the Python-2-only iteritems() keeps this
        # working unchanged on both Python 2 and Python 3.
        for k, v in groups.items():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, k)
            if v:
                self.permissions.group_addmany(account, k, v)
470

    
471
    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy."""

        if user != account:
            # Non-owners see no policy, and only for visible accounts.
            if account in self._allowed_accounts(user):
                return {}
            raise NotAllowedError
        account_node = self._lookup_account(account, True)[1]
        policy = self._get_policy(account_node, is_account_policy=True)
        if self.using_external_quotaholder:
            # The quota limit comes from the external quotaholder.
            quota_info = self._get_account_quotas(account)
            policy['quota'] = quota_info.get('limit', 0)
        return policy
486

    
487
    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account.

        Only the account owner may modify the policy.
        """
        if user != account:
            raise NotAllowedError
        account_node = self._lookup_account(account, True)[1]
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(account_node, policy, replace, is_account_policy=True)
497

    
498
    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        if user != account:
            raise NotAllowedError
        policy = policy or {}
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        new_node = self._put_path(user, self.ROOTNODE, account,
                                  update_statistics_ancestors_depth=-1)
        self._put_policy(new_node, policy, True, is_account_policy=True)
514

    
515
    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name.

        Deleting a missing account is a no-op; a non-empty account
        cannot be deleted.
        """
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is None:
            return  # Nothing to delete.
        removed = self.node.node_remove(
            node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)
529

    
530
    @debug_method
    @backend_method
    @list_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account."""

        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            return self._allowed_containers(user, account)

        if shared or public:
            # Collect the container component of each shared/public path.
            names = set()
            if shared:
                for p in self.permissions.access_list_shared(account):
                    names.add(p.split('/', 2)[1])
            if public:
                for entry in self.permissions.public_list(account):
                    names.add(entry[0].split('/', 2)[1])
            return sorted(names)

        node = self.node.node_lookup(account)
        props = self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)
        return [p[0] for p in props]
553

    
554
    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""

        accessible_paths = []
        if user != account:
            # Foreign users may not look at history and need access to
            # at least one path under the container.
            if until:
                raise NotAllowedError
            container_path = '/'.join((account, container))
            accessible_paths = self.permissions.access_list_paths(
                user, container_path)
            if not accessible_paths:
                raise NotAllowedError
        node = self._lookup_container(account, container)[1]
        before = inf if until is None else until
        accessible_paths = self._get_formatted_paths(accessible_paths)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED,
                                               accessible_paths)
573

    
574
    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain."""

        if user != account:
            # Foreign users may not look at history and need access to
            # the container.
            if until or container not in self._allowed_containers(user,
                                                                  account):
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Non-owners only learn the container name (plus mtime below).
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta
608

    
609
    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        if user != account:
            raise NotAllowedError
        node = self._lookup_container(account, container)[1]
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is None:
            return
        # Unless versioning is 'auto', drop the superseded version.
        container_policy = self._get_policy(node, is_account_policy=False)
        if container_policy['versioning'] != 'auto':
            self.node.version_remove(src_version_id,
                                     update_statistics_ancestors_depth=0)
627

    
628
    @debug_method
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        if user != account:
            # Non-owners see no policy, and only for visible containers.
            if container in self._allowed_containers(user, account):
                return {}
            raise NotAllowedError
        container_node = self._lookup_container(account, container)[1]
        return self._get_policy(container_node, is_account_policy=False)
639

    
640
    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container.

        Only the account owner may modify container policy.
        """
        if user != account:
            raise NotAllowedError
        container_node = self._lookup_container(account, container)[1]
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(container_node, policy, replace,
                         is_account_policy=False)
651

    
652
    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        if user != account:
            raise NotAllowedError
        policy = policy or {}
        # _lookup_container raising NameError means the container does
        # not exist yet, which is the only acceptable outcome here.
        try:
            self._lookup_container(account, container)
        except NameError:
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        account_node = self._lookup_account(account, True)[1]
        node = self._put_path(
            user, account_node, path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)
673

    
674
    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name.

        With `until`, only history up to that time is purged.  With a
        `delimiter`, only the container's contents are deleted and the
        container itself survives.  Otherwise the container (which must
        be empty) is removed entirely.
        """

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        if until is not None:
            # Purge historic and deleted versions up to `until`, freeing
            # their block maps.
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                # Versions count against quota; report the freed space.
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            # Whole-container delete: only allowed when it holds no
            # live objects.
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            # NOTE(review): the `prefix` argument is not forwarded here;
            # contents are always listed with prefix='' -- confirm that
            # this is intended.
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                # Mark the object deleted by writing a new version in
                # the DELETED cluster, then apply the versioning policy.
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)
749

    
750
    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        """List object property tuples under a container.

        Combines the shared and/or public listings with the plain container
        listing depending on the `shared`/`public` flags, then applies the
        marker/limit window.

        Raises:
            NotAllowedError: if a non-owner requests a past (`until`) listing.
        """
        if user != account and until:
            raise NotAllowedError

        objects = []
        if shared and public:
            # Merge shared and public listings.
            # BUGFIX: `objects` must already be a set before the `|=` below;
            # previously it stayed a list when no shared paths existed and
            # `list |= set` raised TypeError.
            objects = set()
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects = set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            objects.sort(key=lambda x: x[0])
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
        else:
            allowed = self._list_object_permissions(
                user, account, container, prefix, shared, public=False)
            if shared and not allowed:
                return []
            path, node = self._lookup_container(account, container)
            allowed = self._get_formatted_paths(allowed)
            objects = self._list_object_properties(
                node, path, prefix, delimiter, marker, limit, virtual, domain,
                keys, until, size_range, allowed, all_props)

        # apply limits
        start, limit = self._list_limits(objects, marker, limit)
        return objects[start:start + limit]
791

    
792
    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        """Return (name, props...) tuples for the public objects matching
        prefix under account/container."""
        public_paths = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        paths, nodes = self._lookup_objects(public_paths)
        # Strip the leading 'account/container/' from each full path.
        strip = len('/'.join((account, container))) + 1
        names = [full_path[strip:] for full_path in paths]
        versions = self.node.version_lookup_bulk(
            nodes, all_props=all_props, order_by_path=True)
        return [(obj_name,) + props
                for obj_name, props in zip(names, versions)]
804

    
805
    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        """Collect every matching object by paging through _list_objects."""
        batch_size = 10000
        collected = []
        while True:
            # Resume after the last entry fetched so far.
            last_seen = collected[-1] if collected else None
            batch = self._list_objects(
                user, account, container, prefix, delimiter, last_seen,
                batch_size, virtual, domain, keys, shared, until, size_range,
                all_props, public)
            collected.extend(batch)
            if not batch or len(batch) < batch_size:
                break
        return collected
820

    
821
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        """Return the object paths the user may list under the prefix.

        For a foreign account, this is the set of paths explicitly granted
        to the user.  For the owner, it is the shared and/or public paths
        selected by the flags (or [] when neither filter matches anything).

        Raises:
            NotAllowedError: if a foreign user has no access at all.
        """
        allowed = []
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            # Foreign user: only paths shared with them are visible.
            allowed = self.permissions.access_list_paths(user, path)
            if not allowed:
                raise NotAllowedError
        else:
            # Owner: gather shared and/or public paths as requested.
            allowed = set()
            if shared:
                allowed.update(self.permissions.access_list_shared(path))
            if public:
                # public_list yields (path, public_id) pairs; keep the paths.
                allowed.update(
                    [x[0] for x in self.permissions.public_list(path)])
            allowed = sorted(allowed)
            if not allowed:
                return []
        return allowed
840

    
841
    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        # all_props=False: only names and serials are requested.
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys if keys is not None else [], shared, until,
            size_range, False, public)
853

    
854
    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys if keys is not None else [], shared, until,
            size_range, True, public)
        result = []
        for entry in props:
            if len(entry) == 2:
                # Two-element entries are virtual directory markers.
                result.append({'subdir': entry[0]})
                continue
            result.append({
                'name': entry[0],
                'bytes': entry[self.SIZE + 1],
                'type': entry[self.TYPE + 1],
                'hash': entry[self.HASH + 1],
                'version': entry[self.SERIAL + 1],
                'version_timestamp': entry[self.MTIME + 1],
                'modified': entry[self.MTIME + 1] if until is None else None,
                'modified_by': entry[self.MUSER + 1],
                'uuid': entry[self.UUID + 1],
                'checksum': entry[self.CHECKSUM + 1]})
        return result
883

    
884
    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths enforce permissions under a container."""

        # Shared-only listing; public objects are handled elsewhere.
        return self._list_object_permissions(
            user, account, container, prefix, shared=True, public=False)
891

    
892
    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        lookup_path = '/'.join((account, container, prefix))
        # public_list yields (path, public_id) pairs.
        return dict(self.permissions.public_list(lookup_path))
903

    
904
    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        When no explicit version is requested and the map is not yet
        available, a map check is attempted (best effort) and the refreshed
        properties are re-read.

        Raises:
            ItemNotExists: if the object (or requested version) is gone.
        """

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            if not props[self.AVAILABLE]:
                try:
                    self._update_available(props)
                except (NotAllowedError, IllegalOperationError):
                    pass  # just update the database
                finally:
                    # get updated properties
                    props = self._get_version(node, version)
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                # Fall back to the deletion record for the mtime.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            # Domain-scoped user attributes first; system keys below win
            # on collision.
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM],
                     'available': props[self.AVAILABLE],
                     'map_check_timestamp': props[self.MAP_CHECK_TIMESTAMP]})
        return meta
951

    
952
    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write(user, account, container, name)

        # Locking the container serializes concurrent object updates.
        obj_path, obj_node = self._lookup_object(account, container, name,
                                                 lock_container=True)
        old_version, new_version = self._put_metadata(
            user, obj_node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        self._apply_versioning(account, container, old_version,
                               update_statistics_ancestors_depth=1)
        return new_version
968

    
969
    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        nobject_permissions = {}
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            # Placeholder; the real value is set by access_get_for_bulk below.
            allowed = 1
            name = path[cpath_idx:]
            if user != account:
                # For foreign users the try/except only validates that the
                # path was granted at all; the looked-up value itself is
                # overwritten right after.
                try:
                    allowed = access_objects[path]
                except KeyError:
                    raise NotAllowedError
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        # NOTE(review): presumably called for its existence check / locking
        # side effect — the return value is discarded; confirm.
        self._lookup_objects(permissions_path)
        return nobject_permissions
998

    
999
    @debug_method
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        perm_path = self._get_permissions_path(account, container, name)
        if user == account:
            # The owner always has write access.
            allowed = 'write'
        elif self.permissions.access_check(perm_path, self.WRITE, user):
            allowed = 'write'
        elif self.permissions.access_check(perm_path, self.READ, user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # Lookup validates that the object actually exists.
        self._lookup_object(account, container, name)
        return (allowed, perm_path, self.permissions.access_get(perm_path))
1021

    
1022
    @debug_method
    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object.

        Only the account owner may change permissions; on success the
        sharing change is reported with the updated member list.

        Raises:
            NotAllowedError: if the requesting user is not the account owner.
            ValueError: if the permissions could not be stored.
        """

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        try:
            self.permissions.access_set(path, permissions)
        except Exception:
            # BUGFIX: was a bare `except:` which also swallowed SystemExit
            # and KeyboardInterrupt; narrowed to Exception.
            raise ValueError
        else:
            self._report_sharing_change(user, account, path, {'members':
                                        self.permissions.access_members(path)})
1040

    
1041
    @debug_method
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read(user, account, container, name)
        obj_path = self._lookup_object(account, container, name)[0]
        return self.permissions.public_get(obj_path)
1050

    
1051
    @debug_method
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write(user, account, container, name)
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        if public:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet)
        else:
            self.permissions.public_unset(path)
1064

    
1065
    def _update_available(self, props):
        """Checks if the object map exists and updates the database.

        Returns the hashmap on success; on failure records the check
        timestamp (forcing the write through a fresh transaction) and
        raises IllegalOperationError.

        Raises:
            NotAllowedError: if a map check was attempted too recently.
            IllegalOperationError: if the Archipelago map is unavailable.
        """

        if not props[self.AVAILABLE]:
            if props[self.MAP_CHECK_TIMESTAMP]:
                elapsed_time = time() - float(props[self.MAP_CHECK_TIMESTAMP])
                # Throttle repeated map checks for the same version.
                if elapsed_time < self.map_check_interval:
                    raise NotAllowedError(
                        'Consequent map checks are limited: retry later.')
        try:
            hashmap = self.store.map_get_archipelago(props[self.HASH],
                                                     props[self.SIZE])
        except:  # map does not exist
            # NOTE(review): bare except — any failure is treated as "map
            # missing"; confirm whether a narrower exception type applies.
            # Raising an exception results in db transaction rollback
            # However we have to force the update of the database
            self.wrapper.rollback()  # rollback existing transaction
            self.wrapper.execute()  # start new transaction
            self.node.version_put_property(props[self.SERIAL],
                                           'map_check_timestamp', time())
            self.wrapper.commit()  # commit transaction
            self.wrapper.execute()  # start new transaction
            raise IllegalOperationError(
                'Unable to retrieve Archipelago Volume hashmap.')
        else:  # map exists
            self.node.version_put_property(props[self.SERIAL],
                                           'available', True)
            self.node.version_put_property(props[self.SERIAL],
                                           'map_check_timestamp', time())
            return hashmap
1094

    
1095
    @debug_method
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if props[self.HASH] is None:
            # Object has no content (e.g. zero-byte placeholder).
            return 0, ()
        if props[self.HASH].startswith('archip:'):
            # Archipelago-backed object: the map check also refreshes the
            # availability flag in the database.
            hashmap = self._update_available(props)
            return props[self.SIZE], [x for x in hashmap]
        else:
            # Regular object: hashes are stored hex-encoded.
            hashmap = self.store.map_get(self._unhexlify_hash(
                props[self.HASH]))
            return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
1112

    
1113
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True,
                            available=True):
        """Create a new object version pointing at `hash`, enforcing quota
        and reporting size/sharing/object changes.

        Returns the new version id.

        Raises:
            NotAllowedError: permissions given by a non-owner, or no write
                access.
            QuotaError: account or container quota exceeded by the growth.
        """
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1,
            available=available, keep_available=False)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        # Size freed by versioning policy offsets the new object's size.
        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id
1186

    
1187
    @debug_method
    @backend_method
    def register_object_map(self, user, account, container, name, size, type,
                            mapfile, checksum='', domain='pithos', meta=None,
                            replace_meta=False, permissions=None):
        """Register an object mapfile without providing any data.

        Lock the container path, create a node pointing to the object path,
        create a version pointing to the mapfile
        and issue the size change in the quotaholder.

        :param user: the user account which performs the action

        :param account: the account under which the object resides

        :param container: the container under which the object resides

        :param name: the object name

        :param size: the object size

        :param type: the object mimetype

        :param mapfile: the mapfile pointing to the object data

        :param checksum: the md5 checksum (optional)

        :param domain: the object domain

        :param meta: a dict with custom object metadata

        :param replace_meta: replace existing metadata or not

        :param permissions: a dict with the read and write object permissions

        :returns: the new object uuid

        :raises: ItemNotExists, NotAllowedError, QuotaError
        """

        meta = meta or {}
        try:
            # Create the container if missing; either way its path gets
            # locked for the duration of the registration.
            self.lock_container_path = True
            self.put_container(user, account, container, policy=None)
        except ContainerExists:
            pass
        finally:
            self.lock_container_path = False
        # available=False: the data behind the mapfile is not verified yet.
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, mapfile, checksum,
            domain, meta, replace_meta, permissions, available=False)
        return self.node.version_get_properties(dest_version_id,
                                                keys=('uuid',))[0]
1240

    
1241
    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version.

        Raises:
            IllegalOperationError: for Archipelago-backed hashmaps.
            IndexError: with `.data` holding the hex hashes of blocks that
                are missing from the store.
        """

        for h in hashmap:
            # NOTE(review): checks the 'archip_' prefix while
            # get_object_hashmap() tests 'archip:' — presumably the two
            # encodings differ between hashmap entries and stored hashes;
            # confirm this is intentional.
            if h.startswith('archip_'):
                raise IllegalOperationError(
                    'Cannot update Archipelago Volume hashmap.')
        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(map)
        if missing:
            # Report missing blocks to the caller via the exception payload.
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = map.hash()
        hexlified = binascii.hexlify(hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        self.store.map_put(hash, map)
        return dest_version_id, hexlified
1270

    
1271
    @debug_method
    @backend_method
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum."""

        # Propagate the checksum to every version at or after `version`
        # that shares the same hashmap and size (covers metadata-only
        # updates which duplicate the content).
        self._can_write(user, account, container, name)
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        props = self._get_version(node, version)
        target_hash = props[self.HASH]
        target_size = props[self.SIZE]
        min_serial = int(version)
        for v in self.node.node_get_versions(node):
            same_content = (v[self.HASH] == target_hash and
                            v[self.SIZE] == target_size)
            if v[self.SERIAL] >= min_serial and same_content:
                self.node.version_put_property(
                    v[self.SERIAL], 'checksum', checksum)
1290

    
1291
    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):
        """Copy (or move, when is_move) an object, optionally recursing
        into the pseudo-hierarchy below `delimiter`.

        Returns the new version id, or a list of them when the delimiter
        expansion created several.
        """

        # Moves keep total usage constant, so skip quota reporting for them.
        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order (deadlock avoidance).
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            # Recurse into the pseudo-directory rooted at src_name.
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                # Rewrite the leading source prefix into the destination one.
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)
1366

    
1367
    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        # is_move=False: the source object is left untouched.
        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, src_version, False, delimiter)
1381

    
1382
    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        # Only the source account owner may move objects out of it.
        if user != src_account:
            raise NotAllowedError
        return self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, None, delimiter=delimiter)
1398

    
1399
    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        """Delete or purge an object, optionally recursing via delimiter.

        If ``until`` is given, permanently purge every version of the
        object up to that timestamp.  Otherwise mark the object deleted
        by creating a new version in CLUSTER_DELETED, apply the
        container's versioning policy to the replaced version and, when
        ``delimiter`` is given, do the same for every object sharing the
        ``name + delimiter`` prefix.

        Raises NotAllowedError if ``user`` does not own the account and
        ItemNotExists if the object is already deleted.
        """

        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            # With free versioning, historical versions never counted
            # against quota, so their size is not reclaimed.
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except ItemNotExists:
                # No version left at all: drop any sharing/public grants.
                # BUGFIX: this previously caught NameError, which
                # _get_version never raises, so the clear never ran.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        # Register a deletion-marker version and apply the container's
        # versioning policy to the version it supersedes.
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)
    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        # NOTE(review): ``prefix`` is accepted for API compatibility but
        # is not forwarded — only ``until`` and ``delimiter`` take effect.
        self._delete_object(user, account, container, name, until=until,
                            delimiter=delimiter)
    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        return [[props[self.SERIAL], props[self.MTIME]]
                for props in self.node.node_get_versions(node)
                if props[self.CLUSTER] != CLUSTER_DELETED]
    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path = info[0]  # second element is the version serial
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read(user, account, container, name)
        return account, container, name
    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        # Public ids still require the caller to pass the read check.
        self._can_read(user, account, container, name)
        return account, container, name
    def get_block(self, hash):
        """Return a block's data, looked up by its hash."""

        logger.debug("get_block: %s", hash)
        # Archipelago blocks are addressed by their prefixed name; plain
        # blocks by the raw (unhexlified) digest.
        if hash.startswith('archip_'):
            block = self.store.block_get_archipelago(hash)
        else:
            raw = self._unhexlify_hash(hash)
            block = self.store.block_get(raw)
        if not block:
            raise ItemNotExists('Block does not exist')
        return block
    def put_block(self, data):
        """Store a block and return its hex-encoded hash."""

        logger.debug("put_block: %s", len(data))
        digest = self.store.block_put(data)
        return binascii.hexlify(digest)
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the (possibly new) hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if hash.startswith('archip_'):
            raise IllegalOperationError(
                'Cannot update an Archipelago Volume block.')
        # A full-size write at offset 0 is simply a fresh block.
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        updated = self.store.block_update(
            self._unhexlify_hash(hash), offset, data)
        return binascii.hexlify(updated)
    # Path functions.
1566

    
1567
    def _generate_uuid(self):
1568
        return str(uuidlib.uuid4())
1569

    
1570
    def _put_object_node(self, path, parent, name):
        """Return (path, node) for name under path, creating if missing."""
        full_path = '/'.join((path, name))
        node = self.node.node_lookup(full_path)
        if node is None:
            node = self.node.node_create(parent, full_path)
        return full_path, node
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        """Create a node at path with an initial empty version; return it."""
        node = self.node.node_create(parent, path)
        uuid = self._generate_uuid()
        self.node.version_create(node, None, 0, '', None, user, uuid, '',
                                 CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node
    def _lookup_account(self, account, create=True):
        """Return (account, node); optionally create the account node."""
        node = self.node.node_lookup(account)
        if create and node is None:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node
    def _lookup_container(self, account, container):
        """Return (path, node) for the container; raise if missing."""
        # Take a row lock on the container path when configured to.
        for_update = bool(self.lock_container_path)
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node
    def _lookup_object(self, account, container, name, lock_container=False):
        """Return (path, node) for the object; raise if missing."""
        if lock_container:
            # Looking up the container also takes the container lock.
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node
    def _lookup_objects(self, paths):
        """Bulk-resolve paths to nodes; return (paths, nodes)."""
        return paths, self.node.node_lookup_bulk(paths)
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = inf if until is None else until
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            # Point-in-time queries may fall back to the history cluster.
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props
    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(
                node, except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        # Missing statistics are reported as empty, not as an error.
        return stats if stats is not None else (0, 0, 0)
    def _get_version(self, node, version=None):
        """Return properties of the given (or latest) version of node."""
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
            return props
        try:
            serial = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(serial, node=node)
        # Deleted versions are treated as nonexistent.
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props
    def _get_versions(self, nodes):
        """Bulk-fetch the latest normal-cluster version of each node."""
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None,
                               available=True, keep_available=True):
        """Create a new version of the node.

        Properties are seeded from the latest CLUSTER_NORMAL version of
        ``src_node`` (or of ``node`` itself when ``src_node`` is None);
        explicit ``size``/``type``/``checksum`` arguments override the
        seeded values.  The superseded latest version of ``node``, if
        any, is moved to CLUSTER_HISTORY.

        Returns (pre_version_id, dest_version_id): the id of the version
        that was superseded (None if there was none) and the id of the
        newly created version.
        """

        # Source of the properties to duplicate: src_node if given,
        # otherwise the node being versioned itself.
        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
            if keep_available:
                src_available = props[self.AVAILABLE]
                src_map_check_timestamp = props[self.MAP_CHECK_TIMESTAMP]
            else:
                src_available = available
                src_map_check_timestamp = None
        else:
            # No source version: start from empty defaults.
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
            src_available = available
            src_map_check_timestamp = None
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        # Copies (and first versions) get a fresh UUID; in-place updates
        # keep the object's existing UUID.
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            # Versioning from another node: the version being replaced
            # is the latest version of the destination node, if any.
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            # Demote the superseded version to the history cluster.
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth,
            available=src_available,
            map_check_timestamp=src_map_check_timestamp)

        # Ensure only the newly created version carries the latest flag.
        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        """Copy attributes to the new version and apply meta changes."""
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if replace:
            # Replace mode: drop the whole domain, then set the new pairs.
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems()))
        else:
            # Merge mode: empty values delete keys, others are upserts.
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        depth = update_statistics_ancestors_depth
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, update_statistics_ancestors_depth=depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id
    def _list_limits(self, listing, marker, limit):
1746
        start = 0
1747
        if marker:
1748
            try:
1749
                start = listing.index(marker) + 1
1750
            except ValueError:
1751
                pass
1752
        if not limit or limit > 10000:
1753
            limit = 10000
1754
        return start, limit
1755

    
1756
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """List property tuples for objects under a container path."""
        cont_prefix = path + '/'
        before = inf if until is None else until
        start = cont_prefix + marker if marker else None
        # Attribute filtering only applies within a metadata domain.
        filterq = (keys or []) if domain else []

        objects, prefixes = self.node.latest_version_list(
            parent, cont_prefix + prefix, delimiter, start, limit, before,
            CLUSTER_DELETED, allowed or [], domain, filterq, size_range,
            all_props)
        if virtual:
            # Virtual 'directories' are reported with no properties.
            objects.extend((p, None) for p in prefixes)
        objects.sort(key=lambda entry: entry[0])
        # Strip the container prefix from every returned object path.
        strip = len(cont_prefix)
        return [(entry[0][strip:],) + entry[1:] for entry in objects]
    # Reporting functions.
1779

    
1780
    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        """Queue a diskspace message and commit the change to quotaholder.

        A zero-size change is a no-op.  When an external quotaholder is
        configured, issues a commission for the account's diskspace and
        records its serial; raises QuotaError if that fails.
        """
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details.get('path', '')
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        # BUGFIX: 'except BaseException, e' is Python-2-only syntax and a
        # SyntaxError on Python 3; the 'as' form works on Python 2.6+.
        except BaseException as e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)
    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        """Queue an 'object changed' notification message."""
        details = details or {}
        details['user'] = user
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                   account, QUEUE_INSTANCE_ID, 'object', path, details)
        self.messages.append(message)
    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a 'sharing changed' notification message."""
        details = details or {}
        details['user'] = user
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                   account, QUEUE_INSTANCE_ID, 'sharing', path, details)
        self.messages.append(message)
    # Policy functions.
1829

    
1830
    def _check_policy(self, policy, is_account_policy=True):
        """Validate policy keys/values in place; raise ValueError if bad."""
        default_policy = (self.default_account_policy if is_account_policy
                          else self.default_container_policy)
        # Empty strings mean "reset this key to its default value".
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ('auto', 'none'):
                    raise ValueError
            else:
                # Unknown policy keys are rejected outright.
                raise ValueError
    def _put_policy(self, node, policy, replace, is_account_policy=True):
        """Store policy on node; on replace, fill gaps from the defaults."""
        default_policy = (self.default_account_policy if is_account_policy
                          else self.default_container_policy)
        if replace:
            for k, v in default_policy.iteritems():
                policy.setdefault(k, v)
        self.node.policy_set(node, policy)
    def _get_policy(self, node, is_account_policy=True):
        """Return the node's stored policy merged over the defaults."""
        defaults = (self.default_account_policy if is_account_policy
                    else self.default_container_policy)
        merged = defaults.copy()
        merged.update(self.node.policy_get(node))
        return merged
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        node = self._lookup_container(account, container)[1]
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            # Versioning disabled: physically remove the old version.
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        if self.free_versioning:
            # Kept versions do not count against quota; report their size.
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
    # Access control functions.
1885

    
1886
    def _check_groups(self, groups):
1887
        # raise ValueError('Bad characters in groups')
1888
        pass
1889

    
1890
    def _check_permissions(self, path, permissions):
1891
        # raise ValueError('Bad characters in permissions')
1892
        pass
1893

    
1894
    def _get_formatted_paths(self, paths):
        """Return (path, match-mode) pairs for permission matching."""
        formatted = []
        if not paths:
            return formatted
        props = self.node.get_props(paths)
        if props:
            for prop in props:
                obj_path, mimetype = prop[0], prop[1]
                if mimetype.split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    # Folders also prefix-match everything beneath them.
                    formatted.append((obj_path.rstrip('/') + '/',
                                      self.MATCH_PREFIX))
                formatted.append((obj_path, self.MATCH_EXACT))
        return formatted
    def _get_permissions_path(self, account, container, name):
        """Return the path whose grants govern the object, or None."""
        path = '/'.join((account, container, name))
        # Inherited grant paths, most specific (longest) first.
        candidates = sorted(self.permissions.access_inherit(path),
                            reverse=True)
        for candidate in candidates:
            if candidate == path:
                return candidate
            if candidate.count('/') < 2:
                # Account- or container-level entries do not apply here.
                continue
            node = self.node.node_lookup(candidate)
            if node is None:
                continue
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                continue
            # Only folder-typed ancestors propagate their permissions.
            if props[self.TYPE].split(';', 1)[0].strip() in (
                    'application/directory', 'application/folder'):
                return candidate
        return None
    def _get_permissions_path_bulk(self, account, container, names):
        """Return the grant-governing paths for many objects, or None."""
        formatted_paths = ['/'.join((account, container, name))
                           for name in names]
        # Inherited grant paths, most specific (longest) first.
        permission_paths = sorted(
            self.permissions.access_inherit_bulk(formatted_paths),
            reverse=True)
        result = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                result.append(p)
            elif p.count('/') >= 2:
                # Candidate ancestor; folder-ness is checked below in bulk.
                lookup_list.append(p)

        if lookup_list:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        result.append(prop[0])

        return result if result else None
    def _can_read(self, user, account, container, name):
        """Raise NotAllowedError unless user may read the object."""
        if user == account:
            return True
        full_path = '/'.join((account, container, name))
        if self.permissions.public_get(full_path) is not None:
            # Public objects are readable by anyone.
            return True
        granted = self._get_permissions_path(account, container, name)
        if not granted:
            raise NotAllowedError
        # A write grant implies read access.
        readable = (self.permissions.access_check(granted, self.READ, user) or
                    self.permissions.access_check(granted, self.WRITE, user))
        if not readable:
            raise NotAllowedError
    def _can_write(self, user, account, container, name):
        """Raise NotAllowedError unless user may write the object.

        Owners may always write; others need a write grant on the object
        itself or on an inherited (folder) path.
        """
        if user == account:
            return True
        # (Removed a dead '/'.join assignment that was immediately
        # overwritten; _get_permissions_path builds the path itself.)
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError
    def _allowed_accounts(self, user):
        """Return sorted account names on which user holds any grant."""
        paths = self.permissions.access_list_paths(user)
        return sorted(set(p.split('/', 1)[0] for p in paths))
    def _allowed_containers(self, user, account):
        """Return sorted container names in account that user may access."""
        paths = self.permissions.access_list_paths(user, account)
        return sorted(set(p.split('/', 2)[1] for p in paths))
    # Domain functions
1997

    
1998
    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        """Return (path, metadata, permissions) tuples for a domain."""
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        result = []
        for path, props, user_defined_meta in obj_list:
            result.append((path,
                           self._build_metadata(props, user_defined_meta),
                           self.permissions.access_get(path)))
        return result
    # util functions
2013

    
2014
    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        """Translate a version property tuple into a metadata dict."""
        meta = {
            'bytes': props[self.SIZE],
            'type': props[self.TYPE],
            'hash': props[self.HASH],
            'version': props[self.SERIAL],
            'version_timestamp': props[self.MTIME],
            'modified_by': props[self.MUSER],
            'uuid': props[self.UUID],
            'checksum': props[self.CHECKSUM],
        }
        if include_user_defined and user_defined is not None:
            # User-defined keys overlay the system keys on collision.
            meta.update(user_defined)
        return meta
    def _exists(self, node):
        """Return True if node has a live (non-deleted) latest version."""
        try:
            self._get_version(node)
        except ItemNotExists:
            return False
        return True
    def _unhexlify_hash(self, hash):
2037
        try:
2038
            return binascii.unhexlify(hash)
2039
        except TypeError:
2040
            raise InvalidHash(hash)