Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 3759eddb

History | View | Annotate | Download (77.9 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42

    
43
from pithos.workers import glue
44
from archipelago.common import Segment, Xseg_ctx
45
from objpool import ObjectPool
46

    
47

    
48
try:
49
    from astakosclient import AstakosClient
50
except ImportError:
51
    AstakosClient = None
52

    
53
from pithos.backends.base import (
54
    DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
55
    DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
56
    BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
57
    ContainerNotEmpty, ItemNotExists, VersionNotExists,
58
    InvalidHash, IllegalOperationError)
59

    
60

    
61
class DisabledAstakosClient(object):
    """Stand-in used when no Astakos client is configured.

    It remembers the constructor arguments and fails loudly (with an
    AssertionError) on any attribute access, so accidental use of a
    disabled quota holder is caught immediately.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        message = ("AstakosClient has been disabled, "
                   "yet an attempt to access it was made")
        raise AssertionError(message)
70

    
71

    
72
# Stripped-down version of the HashMap class found in tools.
73

    
74
class HashMap(list):
    """Merkle-style top hash over a list of binary block digests.

    The list holds one digest per block; hash() pads the list with
    zero-filled entries up to the next power of two and folds adjacent
    pairs until a single top-level digest remains.
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize    # block size in bytes
        self.blockhash = blockhash    # hashlib algorithm name, e.g. 'sha256'

    def _hash_raw(self, v):
        """Return the binary digest of *v* using the configured algorithm."""
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        """Return the top-level digest of the map."""
        if len(self) == 0:
            # b'' (not ''): hashlib requires bytes on Python 3; on
            # Python 2 the byte literal is identical to the old str.
            return self._hash_raw(b'')
        if len(self) == 1:
            return self[0]

        h = list(self)
        # Round the number of leaves up to the next power of two.
        s = 2
        while s < len(h):
            s = s * 2
        # Pad with zero-filled pseudo-digests (byte literal for py2/py3).
        h += [(b'\x00' * len(h[0]))] * (s - len(h))
        # Fold adjacent pairs until one digest remains.
        while len(h) > 1:
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
        return h[0]
100

    
101
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
# Alphabet and token length used when generating public-URL identifiers.
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16
DEFAULT_ARCHIPELAGO_CONF_FILE = '/etc/archipelago/archipelago.conf'

# Identifiers used when publishing messages on the queue.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version clusters: live data, retained history, deleted entries.
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

# Sentinel meaning "no upper bound" for timestamp comparisons.
inf = float('inf')

ULTIMATE_ANSWER = 42

# Source and resource names used with the Astakos quota holder.
DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'

logger = logging.getLogger(__name__)
133

    
134

    
135
def backend_method(func):
    """Decorator: run the wrapped method inside a managed DB transaction.

    When a transaction is already active (``self.in_transaction``) the
    method simply executes within it.  Otherwise a new transaction is
    opened with ``pre_exec`` and always closed with ``post_exec``,
    passing a flag that tells whether the call completed successfully.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        # Reuse an already-open transaction.
        if self.in_transaction:
            return func(self, *args, **kw)

        success = False
        try:
            self.pre_exec()
            result = func(self, *args, **kw)
            success = True
            return result
        finally:
            # Commit or roll back depending on how the call ended.
            self.post_exec(success)
    return wrapper
155

    
156

    
157
def debug_method(func):
    """Decorator: log every call as ``name(args) <<< result`` at DEBUG.

    On an exception the formatted traceback is logged in place of the
    result and the exception is re-raised unchanged.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            # Log the traceback text instead of a result; re-raise as-is.
            result = format_exc()
            raise
        finally:
            # Build the argument listing with a comprehension + extend;
            # the original used map() for its side effects, which also
            # breaks on Python 3 (map returns an iterator there).
            # kw.items() works on both Python 2 and 3.
            all_args = [repr(x) for x in args]
            all_args.extend('%s=%s' % (k, v) for k, v in kw.items())
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper
172

    
173

    
174
def list_method(func):
    """Decorator: page a listing through the marker/limit keyword args.

    The wrapped method's full result is sliced according to what
    ``self._list_limits`` computes from the ``marker`` and ``limit``
    keyword arguments.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        # Capture paging parameters before the call, as the original did.
        marker, limit = kw.get('marker'), kw.get('limit')
        result = func(self, *args, **kw)
        start, limit = self._list_limits(result, marker, limit)
        return result[start:start + limit]
    return wrapper
183

    
184

    
185
class ModularBackend(BaseBackend):
186
    """A modular backend.
187

188
    Uses modules for SQL functions and storage.
189
    """
190

    
191
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None,
                 archipelago_conf_file=None,
                 xseg_pool_size=8):
        """Wire up DB, block store, queue and Astakos client modules.

        Every parameter is optional; unset values fall back to the
        module-level DEFAULT_* constants.
        """
        # Fall back to module-level defaults for any unset parameter.
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING
        archipelago_conf_file = archipelago_conf_file \
            or DEFAULT_ARCHIPELAGO_CONF_FILE

        # Policies applied to newly created accounts/containers.
        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        # Public-URL token generation parameters.
        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        # When True, retained versions are not charged against quota.
        self.free_versioning = free_versioning

        def load_module(m):
            # Import by dotted name and return the module object.
            __import__(m)
            return sys.modules[m]

        # --- Database layer ---
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        # Mirror the DB module's permission constants onto the backend.
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        # Mirror the DB module's node/version field constants as well.
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        # --- Block storage layer (archipelago-backed) ---
        glue.WorkerGlue.setupXsegPool(ObjectPool, Segment, Xseg_ctx,
                                      cfile=archipelago_conf_file,
                                      pool_size=xseg_pool_size)
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        # --- Message queue (optional; no-op stub when unconfigured) ---
        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        # --- Astakos quota-holder client (disabled stub if unavailable) ---
        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        if not astakos_auth_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        # Per-transaction state: pending commission serials and messages.
        self.serials = []
        self.messages = []

        # Move is implemented as copy with is_move=True.
        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False
309

    
310
    def pre_exec(self, lock_container_path=False):
311
        self.lock_container_path = lock_container_path
312
        self.wrapper.execute()
313
        self.serials = []
314
        self.in_transaction = True
315

    
316
    def post_exec(self, success_status=True):
        """Close the transaction opened by pre_exec().

        On success: flush queued messages, persist pending commission
        serials, resolve them with Astakos and commit.  On failure:
        reject pending serials and roll the transaction back.
        """
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                # Accept all pending commissions with the quota holder,
                # then drop the serials it confirmed.
                r = self.astakosclient.resolve_commissions(
                    accept_serials=self.serials,
                    reject_serials=[])
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            if self.serials:
                # The call failed: reject its commissions before rolling
                # back the DB transaction.
                r = self.astakosclient.resolve_commissions(
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(
                    r['rejected'])
            self.wrapper.rollback()
        self.in_transaction = False
350

    
351
    def close(self):
352
        self.wrapper.close()
353
        self.queue.close()
354

    
355
    @property
    def using_external_quotaholder(self):
        """Whether quota accounting is delegated to a real Astakos client."""
        has_disabled_client = isinstance(self.astakosclient,
                                         DisabledAstakosClient)
        return not has_disabled_client
358

    
359
    @debug_method
    @backend_method
    @list_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access.

        marker/limit paging is applied by the list_method decorator.
        """
        accounts = self._allowed_accounts(user)
        return accounts
366

    
367
    def _get_account_quotas(self, account):
        """Get account usage from astakos.

        Returns the ``pithos.diskspace`` entry for *account* under the
        default source, or an empty dict when missing.
        """
        # NOTE(review): this method was defined twice, verbatim; the
        # redundant duplicate definition has been removed.
        quotas = self.astakosclient.service_get_quotas(account)[account]
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
                                                  {})
380

    
381
    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain.

        Foreign users only get the account name (when allowed access);
        the owner additionally gets counts, sizes, timestamps and the
        user-defined attributes of *domain*.
        """

        path, node = self._lookup_account(account, user == account)
        if user != account:
            # Foreign users may not inspect history, and the account must
            # exist and be shared with the requesting user.
            if until or (node is None) or (account not
                                           in self._allowed_accounts(user)):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # NOTE(review): _get_properties appears to signal a missing
            # version by raising NameError in this codebase — confirm.
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                # Prefer the quota holder's view of the usage.
                external_quota = self._get_account_quotas(account)
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta
422

    
423
    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain.

        Only the account owner may update account metadata.
        """
        if user != account:
            raise NotAllowedError
        node = self._lookup_account(account, True)[1]
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)
433

    
434
    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account.

        Foreign users with access get an empty mapping; others are
        rejected outright.
        """
        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
        return {}
445

    
446
    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account.

        Only the owner may modify groups.  With replace=True all existing
        groups are destroyed first; otherwise each named group is deleted
        before being re-populated with its new members.
        """

        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        # items() instead of iteritems(): identical iteration here, and
        # portable across Python 2 and 3.
        for k, v in groups.items():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, k)
            if v:
                self.permissions.group_addmany(account, k, v)
462

    
463
    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy.

        The owner gets the stored policy (with the quota limit taken from
        the external quota holder when one is configured); foreign users
        with access get an empty mapping.
        """
        if user == account:
            node = self._lookup_account(account, True)[1]
            policy = self._get_policy(node, is_account_policy=True)
            if self.using_external_quotaholder:
                external_quota = self._get_account_quotas(account)
                policy['quota'] = external_quota.get('limit', 0)
            return policy
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
        return {}
478

    
479
    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account.

        Only the account owner may change the policy; the new values are
        validated before being stored.
        """
        if user != account:
            raise NotAllowedError
        node = self._lookup_account(account, True)[1]
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)
489

    
490
    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name.

        Raises NotAllowedError for foreign users and AccountExists when
        the account node is already present.
        """
        policy = policy or {}
        if user != account:
            raise NotAllowedError
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True)
506

    
507
    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name.

        A missing account is a no-op; a non-empty one raises
        AccountNotEmpty.  Its permission groups are destroyed on success.
        """
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is None:
            return
        removed = self.node.node_remove(
            node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)
521

    
522
    @debug_method
    @backend_method
    @list_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account.

        Foreign users get only the containers shared with them; the
        shared/public flags restrict the owner's listing to containers
        holding shared or published objects.
        """
        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            return self._allowed_containers(user, account)
        if shared or public:
            # Collect container names out of the shared/public object paths
            # ('account/container/object' -> container component).
            names = set()
            if shared:
                names.update(p.split('/', 2)[1] for p in
                             self.permissions.access_list_shared(account))
            if public:
                names.update(p[0].split('/', 2)[1] for p in
                             self.permissions.public_list(account))
            return sorted(names)
        node = self.node.node_lookup(account)
        rows = self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)
        return [row[0] for row in rows]
545

    
546
    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain.

        Foreign users are restricted to the paths shared with them within
        the container.
        """
        allowed = []
        if user != account:
            if until:
                raise NotAllowedError
            container_path = '/'.join((account, container))
            allowed = self.permissions.access_list_paths(user, container_path)
            if not allowed:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = inf if until is None else until
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)
565

    
566
    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        Foreign users only get the container name (when allowed access);
        the owner additionally gets counts, sizes, timestamps and the
        user-defined attributes of *domain*.
        """

        if user != account:
            # Foreign users may not inspect history and must be allowed
            # access to the container.
            if until or container not in self._allowed_containers(user,
                                                                  account):
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta
600

    
601
    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain.

        When the update supersedes a previous version and the container's
        versioning policy is not 'auto', the superseded version is removed.
        """
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is None:
            return
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            self.node.version_remove(src_version_id,
                                     update_statistics_ancestors_depth=0)
619

    
620
    @debug_method
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy.

        Foreign users with access get an empty mapping; others are
        rejected.
        """
        if user == account:
            node = self._lookup_container(account, container)[1]
            return self._get_policy(node, is_account_policy=False)
        if container not in self._allowed_containers(user, account):
            raise NotAllowedError
        return {}
631

    
632
    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container.

        Only the account owner may change it; the new values are
        validated before being stored.
        """
        if user != account:
            raise NotAllowedError
        node = self._lookup_container(account, container)[1]
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)
643

    
644
    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name.

        Raises NotAllowedError for foreign users and ContainerExists when
        a container of that name is already present.
        """
        policy = policy or {}
        if user != account:
            raise NotAllowedError
        try:
            self._lookup_container(account, container)
        except NameError:
            # The container does not exist yet — that is what we want.
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        account_node = self._lookup_account(account, True)[1]
        node = self._put_path(user, account_node, path,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)
665

    
666
    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name.

        Three modes:
          * until is not None: purge history/deleted versions up to that
            timestamp, keeping the container itself;
          * no delimiter: fully remove an (empty) container and purge all
            of its retained versions;
          * delimiter given: delete only the contained objects, reporting
            size changes per object.
        """

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        if until is not None:
            # Purge retained history up to the cut-off and drop the
            # corresponding block maps from the store.
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                # Versions were charged against quota; report the release.
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            # Whole-container removal: only allowed when empty.
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                # Mark each object deleted by writing a zero-sized version
                # in the DELETED cluster.
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            # Drop all sharing/public grants of the removed objects at once.
            self.permissions.access_clear_bulk(paths)
741

    
742
    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        """List object properties in a container, honoring sharing flags.

        When both shared and public are requested the two listings are
        unioned and re-sorted; otherwise a single permission-filtered
        listing is produced.  marker/limit paging is applied at the end.
        """
        # History listings are owner-only.
        if user != account and until:
            raise NotAllowedError

        objects = []
        if shared and public:
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects = set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            # De-duplicated union, re-ordered by object path.
            objects.sort(key=lambda x: x[0])
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
        else:
            allowed = self._list_object_permissions(
                user, account, container, prefix, shared, public=False)
            if shared and not allowed:
                return []
            path, node = self._lookup_container(account, container)
            allowed = self._get_formatted_paths(allowed)
            objects = self._list_object_properties(
                node, path, prefix, delimiter, marker, limit, virtual, domain,
                keys, until, size_range, allowed, all_props)

        # apply limits
        start, limit = self._list_limits(objects, marker, limit)
        return objects[start:start + limit]
783

    
784
    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        """Return property tuples for the public objects under a prefix,
        with the object name relative to the container."""
        public_paths = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        full_paths, nodes = self._lookup_objects(public_paths)
        # Strip the leading "account/container/" from every path.
        container_prefix = '/'.join((account, container)) + '/'
        strip = len(container_prefix)
        names = [full_path[strip:] for full_path in full_paths]
        props = self.node.version_lookup_bulk(
            nodes, all_props=all_props, order_by_path=True)
        return [(name,) + prop for name, prop in zip(names, props)]
796

    
797
    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        """Exhaustively page through _list_objects() and collect all rows."""
        page_size = 10000
        collected = []
        while True:
            # Resume right after the last entry fetched so far.
            marker = collected[-1] if collected else None
            page = self._list_objects(
                user, account, container, prefix, delimiter, marker, page_size,
                virtual, domain, keys, shared, until, size_range, all_props,
                public)
            collected.extend(page)
            if len(page) < page_size:
                # A short (or empty) page means there is nothing left.
                break
        return collected
812

    
813
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        """Return the object paths under a prefix the user may list.

        For a foreign account, the paths explicitly granted to the user
        are returned (NotAllowedError if there are none).  For the owner,
        the shared and/or public paths are merged and returned sorted.
        """
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            granted = self.permissions.access_list_paths(user, path)
            if not granted:
                raise NotAllowedError
            return granted
        paths = set()
        if shared:
            paths.update(self.permissions.access_list_shared(path))
        if public:
            paths.update(entry[0]
                         for entry in self.permissions.public_list(path))
        return sorted(paths)
832

    
833
    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        # all_props=False: names and version ids only.
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range,
            False, public)
845

    
846
    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, True,
            public)
        result = []
        for entry in props:
            if len(entry) == 2:
                # Two-element entries carry no version properties.
                result.append({'subdir': entry[0]})
                continue
            name, p = entry[0], entry[1:]
            result.append({
                'name': name,
                'bytes': p[self.SIZE],
                'type': p[self.TYPE],
                'hash': p[self.HASH],
                'version': p[self.SERIAL],
                'version_timestamp': p[self.MTIME],
                'modified': p[self.MTIME] if until is None else None,
                'modified_by': p[self.MUSER],
                'uuid': p[self.UUID],
                'checksum': p[self.CHECKSUM]})
        return result
875

    
876
    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths that enforce permissions
        under a container."""

        return self._list_object_permissions(
            user, account, container, prefix, shared=True, public=False)
883

    
884
    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        # public_list() yields (path, public_id) pairs.
        lookup = '/'.join((account, container, prefix))
        return dict(self.permissions.public_list(lookup))
895

    
896
    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        When a specific version is requested, 'modified' still reports
        the object's overall (latest) modification time, falling back to
        the deletion time if the object no longer has a live version.

        Raises ItemNotExists if the object does not exist.
        """

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            # Latest version requested: its mtime is the overall one.
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                # Fall back to the mtime of the deletion marker.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            # Merge in the user-defined attributes for this domain first;
            # system keys below take precedence on collision.
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta
933

    
934
    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write(user, account, container, name)

        # Lock the container path while updating.
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        old_version, new_version = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        # The superseded version may be reclaimed by the versioning policy.
        self._apply_versioning(account, container, old_version,
                               update_statistics_ancestors_depth=1)
        return new_version
950

    
951
    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions.

        The result maps each object name (relative to the container) to
        a (action, permissions_path, permissions_dict) tuple.
        Raises NotAllowedError if a foreign user lacks access to any of
        the requested paths.
        """

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        #group_parents = access_objects['group_parents']
        nobject_permissions = {}
        # Prefix to strip: "account/container/".
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            allowed = 1
            name = path[cpath_idx:]
            if user != account:
                # Foreign users must appear in the bulk access-check
                # result for every path, otherwise reject the whole call.
                try:
                    allowed = access_objects[path]
                except KeyError:
                    raise NotAllowedError
            # NOTE(review): 'allowed' is unconditionally recomputed here,
            # so the value fetched above only serves the existence check.
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        # Ensure all requested objects exist (raises otherwise).
        self._lookup_objects(permissions_path)
        return nobject_permissions
980

    
981
    @debug_method
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        perm_path = self._get_permissions_path(account, container, name)
        if user == account:
            # The owner always has write access.
            allowed = 'write'
        elif self.permissions.access_check(perm_path, self.WRITE, user):
            allowed = 'write'
        elif self.permissions.access_check(perm_path, self.READ, user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # Ensure the object exists (raises otherwise).
        self._lookup_object(account, container, name)
        return allowed, perm_path, self.permissions.access_get(perm_path)
1003

    
1004
    @debug_method
    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object.

        Only the account owner may change permissions.  On success a
        sharing-change notification is issued.  Failures while storing
        the permissions are surfaced to the caller as ValueError.
        """

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        try:
            self.permissions.access_set(path, permissions)
        except Exception:
            # Keep the historical contract of reporting storage failures
            # as ValueError, but no longer swallow SystemExit and
            # KeyboardInterrupt through a bare except clause.
            raise ValueError
        else:
            self._report_sharing_change(user, account, path, {'members':
                                        self.permissions.access_members(path)})
1022

    
1023
    @debug_method
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read(user, account, container, name)
        path, _node = self._lookup_object(account, container, name)
        return self.permissions.public_get(path)
1032

    
1033
    @debug_method
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write(user, account, container, name)
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        if public:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet)
        else:
            self.permissions.public_unset(path)
1046

    
1047
    @debug_method
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        obj_hash = props[self.HASH]
        obj_size = props[self.SIZE]
        if obj_hash is None:
            return 0, ()
        if obj_hash.startswith('archip:'):
            # Archipelago maps are returned as-is, without hex encoding.
            hashmap = self.store.map_get_archipelago(obj_hash, obj_size)
            return obj_size, list(hashmap)
        hashmap = self.store.map_get(self._unhexlify_hash(obj_hash))
        return obj_size, [binascii.hexlify(h) for h in hashmap]
1065

    
1066
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        """Create a new object version pointing at the given hash.

        Creates the object node if needed, duplicates the previous
        version's metadata, applies the versioning policy, enforces
        account and container quotas and issues size/sharing/change
        notifications.  Returns the new version id.

        Raises NotAllowedError, QuotaError.
        """
        # Only the account owner may attach permissions.
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        # The previous version may be reclaimed by the versioning policy;
        # its size offsets the new version's size in the accounting below.
        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id
1137

    
1138
    @debug_method
    @backend_method
    def register_object_map(self, user, account, container, name, size, type,
                            mapfile, checksum='', domain='pithos', meta=None,
                            replace_meta=False, permissions=None):
        """Register an object mapfile without providing any data.

        Lock the container path, create a node pointing to the object path,
        create a version pointing to the mapfile
        and issue the size change in the quotaholder.

        :param user: the user account which performs the action

        :param account: the account under which the object resides

        :param container: the container under which the object resides

        :param name: the object name

        :param size: the object size

        :param type: the object mimetype

        :param mapfile: the mapfile pointing to the object data

        :param checksum: the md5 checksum (optional)

        :param domain: the object domain

        :param meta: a dict with custom object metadata

        :param replace_meta: replace existing metadata or not

        :param permissions: a dict with the read and write object permissions

        :returns: the new object uuid

        :raises: ItemNotExists, NotAllowedError, QuotaError
        """

        meta = meta or {}
        try:
            # Create the container if it is missing; lock its path while
            # doing so, and always restore the flag afterwards.
            self.lock_container_path = True
            self.put_container(user, account, container, policy=None)
        except ContainerExists:
            pass
        finally:
            self.lock_container_path = False
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, mapfile, checksum,
            domain, meta, replace_meta, permissions)
        return self.node.version_get_properties(dest_version_id,
                                                keys=('uuid',))[0]
1191

    
1192
    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version.

        Returns (new_version_id, hexlified_object_hash).  Raises
        IndexError with its ``data`` attribute set to the hex hashes of
        the blocks that are not yet stored, so the caller can upload
        them and retry.
        """

        for h in hashmap:
            # NOTE(review): this rejects 'archip_'-prefixed hashes while
            # other code paths test for the 'archip:' prefix -- confirm
            # both spellings are intentional.
            if h.startswith('archip_'):
                raise IllegalOperationError(
                    'Cannot update Archipelago Volume hashmap.')
        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(map)
        if missing:
            # Report the missing blocks back to the caller.
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = map.hash()
        hexlified = binascii.hexlify(hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        self.store.map_put(hash, map)
        return dest_version_id, hexlified
1221

    
1222
    @debug_method
    @backend_method
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum.

        The checksum is propagated to every later version that still has
        the same hashmap and size (fix metadata updates).
        """

        self._can_write(user, account, container, name)
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        props = self._get_version(node, version)
        min_serial = int(version)
        ref_hash = props[self.HASH]
        ref_size = props[self.SIZE]
        for candidate in self.node.node_get_versions(node):
            same_content = (candidate[self.HASH] == ref_hash and
                            candidate[self.SIZE] == ref_size)
            if candidate[self.SERIAL] >= min_serial and same_content:
                self.node.version_put_property(
                    candidate[self.SERIAL], 'checksum', checksum)

    
1242
    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):
        """Copy (or move) an object, optionally recursing with a delimiter.

        Returns the new version id, or a list of version ids when a
        delimiter makes this a multi-object (pseudo-folder) operation.
        """

        # Moves suppress per-object size reporting.
        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        # (consistent ordering avoids deadlocks between concurrent copies).
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            # Moving to a different path: remove the source object.
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            # Pseudo-folder operation: copy/move every object that shares
            # the source prefix.
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                # Rewrite only the leading source prefix of the path.
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)
1317

    
1318
    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        # is_move=False: keep the source object.
        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, src_version, False, delimiter)
1332

    
1333
    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        # Only the source owner may move an object.
        if user != src_account:
            raise NotAllowedError
        return self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, None, delimiter=delimiter)
1349

    
1350
    def _delete_object(self, user, account, container, name, until=None,
1351
                       delimiter=None, report_size_change=True):
1352
        if user != account:
1353
            raise NotAllowedError
1354

    
1355
        # lookup object and lock container path also
1356
        path, node = self._lookup_object(account, container, name,
1357
                                         lock_container=True)
1358

    
1359
        if until is not None:
1360
            if node is None:
1361
                return
1362
            hashes = []
1363
            size = 0
1364
            serials = []
1365
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1366
                                           update_statistics_ancestors_depth=1)
1367
            hashes += h
1368
            size += s
1369
            serials += v
1370
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1371
                                           update_statistics_ancestors_depth=1)
1372
            hashes += h
1373
            if not self.free_versioning:
1374
                size += s
1375
            serials += v
1376
            for h in hashes:
1377
                self.store.map_delete(h)
1378
            self.node.node_purge(node, until, CLUSTER_DELETED,
1379
                                 update_statistics_ancestors_depth=1)
1380
            try:
1381
                self._get_version(node)
1382
            except NameError:
1383
                self.permissions.access_clear(path)
1384
            self._report_size_change(
1385
                user, account, -size, {
1386
                    'action': 'object purge',
1387
                    'path': path,
1388
                    'versions': ','.join(str(i) for i in serials)
1389
                }
1390
            )
1391
            return
1392

    
1393
        if not self._exists(node):
1394
            raise ItemNotExists('Object is deleted.')
1395

    
1396
        src_version_id, dest_version_id = self._put_version_duplicate(
1397
            user, node, size=0, type='', hash=None, checksum='',
1398
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1399
        del_size = self._apply_versioning(account, container, src_version_id,
1400
                                          update_statistics_ancestors_depth=1)
1401
        if report_size_change:
1402
            self._report_size_change(
1403
                user, account, -del_size,
1404
                {'action': 'object delete',
1405
                 'path': path,
1406
                 'versions': ','.join([str(dest_version_id)])})
1407
        self._report_object_change(
1408
            user, account, path, details={'action': 'object delete'})
1409
        self.permissions.access_clear(path)
1410

    
1411
        if delimiter:
1412
            prefix = name + delimiter if not name.endswith(delimiter) else name
1413
            src_names = self._list_objects_no_limit(
1414
                user, account, container, prefix, delimiter=None,
1415
                virtual=False, domain=None, keys=[], shared=False, until=None,
1416
                size_range=None, all_props=True, public=False)
1417
            paths = []
1418
            for t in src_names:
1419
                path = '/'.join((account, container, t[0]))
1420
                node = t[2]
1421
                if not self._exists(node):
1422
                    continue
1423
                src_version_id, dest_version_id = self._put_version_duplicate(
1424
                    user, node, size=0, type='', hash=None, checksum='',
1425
                    cluster=CLUSTER_DELETED,
1426
                    update_statistics_ancestors_depth=1)
1427
                del_size = self._apply_versioning(
1428
                    account, container, src_version_id,
1429
                    update_statistics_ancestors_depth=1)
1430
                if report_size_change:
1431
                    self._report_size_change(
1432
                        user, account, -del_size,
1433
                        {'action': 'object delete',
1434
                         'path': path,
1435
                         'versions': ','.join([str(dest_version_id)])})
1436
                self._report_object_change(
1437
                    user, account, path, details={'action': 'object delete'})
1438
                paths.append(path)
1439
            self.permissions.access_clear_bulk(paths)
1440

    
1441
    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object.

        NOTE(review): the 'prefix' argument is accepted for interface
        compatibility but is not forwarded to _delete_object here —
        confirm whether prefix-based deletion is expected by callers.
        """

        self._delete_object(user, account, container, name, until, delimiter)
1448

    
1449
    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read(user, account, container, name)
        _, node = self._lookup_object(account, container, name)
        result = []
        for props in self.node.node_get_versions(node):
            if props[self.CLUSTER] == CLUSTER_DELETED:
                continue  # purged versions are never listed
            result.append([props[self.SERIAL], props[self.MTIME]])
        return result
1459

    
1460
    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            # NameError signals "no such UUID" to callers of this API.
            raise NameError
        path = info[0]
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read(user, account, container, name)
        return account, container, name
1473

    
1474
    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError  # unknown public id
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return account, container, name
1485

    
1486
    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        # Archipelago blocks are addressed by a prefixed identifier
        # rather than a hex digest.
        if hash.startswith('archip_'):
            data = self.store.block_get_archipelago(hash)
        else:
            data = self.store.block_get(self._unhexlify_hash(hash))
        if not data:
            raise ItemNotExists('Block does not exist')
        return data
1497

    
1498
    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        # The store returns a raw digest; expose it hex-encoded.
        return binascii.hexlify(self.store.block_put(data))
1503

    
1504
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if hash.startswith('archip_'):
            raise IllegalOperationError(
                'Cannot update an Archipelago Volume block.')
        # A whole-block overwrite is simply a fresh block store.
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        new_digest = self.store.block_update(
            self._unhexlify_hash(hash), offset, data)
        return binascii.hexlify(new_digest)
1515

    
1516
    # Path functions.
1517

    
1518
    def _generate_uuid(self):
1519
        return str(uuidlib.uuid4())
1520

    
1521
    def _put_object_node(self, path, parent, name):
        """Return (path, node) for name under path, creating the node
        when it does not already exist."""
        full_path = '/'.join((path, name))
        node = self.node.node_lookup(full_path)
        if node is None:
            node = self.node.node_create(parent, full_path)
        return full_path, node
1527

    
1528
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        """Create a node for path under parent, give it an initial empty
        version owned by user, and return the new node."""
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node
1535

    
1536
    def _lookup_account(self, account, create=True):
        """Return (account, node), optionally creating the account node."""
        node = self.node.node_lookup(account)
        if create and node is None:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node
1543

    
1544
    def _lookup_container(self, account, container):
        """Return (path, node) for the container; raise if it is missing.

        The lookup takes the row for update when the backend is
        configured to lock container paths.
        """
        for_update = bool(self.lock_container_path)
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node
1551

    
1552
    def _lookup_object(self, account, container, name, lock_container=False):
        """Return (path, node) for the object; raise if it is missing.

        When lock_container is true the container is looked up first,
        which may acquire a lock depending on backend configuration.
        """
        if lock_container:
            self._lookup_container(account, container)

        obj_path = '/'.join((account, container, name))
        obj_node = self.node.node_lookup(obj_path)
        if obj_node is None:
            raise ItemNotExists('Object does not exist')
        return obj_path, obj_node
1561

    
1562
    def _lookup_objects(self, paths):
        """Bulk-resolve paths to nodes; return (paths, nodes)."""
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes
1565

    
1566
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = inf if until is None else until
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            # Snapshots in the past may live in the history cluster.
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props
1576

    
1577
    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(
                node, except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        # Fall back to an empty tuple when nothing has been recorded.
        return stats if stats is not None else (0, 0, 0)
1590

    
1591
    def _get_version(self, node, version=None):
        """Return version properties for node.

        With version=None the latest normal version is returned
        (ItemNotExists when absent); otherwise the specific version is
        fetched (VersionNotExists when absent or deleted).
        """
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
            return props

        try:
            serial = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(serial, node=node)
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props
1605

    
1606
    def _get_versions(self, nodes):
        """Bulk-fetch the latest normal version for each node."""
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1608

    
1609
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node.

        Unspecified properties are inherited from the latest normal
        version of src_node (or node itself). The previous latest version
        of node is moved to the history cluster. Returns
        (pre_version_id, dest_version_id).
        """

        # Latest normal version of the source; may be None for a new path.
        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        # A copy (or a brand-new path) gets a fresh UUID; otherwise the
        # object identity is preserved across versions.
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        # Find the previous latest version of the *destination* node —
        # it must be reclustered into history before the new one lands.
        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        # Ensure only the freshly created version carries the
        # is-latest flag.
        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
1658

    
1659
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        """Copy attributes from src to dest version and apply meta.

        With replace=False, keys mapped to '' are deleted and the rest
        are set; with replace=True the whole domain is cleared and
        rewritten from meta.
        """
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            # An empty value means "remove this key".
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))
1672

    
1673
    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        versions = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=(
                update_statistics_ancestors_depth))
        src_version_id, dest_version_id = versions
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id
1684

    
1685
    def _list_limits(self, listing, marker, limit):
1686
        start = 0
1687
        if marker:
1688
            try:
1689
                start = listing.index(marker) + 1
1690
            except ValueError:
1691
                pass
1692
        if not limit or limit > 10000:
1693
            limit = 10000
1694
        return start, limit
1695

    
1696
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """List object properties under a container node.

        Returns tuples whose first element is the object name with the
        'account/container/' prefix stripped. When virtual is true,
        shared prefixes (pseudo-directories) are included with None
        properties.
        """
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        # Metadata key filtering only applies within a domain.
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        # Strip the container prefix from every returned path.
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects
1717

    
1718
    # Reporting functions.
1719

    
1720
    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        """Queue a diskspace usage message and, when an external
        quotaholder is in use, issue a commission for the size delta.

        Raises QuotaError when the commission cannot be issued.
        """
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        # Recompute the account total so the message carries fresh usage.
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            # Remember the commission serial for later accept/reject.
            self.serials.append(serial)
1749

    
1750
    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        """Queue an 'object' change notification for path."""
        details = details or {}
        details['user'] = user
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('object',), account,
                   QUEUE_INSTANCE_ID, 'object', path, details)
        self.messages.append(message)
1758

    
1759
    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a 'sharing' change notification for path."""
        details = details or {}
        details['user'] = user
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account,
                   QUEUE_INSTANCE_ID, 'sharing', path, details)
        self.messages.append(message)
1767

    
1768
    # Policy functions.
1769

    
1770
    def _check_policy(self, policy, is_account_policy=True):
        """Validate policy in place; raise ValueError on bad input.

        Empty-string values are first replaced by the matching defaults.
        Only 'quota' (non-negative int) and 'versioning' ('auto'/'none')
        keys are accepted.
        """
        defaults = (self.default_account_policy if is_account_policy
                    else self.default_container_policy)
        for key in policy.keys():
            if policy[key] == '':
                policy[key] = defaults.get(key)
        for key, value in policy.items():
            if key == 'quota':
                if int(value) < 0:  # int() itself may raise ValueError
                    raise ValueError
            elif key == 'versioning':
                if value not in ('auto', 'none'):
                    raise ValueError
            else:
                raise ValueError
1786

    
1787
    def _put_policy(self, node, policy, replace, is_account_policy=True):
        """Store policy on node; with replace, missing keys are filled
        in from the defaults before writing."""
        defaults = (self.default_account_policy if is_account_policy
                    else self.default_container_policy)
        if replace:
            for key, value in defaults.items():
                policy.setdefault(key, value)
        self.node.policy_set(node, policy)
1795

    
1796
    def _get_policy(self, node, is_account_policy=True):
        """Return node's stored policy merged over the defaults."""
        defaults = (self.default_account_policy if is_account_policy
                    else self.default_container_policy)
        merged = dict(defaults)
        merged.update(self.node.policy_get(node))
        return merged
1802

    
1803
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            # No versioning: drop the version and its data map outright.
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            # Version is kept but not charged: report its size as freed.
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
1823

    
1824
    # Access control functions.
1825

    
1826
    def _check_groups(self, groups):
        """Validate group names; currently a deliberate no-op."""
        # raise ValueError('Bad characters in groups')
        pass
1829

    
1830
    def _check_permissions(self, path, permissions):
        """Validate a permissions mapping; currently a deliberate no-op."""
        # raise ValueError('Bad characters in permissions')
        pass
1833

    
1834
    def _get_formatted_paths(self, paths):
        """Return (path, match_mode) pairs for the given paths.

        Directory-typed objects additionally contribute a prefix match
        on 'path/' so that their contents match too.
        """
        formatted = []
        if not paths:
            return formatted
        props = self.node.get_props(paths)
        if props:
            for prop in props:
                mimetype = prop[1].split(';', 1)[0].strip()
                if mimetype in ('application/directory',
                                'application/folder'):
                    formatted.append((prop[0].rstrip('/') + '/',
                                      self.MATCH_PREFIX))
                formatted.append((prop[0], self.MATCH_EXACT))
        return formatted
1847

    
1848
    def _get_permissions_path(self, account, container, name):
        """Return the path whose grants govern the object, following
        inheritance from enclosing folder-typed objects; None when no
        applicable path exists."""
        path = '/'.join((account, container, name))
        candidates = sorted(self.permissions.access_inherit(path),
                            reverse=True)
        for candidate in candidates:
            if candidate == path:
                return candidate
            # Account/container level entries cannot be folder objects.
            if candidate.count('/') < 2:
                continue
            node = self.node.node_lookup(candidate)
            if node is None:
                continue
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                continue
            mimetype = props[self.TYPE].split(';', 1)[0].strip()
            if mimetype in ('application/directory', 'application/folder'):
                return candidate
        return None
1868

    
1869
    def _get_permissions_path_bulk(self, account, container, names):
        """Bulk variant of _get_permissions_path.

        Returns the list of paths governing permissions for the given
        object names (exact object paths plus directory-typed
        ancestors), or None when no applicable path exists.
        """
        formatted_paths = []
        for name in names:
            path = '/'.join((account, container, name))
            formatted_paths.append(path)
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        permission_paths.sort()
        permission_paths.reverse()
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                permission_paths_list.append(p)
            else:
                # Account/container level entries cannot be folders.
                if p.count('/') < 2:
                    continue
                lookup_list.append(p)

        if len(lookup_list) > 0:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    # Only directory-typed ancestors propagate grants.
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        if len(permission_paths_list) > 0:
            return permission_paths_list

        return None
1900

    
1901
    def _can_read(self, user, account, container, name):
        """Raise NotAllowedError unless user may read the object."""
        if user == account:
            return True  # owners always read their own objects
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True  # public objects are readable by anyone
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        allowed = (self.permissions.access_check(path, self.READ, user) or
                   self.permissions.access_check(path, self.WRITE, user))
        if not allowed:
            raise NotAllowedError
1913

    
1914
    def _can_write(self, user, account, container, name):
        """Raise NotAllowedError unless user may write the object.

        Returns True for the owner; otherwise the effective permissions
        path must grant WRITE to the user.
        """
        if user == account:
            return True
        # Fix: the previous version built '/'.join((account, container,
        # name)) and immediately overwrote it; only the resolved
        # permissions path matters, so the dead assignment is removed.
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError
1923

    
1924
    def _allowed_accounts(self, user):
1925
        allow = set()
1926
        for path in self.permissions.access_list_paths(user):
1927
            allow.add(path.split('/', 1)[0])
1928
        return sorted(allow)
1929

    
1930
    def _allowed_containers(self, user, account):
1931
        allow = set()
1932
        for path in self.permissions.access_list_paths(user, account):
1933
            allow.add(path.split('/', 2)[1])
1934
        return sorted(allow)
1935

    
1936
    # Domain functions
1937

    
1938
    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        """Return (path, metadata, permissions) for every object in the
        metadata domain that the user may access."""
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]
1951

    
1952
    # util functions
1953

    
1954
    def _build_metadata(self, props, user_defined=None,
1955
                        include_user_defined=True):
1956
        meta = {'bytes': props[self.SIZE],
1957
                'type': props[self.TYPE],
1958
                'hash': props[self.HASH],
1959
                'version': props[self.SERIAL],
1960
                'version_timestamp': props[self.MTIME],
1961
                'modified_by': props[self.MUSER],
1962
                'uuid': props[self.UUID],
1963
                'checksum': props[self.CHECKSUM]}
1964
        if include_user_defined and user_defined is not None:
1965
            meta.update(user_defined)
1966
        return meta
1967

    
1968
    def _exists(self, node):
        """Return whether node currently has a live (non-deleted) version."""
        try:
            self._get_version(node)
            return True
        except ItemNotExists:
            return False
1975

    
1976
    def _unhexlify_hash(self, hash):
        """Decode a hex digest to raw bytes; raise InvalidHash when the
        input is not valid hex."""
        try:
            return binascii.unhexlify(hash)
        except TypeError:
            raise InvalidHash(hash)