Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ b48ac643

History | View | Annotate | Download (77.4 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42

    
43
try:
44
    from astakosclient import AstakosClient
45
except ImportError:
46
    AstakosClient = None
47

    
48
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
49
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
50
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
51
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
52
                  InvalidHash, IllegalOperationError)
53

    
54

    
55
class DisabledAstakosClient(object):
56
    def __init__(self, *args, **kwargs):
57
        self.args = args
58
        self.kwargs = kwargs
59

    
60
    def __getattr__(self, name):
61
        m = ("AstakosClient has been disabled, "
62
             "yet an attempt to access it was made")
63
        raise AssertionError(m)
64

    
65

    
66
# Stripped-down version of the HashMap class found in tools.
67

    
68
class HashMap(list):
69

    
70
    def __init__(self, blocksize, blockhash):
71
        super(HashMap, self).__init__()
72
        self.blocksize = blocksize
73
        self.blockhash = blockhash
74

    
75
    def _hash_raw(self, v):
76
        h = hashlib.new(self.blockhash)
77
        h.update(v)
78
        return h.digest()
79

    
80
    def hash(self):
81
        if len(self) == 0:
82
            return self._hash_raw('')
83
        if len(self) == 1:
84
            return self.__getitem__(0)
85

    
86
        h = list(self)
87
        s = 2
88
        while s < len(h):
89
            s = s * 2
90
        h += [('\x00' * len(h[0]))] * (s - len(h))
91
        while len(h) > 1:
92
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
93
        return h[0]
94

    
95
# Default modules and settings.
96
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
97
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
98
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
99
DEFAULT_BLOCK_PATH = 'data/'
100
DEFAULT_BLOCK_UMASK = 0o022
101
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
102
DEFAULT_HASH_ALGORITHM = 'sha256'
103
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
104
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
105
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
106
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
107
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
108
                               'abcdefghijklmnopqrstuvwxyz'
109
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
110
DEFAULT_PUBLIC_URL_SECURITY = 16
111

    
112
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
113
QUEUE_CLIENT_ID = 'pithos'
114
QUEUE_INSTANCE_ID = '1'
115

    
116
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
117

    
118
inf = float('inf')
119

    
120
ULTIMATE_ANSWER = 42
121

    
122
DEFAULT_SOURCE = 'system'
123
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
124

    
125
logger = logging.getLogger(__name__)
126

    
127

    
128
def backend_method(func):
129
    @wraps(func)
130
    def wrapper(self, *args, **kw):
131
        # if we are inside a database transaction
132
        # just proceed with the method execution
133
        # otherwise manage a new transaction
134
        if self.in_transaction:
135
            return func(self, *args, **kw)
136

    
137
        try:
138
            self.pre_exec()
139
            result = func(self, *args, **kw)
140
            success_status = True
141
            return result
142
        except:
143
            success_status = False
144
            raise
145
        finally:
146
            self.post_exec(success_status)
147
    return wrapper
148

    
149

    
150
def debug_method(func):
151
    @wraps(func)
152
    def wrapper(self, *args, **kw):
153
        try:
154
            result = func(self, *args, **kw)
155
            return result
156
        except:
157
            result = format_exc()
158
            raise
159
        finally:
160
            all_args = map(repr, args)
161
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
162
            logger.debug(">>> %s(%s) <<< %s" % (
163
                func.__name__, ', '.join(all_args).rstrip(', '), result))
164
    return wrapper
165

    
166

    
167
def list_method(func):
168
    @wraps(func)
169
    def wrapper(self, *args, **kw):
170
        marker = kw.get('marker')
171
        limit = kw.get('limit')
172
        result = func(self, *args, **kw)
173
        start, limit = self._list_limits(result, marker, limit)
174
        return result[start:start + limit]
175
    return wrapper
176

    
177

    
178
class ModularBackend(BaseBackend):
179
    """A modular backend.
180

181
    Uses modules for SQL functions and storage.
182
    """
183

    
184
    def __init__(self, db_module=None, db_connection=None,
185
                 block_module=None, block_path=None, block_umask=None,
186
                 block_size=None, hash_algorithm=None,
187
                 queue_module=None, queue_hosts=None, queue_exchange=None,
188
                 astakos_auth_url=None, service_token=None,
189
                 astakosclient_poolsize=None,
190
                 free_versioning=True, block_params=None,
191
                 public_url_security=None,
192
                 public_url_alphabet=None,
193
                 account_quota_policy=None,
194
                 container_quota_policy=None,
195
                 container_versioning_policy=None):
196
        db_module = db_module or DEFAULT_DB_MODULE
197
        db_connection = db_connection or DEFAULT_DB_CONNECTION
198
        block_module = block_module or DEFAULT_BLOCK_MODULE
199
        block_path = block_path or DEFAULT_BLOCK_PATH
200
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
201
        block_params = block_params or DEFAULT_BLOCK_PARAMS
202
        block_size = block_size or DEFAULT_BLOCK_SIZE
203
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
204
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
205
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
206
        container_quota_policy = container_quota_policy \
207
            or DEFAULT_CONTAINER_QUOTA
208
        container_versioning_policy = container_versioning_policy \
209
            or DEFAULT_CONTAINER_VERSIONING
210

    
211
        self.default_account_policy = {'quota': account_quota_policy}
212
        self.default_container_policy = {
213
            'quota': container_quota_policy,
214
            'versioning': container_versioning_policy
215
        }
216
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
217
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
218

    
219
        self.public_url_security = (public_url_security or
220
                                    DEFAULT_PUBLIC_URL_SECURITY)
221
        self.public_url_alphabet = (public_url_alphabet or
222
                                    DEFAULT_PUBLIC_URL_ALPHABET)
223

    
224
        self.hash_algorithm = hash_algorithm
225
        self.block_size = block_size
226
        self.free_versioning = free_versioning
227

    
228
        def load_module(m):
229
            __import__(m)
230
            return sys.modules[m]
231

    
232
        self.db_module = load_module(db_module)
233
        self.wrapper = self.db_module.DBWrapper(db_connection)
234
        params = {'wrapper': self.wrapper}
235
        self.permissions = self.db_module.Permissions(**params)
236
        self.config = self.db_module.Config(**params)
237
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
238
        for x in ['READ', 'WRITE']:
239
            setattr(self, x, getattr(self.db_module, x))
240
        self.node = self.db_module.Node(**params)
241
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
242
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
243
                  'MATCH_PREFIX', 'MATCH_EXACT']:
244
            setattr(self, x, getattr(self.db_module, x))
245

    
246
        self.ALLOWED = ['read', 'write']
247

    
248
        self.block_module = load_module(block_module)
249
        self.block_params = block_params
250
        params = {'path': block_path,
251
                  'block_size': self.block_size,
252
                  'hash_algorithm': self.hash_algorithm,
253
                  'umask': block_umask}
254
        params.update(self.block_params)
255
        self.store = self.block_module.Store(**params)
256

    
257
        if queue_module and queue_hosts:
258
            self.queue_module = load_module(queue_module)
259
            params = {'hosts': queue_hosts,
260
                      'exchange': queue_exchange,
261
                      'client_id': QUEUE_CLIENT_ID}
262
            self.queue = self.queue_module.Queue(**params)
263
        else:
264
            class NoQueue:
265
                def send(self, *args):
266
                    pass
267

    
268
                def close(self):
269
                    pass
270

    
271
            self.queue = NoQueue()
272

    
273
        self.astakos_auth_url = astakos_auth_url
274
        self.service_token = service_token
275

    
276
        if not astakos_auth_url or not AstakosClient:
277
            self.astakosclient = DisabledAstakosClient(
278
                service_token, astakos_auth_url,
279
                use_pool=True,
280
                pool_size=astakosclient_poolsize)
281
        else:
282
            self.astakosclient = AstakosClient(
283
                service_token, astakos_auth_url,
284
                use_pool=True,
285
                pool_size=astakosclient_poolsize)
286

    
287
        self.serials = []
288
        self.messages = []
289

    
290
        self._move_object = partial(self._copy_object, is_move=True)
291

    
292
        self.lock_container_path = False
293

    
294
        self.in_transaction = False
295

    
296
    def pre_exec(self, lock_container_path=False):
297
        self.lock_container_path = lock_container_path
298
        self.wrapper.execute()
299
        self.serials = []
300
        self.in_transaction = True
301

    
302
    def post_exec(self, success_status=True):
303
        if success_status:
304
            # send messages produced
305
            for m in self.messages:
306
                self.queue.send(*m)
307

    
308
            # register serials
309
            if self.serials:
310
                self.commission_serials.insert_many(
311
                    self.serials)
312

    
313
                # commit to ensure that the serials are registered
314
                # even if resolve commission fails
315
                self.wrapper.commit()
316

    
317
                # start new transaction
318
                self.wrapper.execute()
319

    
320
                r = self.astakosclient.resolve_commissions(
321
                    accept_serials=self.serials,
322
                    reject_serials=[])
323
                self.commission_serials.delete_many(
324
                    r['accepted'])
325

    
326
            self.wrapper.commit()
327
        else:
328
            if self.serials:
329
                r = self.astakosclient.resolve_commissions(
330
                    accept_serials=[],
331
                    reject_serials=self.serials)
332
                self.commission_serials.delete_many(
333
                    r['rejected'])
334
            self.wrapper.rollback()
335
        self.in_transaction = False
336

    
337
    def close(self):
338
        self.wrapper.close()
339
        self.queue.close()
340

    
341
    @property
342
    def using_external_quotaholder(self):
343
        return not isinstance(self.astakosclient, DisabledAstakosClient)
344

    
345
    @debug_method
346
    @backend_method
347
    @list_method
348
    def list_accounts(self, user, marker=None, limit=10000):
349
        """Return a list of accounts the user can access."""
350

    
351
        return self._allowed_accounts(user)
352

    
353
    def _get_account_quotas(self, account):
354
        """Get account usage from astakos."""
355

    
356
        quotas = self.astakosclient.service_get_quotas(account)[account]
357
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
358
                                                  {})
359

    
360
    def _get_account_quotas(self, account):
361
        """Get account usage from astakos."""
362

    
363
        quotas = self.astakosclient.service_get_quotas(account)[account]
364
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
365
                                                  {})
366

    
367
    @debug_method
368
    @backend_method
369
    def get_account_meta(self, user, account, domain, until=None,
370
                         include_user_defined=True):
371
        """Return a dictionary with the account metadata for the domain."""
372

    
373
        path, node = self._lookup_account(account, user == account)
374
        if user != account:
375
            if until or (node is None) or (account not
376
                                           in self._allowed_accounts(user)):
377
                raise NotAllowedError
378
        try:
379
            props = self._get_properties(node, until)
380
            mtime = props[self.MTIME]
381
        except NameError:
382
            props = None
383
            mtime = until
384
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
385
        tstamp = max(tstamp, mtime)
386
        if until is None:
387
            modified = tstamp
388
        else:
389
            modified = self._get_statistics(
390
                node, compute=True)[2]  # Overall last modification.
391
            modified = max(modified, mtime)
392

    
393
        if user != account:
394
            meta = {'name': account}
395
        else:
396
            meta = {}
397
            if props is not None and include_user_defined:
398
                meta.update(
399
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
400
            if until is not None:
401
                meta.update({'until_timestamp': tstamp})
402
            meta.update({'name': account, 'count': count, 'bytes': bytes})
403
            if self.using_external_quotaholder:
404
                external_quota = self._get_account_quotas(account)
405
                meta['bytes'] = external_quota.get('usage', 0)
406
        meta.update({'modified': modified})
407
        return meta
408

    
409
    @debug_method
410
    @backend_method
411
    def update_account_meta(self, user, account, domain, meta, replace=False):
412
        """Update the metadata associated with the account for the domain."""
413

    
414
        if user != account:
415
            raise NotAllowedError
416
        path, node = self._lookup_account(account, True)
417
        self._put_metadata(user, node, domain, meta, replace,
418
                           update_statistics_ancestors_depth=-1)
419

    
420
    @debug_method
421
    @backend_method
422
    def get_account_groups(self, user, account):
423
        """Return a dictionary with the user groups defined for the account."""
424

    
425
        if user != account:
426
            if account not in self._allowed_accounts(user):
427
                raise NotAllowedError
428
            return {}
429
        self._lookup_account(account, True)
430
        return self.permissions.group_dict(account)
431

    
432
    @debug_method
433
    @backend_method
434
    def update_account_groups(self, user, account, groups, replace=False):
435
        """Update the groups associated with the account."""
436

    
437
        if user != account:
438
            raise NotAllowedError
439
        self._lookup_account(account, True)
440
        self._check_groups(groups)
441
        if replace:
442
            self.permissions.group_destroy(account)
443
        for k, v in groups.iteritems():
444
            if not replace:  # If not already deleted.
445
                self.permissions.group_delete(account, k)
446
            if v:
447
                self.permissions.group_addmany(account, k, v)
448

    
449
    @debug_method
450
    @backend_method
451
    def get_account_policy(self, user, account):
452
        """Return a dictionary with the account policy."""
453

    
454
        if user != account:
455
            if account not in self._allowed_accounts(user):
456
                raise NotAllowedError
457
            return {}
458
        path, node = self._lookup_account(account, True)
459
        policy = self._get_policy(node, is_account_policy=True)
460
        if self.using_external_quotaholder:
461
            external_quota = self._get_account_quotas(account)
462
            policy['quota'] = external_quota.get('limit', 0)
463
        return policy
464

    
465
    @debug_method
466
    @backend_method
467
    def update_account_policy(self, user, account, policy, replace=False):
468
        """Update the policy associated with the account."""
469

    
470
        if user != account:
471
            raise NotAllowedError
472
        path, node = self._lookup_account(account, True)
473
        self._check_policy(policy, is_account_policy=True)
474
        self._put_policy(node, policy, replace, is_account_policy=True)
475

    
476
    @debug_method
477
    @backend_method
478
    def put_account(self, user, account, policy=None):
479
        """Create a new account with the given name."""
480

    
481
        policy = policy or {}
482
        if user != account:
483
            raise NotAllowedError
484
        node = self.node.node_lookup(account)
485
        if node is not None:
486
            raise AccountExists('Account already exists')
487
        if policy:
488
            self._check_policy(policy, is_account_policy=True)
489
        node = self._put_path(user, self.ROOTNODE, account,
490
                              update_statistics_ancestors_depth=-1)
491
        self._put_policy(node, policy, True, is_account_policy=True)
492

    
493
    @debug_method
494
    @backend_method
495
    def delete_account(self, user, account):
496
        """Delete the account with the given name."""
497

    
498
        if user != account:
499
            raise NotAllowedError
500
        node = self.node.node_lookup(account)
501
        if node is None:
502
            return
503
        if not self.node.node_remove(node,
504
                                     update_statistics_ancestors_depth=-1):
505
            raise AccountNotEmpty('Account is not empty')
506
        self.permissions.group_destroy(account)
507

    
508
    @debug_method
509
    @backend_method
510
    @list_method
511
    def list_containers(self, user, account, marker=None, limit=10000,
512
                        shared=False, until=None, public=False):
513
        """Return a list of containers existing under an account."""
514

    
515
        if user != account:
516
            if until or account not in self._allowed_accounts(user):
517
                raise NotAllowedError
518
            return self._allowed_containers(user, account)
519
        if shared or public:
520
            allowed = set()
521
            if shared:
522
                allowed.update([x.split('/', 2)[1] for x in
523
                               self.permissions.access_list_shared(account)])
524
            if public:
525
                allowed.update([x[0].split('/', 2)[1] for x in
526
                               self.permissions.public_list(account)])
527
            return sorted(allowed)
528
        node = self.node.node_lookup(account)
529
        return [x[0] for x in self._list_object_properties(
530
            node, account, '', '/', marker, limit, False, None, [], until)]
531

    
532
    @debug_method
533
    @backend_method
534
    def list_container_meta(self, user, account, container, domain,
535
                            until=None):
536
        """Return a list of the container's object meta keys for a domain."""
537

    
538
        allowed = []
539
        if user != account:
540
            if until:
541
                raise NotAllowedError
542
            allowed = self.permissions.access_list_paths(
543
                user, '/'.join((account, container)))
544
            if not allowed:
545
                raise NotAllowedError
546
        path, node = self._lookup_container(account, container)
547
        before = until if until is not None else inf
548
        allowed = self._get_formatted_paths(allowed)
549
        return self.node.latest_attribute_keys(node, domain, before,
550
                                               CLUSTER_DELETED, allowed)
551

    
552
    @debug_method
553
    @backend_method
554
    def get_container_meta(self, user, account, container, domain, until=None,
555
                           include_user_defined=True):
556
        """Return a dictionary with the container metadata for the domain."""
557

    
558
        if user != account:
559
            if until or container not in self._allowed_containers(user,
560
                                                                  account):
561
                raise NotAllowedError
562
        path, node = self._lookup_container(account, container)
563
        props = self._get_properties(node, until)
564
        mtime = props[self.MTIME]
565
        count, bytes, tstamp = self._get_statistics(node, until)
566
        tstamp = max(tstamp, mtime)
567
        if until is None:
568
            modified = tstamp
569
        else:
570
            modified = self._get_statistics(
571
                node)[2]  # Overall last modification.
572
            modified = max(modified, mtime)
573

    
574
        if user != account:
575
            meta = {'name': container}
576
        else:
577
            meta = {}
578
            if include_user_defined:
579
                meta.update(
580
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
581
            if until is not None:
582
                meta.update({'until_timestamp': tstamp})
583
            meta.update({'name': container, 'count': count, 'bytes': bytes})
584
        meta.update({'modified': modified})
585
        return meta
586

    
587
    @debug_method
588
    @backend_method
589
    def update_container_meta(self, user, account, container, domain, meta,
590
                              replace=False):
591
        """Update the metadata associated with the container for the domain."""
592

    
593
        if user != account:
594
            raise NotAllowedError
595
        path, node = self._lookup_container(account, container)
596
        src_version_id, dest_version_id = self._put_metadata(
597
            user, node, domain, meta, replace,
598
            update_statistics_ancestors_depth=0)
599
        if src_version_id is not None:
600
            versioning = self._get_policy(
601
                node, is_account_policy=False)['versioning']
602
            if versioning != 'auto':
603
                self.node.version_remove(src_version_id,
604
                                         update_statistics_ancestors_depth=0)
605

    
606
    @debug_method
607
    @backend_method
608
    def get_container_policy(self, user, account, container):
609
        """Return a dictionary with the container policy."""
610

    
611
        if user != account:
612
            if container not in self._allowed_containers(user, account):
613
                raise NotAllowedError
614
            return {}
615
        path, node = self._lookup_container(account, container)
616
        return self._get_policy(node, is_account_policy=False)
617

    
618
    @debug_method
619
    @backend_method
620
    def update_container_policy(self, user, account, container, policy,
621
                                replace=False):
622
        """Update the policy associated with the container."""
623

    
624
        if user != account:
625
            raise NotAllowedError
626
        path, node = self._lookup_container(account, container)
627
        self._check_policy(policy, is_account_policy=False)
628
        self._put_policy(node, policy, replace, is_account_policy=False)
629

    
630
    @debug_method
631
    @backend_method
632
    def put_container(self, user, account, container, policy=None):
633
        """Create a new container with the given name."""
634

    
635
        policy = policy or {}
636
        if user != account:
637
            raise NotAllowedError
638
        try:
639
            path, node = self._lookup_container(account, container)
640
        except NameError:
641
            pass
642
        else:
643
            raise ContainerExists('Container already exists')
644
        if policy:
645
            self._check_policy(policy, is_account_policy=False)
646
        path = '/'.join((account, container))
647
        node = self._put_path(
648
            user, self._lookup_account(account, True)[1], path,
649
            update_statistics_ancestors_depth=-1)
650
        self._put_policy(node, policy, True, is_account_policy=False)
651

    
652
    @debug_method
653
    @backend_method
654
    def delete_container(self, user, account, container, until=None, prefix='',
655
                         delimiter=None):
656
        """Delete/purge the container with the given name."""
657

    
658
        if user != account:
659
            raise NotAllowedError
660
        path, node = self._lookup_container(account, container)
661

    
662
        if until is not None:
663
            hashes, size, serials = self.node.node_purge_children(
664
                node, until, CLUSTER_HISTORY,
665
                update_statistics_ancestors_depth=0)
666
            for h in hashes:
667
                self.store.map_delete(h)
668
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
669
                                          update_statistics_ancestors_depth=0)
670
            if not self.free_versioning:
671
                self._report_size_change(
672
                    user, account, -size, {
673
                        'action': 'container purge',
674
                        'path': path,
675
                        'versions': ','.join(str(i) for i in serials)
676
                    }
677
                )
678
            return
679

    
680
        if not delimiter:
681
            if self._get_statistics(node)[0] > 0:
682
                raise ContainerNotEmpty('Container is not empty')
683
            hashes, size, serials = self.node.node_purge_children(
684
                node, inf, CLUSTER_HISTORY,
685
                update_statistics_ancestors_depth=0)
686
            for h in hashes:
687
                self.store.map_delete(h)
688
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
689
                                          update_statistics_ancestors_depth=0)
690
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
691
            if not self.free_versioning:
692
                self._report_size_change(
693
                    user, account, -size, {
694
                        'action': 'container purge',
695
                        'path': path,
696
                        'versions': ','.join(str(i) for i in serials)
697
                    }
698
                )
699
        else:
700
            # remove only contents
701
            src_names = self._list_objects_no_limit(
702
                user, account, container, prefix='', delimiter=None,
703
                virtual=False, domain=None, keys=[], shared=False, until=None,
704
                size_range=None, all_props=True, public=False)
705
            paths = []
706
            for t in src_names:
707
                path = '/'.join((account, container, t[0]))
708
                node = t[2]
709
                if not self._exists(node):
710
                    continue
711
                src_version_id, dest_version_id = self._put_version_duplicate(
712
                    user, node, size=0, type='', hash=None, checksum='',
713
                    cluster=CLUSTER_DELETED,
714
                    update_statistics_ancestors_depth=1)
715
                del_size = self._apply_versioning(
716
                    account, container, src_version_id,
717
                    update_statistics_ancestors_depth=1)
718
                self._report_size_change(
719
                    user, account, -del_size, {
720
                        'action': 'object delete',
721
                        'path': path,
722
                        'versions': ','.join([str(dest_version_id)])})
723
                self._report_object_change(
724
                    user, account, path, details={'action': 'object delete'})
725
                paths.append(path)
726
            self.permissions.access_clear_bulk(paths)
727

    
728
    def _list_objects(self, user, account, container, prefix, delimiter,
729
                      marker, limit, virtual, domain, keys, shared, until,
730
                      size_range, all_props, public):
731
        if user != account and until:
732
            raise NotAllowedError
733

    
734
        objects = []
735
        if shared and public:
736
            # get shared first
737
            shared_paths = self._list_object_permissions(
738
                user, account, container, prefix, shared=True, public=False)
739
            if shared_paths:
740
                path, node = self._lookup_container(account, container)
741
                shared_paths = self._get_formatted_paths(shared_paths)
742
                objects = set(self._list_object_properties(
743
                    node, path, prefix, delimiter, marker, limit, virtual,
744
                    domain, keys, until, size_range, shared_paths, all_props))
745

    
746
            # get public
747
            objects |= set(self._list_public_object_properties(
748
                user, account, container, prefix, all_props))
749
            objects = list(objects)
750

    
751
            objects.sort(key=lambda x: x[0])
752
        elif public:
753
            objects = self._list_public_object_properties(
754
                user, account, container, prefix, all_props)
755
        else:
756
            allowed = self._list_object_permissions(
757
                user, account, container, prefix, shared, public=False)
758
            if shared and not allowed:
759
                return []
760
            path, node = self._lookup_container(account, container)
761
            allowed = self._get_formatted_paths(allowed)
762
            objects = self._list_object_properties(
763
                node, path, prefix, delimiter, marker, limit, virtual, domain,
764
                keys, until, size_range, allowed, all_props)
765

    
766
        # apply limits
767
        start, limit = self._list_limits(objects, marker, limit)
768
        return objects[start:start + limit]
769

    
770
    def _list_public_object_properties(self, user, account, container, prefix,
771
                                       all_props):
772
        public = self._list_object_permissions(
773
            user, account, container, prefix, shared=False, public=True)
774
        paths, nodes = self._lookup_objects(public)
775
        path = '/'.join((account, container))
776
        cont_prefix = path + '/'
777
        paths = [x[len(cont_prefix):] for x in paths]
778
        objects = [(p,) + props for p, props in
779
                   zip(paths, self.node.version_lookup_bulk(
780
                       nodes, all_props=all_props, order_by_path=True))]
781
        return objects
782

    
783
    def _list_objects_no_limit(self, user, account, container, prefix,
784
                               delimiter, virtual, domain, keys, shared, until,
785
                               size_range, all_props, public):
786
        objects = []
787
        while True:
788
            marker = objects[-1] if objects else None
789
            limit = 10000
790
            l = self._list_objects(
791
                user, account, container, prefix, delimiter, marker, limit,
792
                virtual, domain, keys, shared, until, size_range, all_props,
793
                public)
794
            objects.extend(l)
795
            if not l or len(l) < limit:
796
                break
797
        return objects
798

    
799
    def _list_object_permissions(self, user, account, container, prefix,
800
                                 shared, public):
801
        allowed = []
802
        path = '/'.join((account, container, prefix)).rstrip('/')
803
        if user != account:
804
            allowed = self.permissions.access_list_paths(user, path)
805
            if not allowed:
806
                raise NotAllowedError
807
        else:
808
            allowed = set()
809
            if shared:
810
                allowed.update(self.permissions.access_list_shared(path))
811
            if public:
812
                allowed.update(
813
                    [x[0] for x in self.permissions.public_list(path)])
814
            allowed = sorted(allowed)
815
            if not allowed:
816
                return []
817
        return allowed
818

    
819
    @debug_method
820
    @backend_method
821
    def list_objects(self, user, account, container, prefix='', delimiter=None,
822
                     marker=None, limit=10000, virtual=True, domain=None,
823
                     keys=None, shared=False, until=None, size_range=None,
824
                     public=False):
825
        """List (object name, object version_id) under a container."""
826

    
827
        keys = keys or []
828
        return self._list_objects(
829
            user, account, container, prefix, delimiter, marker, limit,
830
            virtual, domain, keys, shared, until, size_range, False, public)
831

    
832
    @debug_method
833
    @backend_method
834
    def list_object_meta(self, user, account, container, prefix='',
835
                         delimiter=None, marker=None, limit=10000,
836
                         virtual=True, domain=None, keys=None, shared=False,
837
                         until=None, size_range=None, public=False):
838
        """Return a list of metadata dicts of objects under a container."""
839

    
840
        keys = keys or []
841
        props = self._list_objects(
842
            user, account, container, prefix, delimiter, marker, limit,
843
            virtual, domain, keys, shared, until, size_range, True, public)
844
        objects = []
845
        for p in props:
846
            if len(p) == 2:
847
                objects.append({'subdir': p[0]})
848
            else:
849
                objects.append({
850
                    'name': p[0],
851
                    'bytes': p[self.SIZE + 1],
852
                    'type': p[self.TYPE + 1],
853
                    'hash': p[self.HASH + 1],
854
                    'version': p[self.SERIAL + 1],
855
                    'version_timestamp': p[self.MTIME + 1],
856
                    'modified': p[self.MTIME + 1] if until is None else None,
857
                    'modified_by': p[self.MUSER + 1],
858
                    'uuid': p[self.UUID + 1],
859
                    'checksum': p[self.CHECKSUM + 1]})
860
        return objects
861

    
862
    @debug_method
863
    @backend_method
864
    def list_object_permissions(self, user, account, container, prefix=''):
865
        """Return a list of paths enforce permissions under a container."""
866

    
867
        return self._list_object_permissions(user, account, container, prefix,
868
                                             True, False)
869

    
870
    @debug_method
871
    @backend_method
872
    def list_object_public(self, user, account, container, prefix=''):
873
        """Return a mapping of object paths to public ids under a container."""
874

    
875
        public = {}
876
        for path, p in self.permissions.public_list('/'.join((account,
877
                                                              container,
878
                                                              prefix))):
879
            public[path] = p
880
        return public
881

    
882
    @debug_method
883
    @backend_method
884
    def get_object_meta(self, user, account, container, name, domain,
885
                        version=None, include_user_defined=True):
886
        """Return a dictionary with the object metadata for the domain."""
887

    
888
        self._can_read(user, account, container, name)
889
        path, node = self._lookup_object(account, container, name)
890
        props = self._get_version(node, version)
891
        if version is None:
892
            modified = props[self.MTIME]
893
        else:
894
            try:
895
                modified = self._get_version(
896
                    node)[self.MTIME]  # Overall last modification.
897
            except NameError:  # Object may be deleted.
898
                del_props = self.node.version_lookup(
899
                    node, inf, CLUSTER_DELETED)
900
                if del_props is None:
901
                    raise ItemNotExists('Object does not exist')
902
                modified = del_props[self.MTIME]
903

    
904
        meta = {}
905
        if include_user_defined:
906
            meta.update(
907
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
908
        meta.update({'name': name,
909
                     'bytes': props[self.SIZE],
910
                     'type': props[self.TYPE],
911
                     'hash': props[self.HASH],
912
                     'version': props[self.SERIAL],
913
                     'version_timestamp': props[self.MTIME],
914
                     'modified': modified,
915
                     'modified_by': props[self.MUSER],
916
                     'uuid': props[self.UUID],
917
                     'checksum': props[self.CHECKSUM]})
918
        return meta
919

    
920
    @debug_method
921
    @backend_method
922
    def update_object_meta(self, user, account, container, name, domain, meta,
923
                           replace=False):
924
        """Update object metadata for a domain and return the new version."""
925

    
926
        self._can_write(user, account, container, name)
927

    
928
        path, node = self._lookup_object(account, container, name,
929
                                         lock_container=True)
930
        src_version_id, dest_version_id = self._put_metadata(
931
            user, node, domain, meta, replace,
932
            update_statistics_ancestors_depth=1)
933
        self._apply_versioning(account, container, src_version_id,
934
                               update_statistics_ancestors_depth=1)
935
        return dest_version_id
936

    
937
    @debug_method
938
    @backend_method
939
    def get_object_permissions_bulk(self, user, account, container, names):
940
        """Return the action allowed on the object, the path
941
        from which the object gets its permissions from,
942
        along with a dictionary containing the permissions."""
943

    
944
        permissions_path = self._get_permissions_path_bulk(account, container,
945
                                                           names)
946
        access_objects = self.permissions.access_check_bulk(permissions_path,
947
                                                            user)
948
        #group_parents = access_objects['group_parents']
949
        nobject_permissions = {}
950
        cpath = '/'.join((account, container, ''))
951
        cpath_idx = len(cpath)
952
        for path in permissions_path:
953
            allowed = 1
954
            name = path[cpath_idx:]
955
            if user != account:
956
                try:
957
                    allowed = access_objects[path]
958
                except KeyError:
959
                    raise NotAllowedError
960
            access_dict, allowed = \
961
                self.permissions.access_get_for_bulk(access_objects[path])
962
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
963
                                         access_dict)
964
        self._lookup_objects(permissions_path)
965
        return nobject_permissions
966

    
967
    @debug_method
968
    @backend_method
969
    def get_object_permissions(self, user, account, container, name):
970
        """Return the action allowed on the object, the path
971
        from which the object gets its permissions from,
972
        along with a dictionary containing the permissions."""
973

    
974
        allowed = 'write'
975
        permissions_path = self._get_permissions_path(account, container, name)
976
        if user != account:
977
            if self.permissions.access_check(permissions_path, self.WRITE,
978
                                             user):
979
                allowed = 'write'
980
            elif self.permissions.access_check(permissions_path, self.READ,
981
                                               user):
982
                allowed = 'read'
983
            else:
984
                raise NotAllowedError
985
        self._lookup_object(account, container, name)
986
        return (allowed,
987
                permissions_path,
988
                self.permissions.access_get(permissions_path))
989

    
990
    @debug_method
991
    @backend_method
992
    def update_object_permissions(self, user, account, container, name,
993
                                  permissions):
994
        """Update the permissions associated with the object."""
995

    
996
        if user != account:
997
            raise NotAllowedError
998
        path = self._lookup_object(account, container, name,
999
                                   lock_container=True)[0]
1000
        self._check_permissions(path, permissions)
1001
        try:
1002
            self.permissions.access_set(path, permissions)
1003
        except:
1004
            raise ValueError
1005
        else:
1006
            self._report_sharing_change(user, account, path, {'members':
1007
                                        self.permissions.access_members(path)})
1008

    
1009
    @debug_method
1010
    @backend_method
1011
    def get_object_public(self, user, account, container, name):
1012
        """Return the public id of the object if applicable."""
1013

    
1014
        self._can_read(user, account, container, name)
1015
        path = self._lookup_object(account, container, name)[0]
1016
        p = self.permissions.public_get(path)
1017
        return p
1018

    
1019
    @debug_method
1020
    @backend_method
1021
    def update_object_public(self, user, account, container, name, public):
1022
        """Update the public status of the object."""
1023

    
1024
        self._can_write(user, account, container, name)
1025
        path = self._lookup_object(account, container, name,
1026
                                   lock_container=True)[0]
1027
        if not public:
1028
            self.permissions.public_unset(path)
1029
        else:
1030
            self.permissions.public_set(
1031
                path, self.public_url_security, self.public_url_alphabet)
1032

    
1033
    @debug_method
1034
    @backend_method
1035
    def get_object_hashmap(self, user, account, container, name, version=None):
1036
        """Return the object's size and a list with partial hashes."""
1037

    
1038
        self._can_read(user, account, container, name)
1039
        path, node = self._lookup_object(account, container, name)
1040
        props = self._get_version(node, version)
1041
        if props[self.HASH] is None:
1042
            return 0, ()
1043
        if props[self.HASH].startswith('archip:'):
1044
            hashmap = self.store.map_get_archipelago(props[self.HASH],
1045
                                                     props[self.SIZE])
1046
            return props[self.SIZE], [x for x in hashmap]
1047
        else:
1048
            hashmap = self.store.map_get(self._unhexlify_hash(
1049
                props[self.HASH]))
1050
            return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
1051

    
1052
    def _update_object_hash(self, user, account, container, name, size, type,
1053
                            hash, checksum, domain, meta, replace_meta,
1054
                            permissions, src_node=None, src_version_id=None,
1055
                            is_copy=False, report_size_change=True):
1056
        if permissions is not None and user != account:
1057
            raise NotAllowedError
1058
        self._can_write(user, account, container, name)
1059
        if permissions is not None:
1060
            path = '/'.join((account, container, name))
1061
            self._check_permissions(path, permissions)
1062

    
1063
        account_path, account_node = self._lookup_account(account, True)
1064
        container_path, container_node = self._lookup_container(
1065
            account, container)
1066

    
1067
        path, node = self._put_object_node(
1068
            container_path, container_node, name)
1069
        pre_version_id, dest_version_id = self._put_version_duplicate(
1070
            user, node, src_node=src_node, size=size, type=type, hash=hash,
1071
            checksum=checksum, is_copy=is_copy,
1072
            update_statistics_ancestors_depth=1)
1073

    
1074
        # Handle meta.
1075
        if src_version_id is None:
1076
            src_version_id = pre_version_id
1077
        self._put_metadata_duplicate(
1078
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
1079

    
1080
        del_size = self._apply_versioning(account, container, pre_version_id,
1081
                                          update_statistics_ancestors_depth=1)
1082
        size_delta = size - del_size
1083
        if size_delta > 0:
1084
            # Check account quota.
1085
            if not self.using_external_quotaholder:
1086
                account_quota = long(self._get_policy(
1087
                    account_node, is_account_policy=True)['quota'])
1088
                account_usage = self._get_statistics(account_node,
1089
                                                     compute=True)[1]
1090
                if (account_quota > 0 and account_usage > account_quota):
1091
                    raise QuotaError(
1092
                        'Account quota exceeded: limit: %s, usage: %s' % (
1093
                            account_quota, account_usage))
1094

    
1095
            # Check container quota.
1096
            container_quota = long(self._get_policy(
1097
                container_node, is_account_policy=False)['quota'])
1098
            container_usage = self._get_statistics(container_node)[1]
1099
            if (container_quota > 0 and container_usage > container_quota):
1100
                # This must be executed in a transaction, so the version is
1101
                # never created if it fails.
1102
                raise QuotaError(
1103
                    'Container quota exceeded: limit: %s, usage: %s' % (
1104
                        container_quota, container_usage
1105
                    )
1106
                )
1107

    
1108
        if report_size_change:
1109
            self._report_size_change(
1110
                user, account, size_delta,
1111
                {'action': 'object update', 'path': path,
1112
                 'versions': ','.join([str(dest_version_id)])})
1113
        if permissions is not None:
1114
            self.permissions.access_set(path, permissions)
1115
            self._report_sharing_change(
1116
                user, account, path,
1117
                {'members': self.permissions.access_members(path)})
1118

    
1119
        self._report_object_change(
1120
            user, account, path,
1121
            details={'version': dest_version_id, 'action': 'object update'})
1122
        return dest_version_id
1123

    
1124
    @debug_method
1125
    @backend_method
1126
    def register_object_map(self, user, account, container, name, size, type,
1127
                            mapfile, checksum='', domain='pithos', meta=None,
1128
                            replace_meta=False, permissions=None):
1129
        """Register an object mapfile without providing any data.
1130

1131
        Lock the container path, create a node pointing to the object path,
1132
        create a version pointing to the mapfile
1133
        and issue the size change in the quotaholder.
1134

1135
        :param user: the user account which performs the action
1136

1137
        :param account: the account under which the object resides
1138

1139
        :param container: the container under which the object resides
1140

1141
        :param name: the object name
1142

1143
        :param size: the object size
1144

1145
        :param type: the object mimetype
1146

1147
        :param mapfile: the mapfile pointing to the object data
1148

1149
        :param checkcum: the md5 checksum (optional)
1150

1151
        :param domain: the object domain
1152

1153
        :param meta: a dict with custom object metadata
1154

1155
        :param replace_meta: replace existing metadata or not
1156

1157
        :param permissions: a dict with the read and write object permissions
1158

1159
        :returns: the new object uuid
1160

1161
        :raises: ItemNotExists, NotAllowedError, QuotaError
1162
        """
1163

    
1164
        meta = meta or {}
1165
        try:
1166
            self.lock_container_path = True
1167
            self.put_container(user, account, container, policy=None)
1168
        except ContainerExists:
1169
            pass
1170
        finally:
1171
            self.lock_container_path = False
1172
        dest_version_id = self._update_object_hash(
1173
            user, account, container, name, size, type, mapfile, checksum,
1174
            domain, meta, replace_meta, permissions)
1175
        return self.node.version_get_properties(dest_version_id,
1176
                                                keys=('uuid',))[0]
1177

    
1178
    @debug_method
1179
    def update_object_hashmap(self, user, account, container, name, size, type,
1180
                              hashmap, checksum, domain, meta=None,
1181
                              replace_meta=False, permissions=None):
1182
        """Create/update an object's hashmap and return the new version."""
1183

    
1184
        for h in hashmap:
1185
            if h.startswith('archip_'):
1186
                raise IllegalOperationError(
1187
                        'Cannot update Archipelago Volume hashmap.')
1188
        meta = meta or {}
1189
        if size == 0:  # No such thing as an empty hashmap.
1190
            hashmap = [self.put_block('')]
1191
        map = HashMap(self.block_size, self.hash_algorithm)
1192
        map.extend([self._unhexlify_hash(x) for x in hashmap])
1193
        missing = self.store.block_search(map)
1194
        if missing:
1195
            ie = IndexError()
1196
            ie.data = [binascii.hexlify(x) for x in missing]
1197
            raise ie
1198

    
1199
        hash = map.hash()
1200
        hexlified = binascii.hexlify(hash)
1201
        # _update_object_hash() locks destination path
1202
        dest_version_id = self._update_object_hash(
1203
            user, account, container, name, size, type, hexlified, checksum,
1204
            domain, meta, replace_meta, permissions)
1205
        self.store.map_put(hash, map)
1206
        return dest_version_id, hexlified
1207

    
1208
    @debug_method
1209
    @backend_method
1210
    def update_object_checksum(self, user, account, container, name, version,
1211
                               checksum):
1212
        """Update an object's checksum."""
1213

    
1214
        # Update objects with greater version and same hashmap
1215
        # and size (fix metadata updates).
1216
        self._can_write(user, account, container, name)
1217
        path, node = self._lookup_object(account, container, name,
1218
                                         lock_container=True)
1219
        props = self._get_version(node, version)
1220
        versions = self.node.node_get_versions(node)
1221
        for x in versions:
1222
            if (x[self.SERIAL] >= int(version) and
1223
                x[self.HASH] == props[self.HASH] and
1224
                    x[self.SIZE] == props[self.SIZE]):
1225
                self.node.version_put_property(
1226
                    x[self.SERIAL], 'checksum', checksum)
1227

    
1228
    def _copy_object(self, user, src_account, src_container, src_name,
1229
                     dest_account, dest_container, dest_name, type,
1230
                     dest_domain=None, dest_meta=None, replace_meta=False,
1231
                     permissions=None, src_version=None, is_move=False,
1232
                     delimiter=None):
1233

    
1234
        report_size_change = not is_move
1235
        dest_meta = dest_meta or {}
1236
        dest_version_ids = []
1237
        self._can_read(user, src_account, src_container, src_name)
1238

    
1239
        src_container_path = '/'.join((src_account, src_container))
1240
        dest_container_path = '/'.join((dest_account, dest_container))
1241
        # Lock container paths in alphabetical order
1242
        if src_container_path < dest_container_path:
1243
            self._lookup_container(src_account, src_container)
1244
            self._lookup_container(dest_account, dest_container)
1245
        else:
1246
            self._lookup_container(dest_account, dest_container)
1247
            self._lookup_container(src_account, src_container)
1248

    
1249
        path, node = self._lookup_object(src_account, src_container, src_name)
1250
        # TODO: Will do another fetch of the properties in duplicate version...
1251
        props = self._get_version(
1252
            node, src_version)  # Check to see if source exists.
1253
        src_version_id = props[self.SERIAL]
1254
        hash = props[self.HASH]
1255
        size = props[self.SIZE]
1256
        is_copy = not is_move and (src_account, src_container, src_name) != (
1257
            dest_account, dest_container, dest_name)  # New uuid.
1258
        dest_version_ids.append(self._update_object_hash(
1259
            user, dest_account, dest_container, dest_name, size, type, hash,
1260
            None, dest_domain, dest_meta, replace_meta, permissions,
1261
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1262
            report_size_change=report_size_change))
1263
        if is_move and ((src_account, src_container, src_name) !=
1264
                        (dest_account, dest_container, dest_name)):
1265
            self._delete_object(user, src_account, src_container, src_name,
1266
                                report_size_change=report_size_change)
1267

    
1268
        if delimiter:
1269
            prefix = (src_name + delimiter if not
1270
                      src_name.endswith(delimiter) else src_name)
1271
            src_names = self._list_objects_no_limit(
1272
                user, src_account, src_container, prefix, delimiter=None,
1273
                virtual=False, domain=None, keys=[], shared=False, until=None,
1274
                size_range=None, all_props=True, public=False)
1275
            src_names.sort(key=lambda x: x[2])  # order by nodes
1276
            paths = [elem[0] for elem in src_names]
1277
            nodes = [elem[2] for elem in src_names]
1278
            # TODO: Will do another fetch of the properties
1279
            # in duplicate version...
1280
            props = self._get_versions(nodes)  # Check to see if source exists.
1281

    
1282
            for prop, path, node in zip(props, paths, nodes):
1283
                src_version_id = prop[self.SERIAL]
1284
                hash = prop[self.HASH]
1285
                vtype = prop[self.TYPE]
1286
                size = prop[self.SIZE]
1287
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1288
                    delimiter) else dest_name
1289
                vdest_name = path.replace(prefix, dest_prefix, 1)
1290
                # _update_object_hash() locks destination path
1291
                dest_version_ids.append(self._update_object_hash(
1292
                    user, dest_account, dest_container, vdest_name, size,
1293
                    vtype, hash, None, dest_domain, meta={},
1294
                    replace_meta=False, permissions=None, src_node=node,
1295
                    src_version_id=src_version_id, is_copy=is_copy,
1296
                    report_size_change=report_size_change))
1297
                if is_move and ((src_account, src_container, src_name) !=
1298
                                (dest_account, dest_container, dest_name)):
1299
                    self._delete_object(user, src_account, src_container, path,
1300
                                        report_size_change=report_size_change)
1301
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
1302
                dest_version_ids)
1303

    
1304
    @debug_method
1305
    @backend_method
1306
    def copy_object(self, user, src_account, src_container, src_name,
1307
                    dest_account, dest_container, dest_name, type, domain,
1308
                    meta=None, replace_meta=False, permissions=None,
1309
                    src_version=None, delimiter=None):
1310
        """Copy an object's data and metadata."""
1311

    
1312
        meta = meta or {}
1313
        dest_version_id = self._copy_object(
1314
            user, src_account, src_container, src_name, dest_account,
1315
            dest_container, dest_name, type, domain, meta, replace_meta,
1316
            permissions, src_version, False, delimiter)
1317
        return dest_version_id
1318

    
1319
    @debug_method
1320
    @backend_method
1321
    def move_object(self, user, src_account, src_container, src_name,
1322
                    dest_account, dest_container, dest_name, type, domain,
1323
                    meta=None, replace_meta=False, permissions=None,
1324
                    delimiter=None):
1325
        """Move an object's data and metadata."""
1326

    
1327
        meta = meta or {}
1328
        if user != src_account:
1329
            raise NotAllowedError
1330
        dest_version_id = self._move_object(
1331
            user, src_account, src_container, src_name, dest_account,
1332
            dest_container, dest_name, type, domain, meta, replace_meta,
1333
            permissions, None, delimiter=delimiter)
1334
        return dest_version_id
1335

    
1336
    def _delete_object(self, user, account, container, name, until=None,
1337
                       delimiter=None, report_size_change=True):
1338
        if user != account:
1339
            raise NotAllowedError
1340

    
1341
        # lookup object and lock container path also
1342
        path, node = self._lookup_object(account, container, name,
1343
                                         lock_container=True)
1344

    
1345
        if until is not None:
1346
            if node is None:
1347
                return
1348
            hashes = []
1349
            size = 0
1350
            serials = []
1351
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1352
                                           update_statistics_ancestors_depth=1)
1353
            hashes += h
1354
            size += s
1355
            serials += v
1356
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1357
                                           update_statistics_ancestors_depth=1)
1358
            hashes += h
1359
            if not self.free_versioning:
1360
                size += s
1361
            serials += v
1362
            for h in hashes:
1363
                self.store.map_delete(h)
1364
            self.node.node_purge(node, until, CLUSTER_DELETED,
1365
                                 update_statistics_ancestors_depth=1)
1366
            try:
1367
                self._get_version(node)
1368
            except NameError:
1369
                self.permissions.access_clear(path)
1370
            self._report_size_change(
1371
                user, account, -size, {
1372
                    'action': 'object purge',
1373
                    'path': path,
1374
                    'versions': ','.join(str(i) for i in serials)
1375
                }
1376
            )
1377
            return
1378

    
1379
        if not self._exists(node):
1380
            raise ItemNotExists('Object is deleted.')
1381

    
1382
        src_version_id, dest_version_id = self._put_version_duplicate(
1383
            user, node, size=0, type='', hash=None, checksum='',
1384
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1385
        del_size = self._apply_versioning(account, container, src_version_id,
1386
                                          update_statistics_ancestors_depth=1)
1387
        if report_size_change:
1388
            self._report_size_change(
1389
                user, account, -del_size,
1390
                {'action': 'object delete',
1391
                 'path': path,
1392
                 'versions': ','.join([str(dest_version_id)])})
1393
        self._report_object_change(
1394
            user, account, path, details={'action': 'object delete'})
1395
        self.permissions.access_clear(path)
1396

    
1397
        if delimiter:
1398
            prefix = name + delimiter if not name.endswith(delimiter) else name
1399
            src_names = self._list_objects_no_limit(
1400
                user, account, container, prefix, delimiter=None,
1401
                virtual=False, domain=None, keys=[], shared=False, until=None,
1402
                size_range=None, all_props=True, public=False)
1403
            paths = []
1404
            for t in src_names:
1405
                path = '/'.join((account, container, t[0]))
1406
                node = t[2]
1407
                if not self._exists(node):
1408
                    continue
1409
                src_version_id, dest_version_id = self._put_version_duplicate(
1410
                    user, node, size=0, type='', hash=None, checksum='',
1411
                    cluster=CLUSTER_DELETED,
1412
                    update_statistics_ancestors_depth=1)
1413
                del_size = self._apply_versioning(
1414
                    account, container, src_version_id,
1415
                    update_statistics_ancestors_depth=1)
1416
                if report_size_change:
1417
                    self._report_size_change(
1418
                        user, account, -del_size,
1419
                        {'action': 'object delete',
1420
                         'path': path,
1421
                         'versions': ','.join([str(dest_version_id)])})
1422
                self._report_object_change(
1423
                    user, account, path, details={'action': 'object delete'})
1424
                paths.append(path)
1425
            self.permissions.access_clear_bulk(paths)
1426

    
1427
    @debug_method
1428
    @backend_method
1429
    def delete_object(self, user, account, container, name, until=None,
1430
                      prefix='', delimiter=None):
1431
        """Delete/purge an object."""
1432

    
1433
        self._delete_object(user, account, container, name, until, delimiter)
1434

    
1435
    @debug_method
1436
    @backend_method
1437
    def list_versions(self, user, account, container, name):
1438
        """Return a list of all object (version, version_timestamp) tuples."""
1439

    
1440
        self._can_read(user, account, container, name)
1441
        path, node = self._lookup_object(account, container, name)
1442
        versions = self.node.node_get_versions(node)
1443
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
1444
                x[self.CLUSTER] != CLUSTER_DELETED]
1445

    
1446
    @debug_method
1447
    @backend_method
1448
    def get_uuid(self, user, uuid, check_permissions=True):
1449
        """Return the (account, container, name) for the UUID given."""
1450

    
1451
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1452
        if info is None:
1453
            raise NameError
1454
        path, serial = info
1455
        account, container, name = path.split('/', 2)
1456
        if check_permissions:
1457
            self._can_read(user, account, container, name)
1458
        return (account, container, name)
1459

    
1460
    @debug_method
1461
    @backend_method
1462
    def get_public(self, user, public):
1463
        """Return the (account, container, name) for the public id given."""
1464

    
1465
        path = self.permissions.public_path(public)
1466
        if path is None:
1467
            raise NameError
1468
        account, container, name = path.split('/', 2)
1469
        self._can_read(user, account, container, name)
1470
        return (account, container, name)
1471

    
1472
    def get_block(self, hash):
1473
        """Return a block's data."""
1474

    
1475
        logger.debug("get_block: %s", hash)
1476
        if hash.startswith('archip_'):
1477
            block = self.store.block_get_archipelago(hash)
1478
        else:
1479
            block = self.store.block_get(self._unhexlify_hash(hash))
1480
        if not block:
1481
            raise ItemNotExists('Block does not exist')
1482
        return block
1483

    
1484
    def put_block(self, data):
1485
        """Store a block and return the hash."""
1486

    
1487
        logger.debug("put_block: %s", len(data))
1488
        return binascii.hexlify(self.store.block_put(data))
1489

    
1490
    def update_block(self, hash, data, offset=0):
1491
        """Update a known block and return the hash."""
1492

    
1493
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1494
        if hash.startswith('archip_'):
1495
            raise IllegalOperationError(
1496
                    'Cannot update an Archipelago Volume block.')
1497
        if offset == 0 and len(data) == self.block_size:
1498
            return self.put_block(data)
1499
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
1500
        return binascii.hexlify(h)
1501

    
1502
    # Path functions.
1503

    
1504
    def _generate_uuid(self):
1505
        return str(uuidlib.uuid4())
1506

    
1507
    def _put_object_node(self, path, parent, name):
1508
        path = '/'.join((path, name))
1509
        node = self.node.node_lookup(path)
1510
        if node is None:
1511
            node = self.node.node_create(parent, path)
1512
        return path, node
1513

    
1514
    def _put_path(self, user, parent, path,
1515
                  update_statistics_ancestors_depth=None):
1516
        node = self.node.node_create(parent, path)
1517
        self.node.version_create(node, None, 0, '', None, user,
1518
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
1519
                                 update_statistics_ancestors_depth)
1520
        return node
1521

    
1522
    def _lookup_account(self, account, create=True):
1523
        node = self.node.node_lookup(account)
1524
        if node is None and create:
1525
            node = self._put_path(
1526
                account, self.ROOTNODE, account,
1527
                update_statistics_ancestors_depth=-1)  # User is account.
1528
        return account, node
1529

    
1530
    def _lookup_container(self, account, container):
1531
        for_update = True if self.lock_container_path else False
1532
        path = '/'.join((account, container))
1533
        node = self.node.node_lookup(path, for_update)
1534
        if node is None:
1535
            raise ItemNotExists('Container does not exist')
1536
        return path, node
1537

    
1538
    def _lookup_object(self, account, container, name, lock_container=False):
1539
        if lock_container:
1540
            self._lookup_container(account, container)
1541

    
1542
        path = '/'.join((account, container, name))
1543
        node = self.node.node_lookup(path)
1544
        if node is None:
1545
            raise ItemNotExists('Object does not exist')
1546
        return path, node
1547

    
1548
    def _lookup_objects(self, paths):
1549
        nodes = self.node.node_lookup_bulk(paths)
1550
        return paths, nodes
1551

    
1552
    def _get_properties(self, node, until=None):
1553
        """Return properties until the timestamp given."""
1554

    
1555
        before = until if until is not None else inf
1556
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1557
        if props is None and until is not None:
1558
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1559
        if props is None:
1560
            raise ItemNotExists('Path does not exist')
1561
        return props
1562

    
1563
    def _get_statistics(self, node, until=None, compute=False):
1564
        """Return (count, sum of size, timestamp) of everything under node."""
1565

    
1566
        if until is not None:
1567
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1568
        elif compute:
1569
            stats = self.node.statistics_latest(node,
1570
                                                except_cluster=CLUSTER_DELETED)
1571
        else:
1572
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1573
        if stats is None:
1574
            stats = (0, 0, 0)
1575
        return stats
1576

    
1577
    def _get_version(self, node, version=None):
1578
        if version is None:
1579
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1580
            if props is None:
1581
                raise ItemNotExists('Object does not exist')
1582
        else:
1583
            try:
1584
                version = int(version)
1585
            except ValueError:
1586
                raise VersionNotExists('Version does not exist')
1587
            props = self.node.version_get_properties(version, node=node)
1588
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1589
                raise VersionNotExists('Version does not exist')
1590
        return props
1591

    
1592
    def _get_versions(self, nodes):
1593
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1594

    
1595
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
1596
                               type=None, hash=None, checksum=None,
1597
                               cluster=CLUSTER_NORMAL, is_copy=False,
1598
                               update_statistics_ancestors_depth=None):
1599
        """Create a new version of the node."""
1600

    
1601
        props = self.node.version_lookup(
1602
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1603
        if props is not None:
1604
            src_version_id = props[self.SERIAL]
1605
            src_hash = props[self.HASH]
1606
            src_size = props[self.SIZE]
1607
            src_type = props[self.TYPE]
1608
            src_checksum = props[self.CHECKSUM]
1609
        else:
1610
            src_version_id = None
1611
            src_hash = None
1612
            src_size = 0
1613
            src_type = ''
1614
            src_checksum = ''
1615
        if size is None:  # Set metadata.
1616
            hash = src_hash  # This way hash can be set to None
1617
                             # (account or container).
1618
            size = src_size
1619
        if type is None:
1620
            type = src_type
1621
        if checksum is None:
1622
            checksum = src_checksum
1623
        uuid = self._generate_uuid(
1624
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1625

    
1626
        if src_node is None:
1627
            pre_version_id = src_version_id
1628
        else:
1629
            pre_version_id = None
1630
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1631
            if props is not None:
1632
                pre_version_id = props[self.SERIAL]
1633
        if pre_version_id is not None:
1634
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
1635
                                        update_statistics_ancestors_depth)
1636

    
1637
        dest_version_id, mtime = self.node.version_create(
1638
            node, hash, size, type, src_version_id, user, uuid, checksum,
1639
            cluster, update_statistics_ancestors_depth)
1640

    
1641
        self.node.attribute_unset_is_latest(node, dest_version_id)
1642

    
1643
        return pre_version_id, dest_version_id
1644

    
1645
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
1646
                                node, meta, replace=False):
1647
        if src_version_id is not None:
1648
            self.node.attribute_copy(src_version_id, dest_version_id)
1649
        if not replace:
1650
            self.node.attribute_del(dest_version_id, domain, (
1651
                k for k, v in meta.iteritems() if v == ''))
1652
            self.node.attribute_set(dest_version_id, domain, node, (
1653
                (k, v) for k, v in meta.iteritems() if v != ''))
1654
        else:
1655
            self.node.attribute_del(dest_version_id, domain)
1656
            self.node.attribute_set(dest_version_id, domain, node, ((
1657
                k, v) for k, v in meta.iteritems()))
1658

    
1659
    def _put_metadata(self, user, node, domain, meta, replace=False,
1660
                      update_statistics_ancestors_depth=None):
1661
        """Create a new version and store metadata."""
1662

    
1663
        src_version_id, dest_version_id = self._put_version_duplicate(
1664
            user, node,
1665
            update_statistics_ancestors_depth=
1666
            update_statistics_ancestors_depth)
1667
        self._put_metadata_duplicate(
1668
            src_version_id, dest_version_id, domain, node, meta, replace)
1669
        return src_version_id, dest_version_id
1670

    
1671
    def _list_limits(self, listing, marker, limit):
1672
        start = 0
1673
        if marker:
1674
            try:
1675
                start = listing.index(marker) + 1
1676
            except ValueError:
1677
                pass
1678
        if not limit or limit > 10000:
1679
            limit = 10000
1680
        return start, limit
1681

    
1682
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
1683
                                marker=None, limit=10000, virtual=True,
1684
                                domain=None, keys=None, until=None,
1685
                                size_range=None, allowed=None,
1686
                                all_props=False):
1687
        keys = keys or []
1688
        allowed = allowed or []
1689
        cont_prefix = path + '/'
1690
        prefix = cont_prefix + prefix
1691
        start = cont_prefix + marker if marker else None
1692
        before = until if until is not None else inf
1693
        filterq = keys if domain else []
1694
        sizeq = size_range
1695

    
1696
        objects, prefixes = self.node.latest_version_list(
1697
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
1698
            allowed, domain, filterq, sizeq, all_props)
1699
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1700
        objects.sort(key=lambda x: x[0])
1701
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1702
        return objects
1703

    
1704
    # Reporting functions.
1705

    
1706
    @debug_method
1707
    @backend_method
1708
    def _report_size_change(self, user, account, size, details=None):
1709
        details = details or {}
1710

    
1711
        if size == 0:
1712
            return
1713

    
1714
        account_node = self._lookup_account(account, True)[1]
1715
        total = self._get_statistics(account_node, compute=True)[1]
1716
        details.update({'user': user, 'total': total})
1717
        self.messages.append(
1718
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1719
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1720

    
1721
        if not self.using_external_quotaholder:
1722
            return
1723

    
1724
        try:
1725
            name = details['path'] if 'path' in details else ''
1726
            serial = self.astakosclient.issue_one_commission(
1727
                holder=account,
1728
                source=DEFAULT_SOURCE,
1729
                provisions={'pithos.diskspace': size},
1730
                name=name)
1731
        except BaseException, e:
1732
            raise QuotaError(e)
1733
        else:
1734
            self.serials.append(serial)
1735

    
1736
    @debug_method
1737
    @backend_method
1738
    def _report_object_change(self, user, account, path, details=None):
1739
        details = details or {}
1740
        details.update({'user': user})
1741
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1742
                              account, QUEUE_INSTANCE_ID, 'object', path,
1743
                              details))
1744

    
1745
    @debug_method
1746
    @backend_method
1747
    def _report_sharing_change(self, user, account, path, details=None):
1748
        details = details or {}
1749
        details.update({'user': user})
1750
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1751
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
1752
                              details))
1753

    
1754
    # Policy functions.
1755

    
1756
    def _check_policy(self, policy, is_account_policy=True):
1757
        default_policy = self.default_account_policy \
1758
            if is_account_policy else self.default_container_policy
1759
        for k in policy.keys():
1760
            if policy[k] == '':
1761
                policy[k] = default_policy.get(k)
1762
        for k, v in policy.iteritems():
1763
            if k == 'quota':
1764
                q = int(v)  # May raise ValueError.
1765
                if q < 0:
1766
                    raise ValueError
1767
            elif k == 'versioning':
1768
                if v not in ['auto', 'none']:
1769
                    raise ValueError
1770
            else:
1771
                raise ValueError
1772

    
1773
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1774
        default_policy = self.default_account_policy \
1775
            if is_account_policy else self.default_container_policy
1776
        if replace:
1777
            for k, v in default_policy.iteritems():
1778
                if k not in policy:
1779
                    policy[k] = v
1780
        self.node.policy_set(node, policy)
1781

    
1782
    def _get_policy(self, node, is_account_policy=True):
1783
        default_policy = self.default_account_policy \
1784
            if is_account_policy else self.default_container_policy
1785
        policy = default_policy.copy()
1786
        policy.update(self.node.policy_get(node))
1787
        return policy
1788

    
1789
    def _apply_versioning(self, account, container, version_id,
1790
                          update_statistics_ancestors_depth=None):
1791
        """Delete the provided version if such is the policy.
1792
           Return size of object removed.
1793
        """
1794

    
1795
        if version_id is None:
1796
            return 0
1797
        path, node = self._lookup_container(account, container)
1798
        versioning = self._get_policy(
1799
            node, is_account_policy=False)['versioning']
1800
        if versioning != 'auto':
1801
            hash, size = self.node.version_remove(
1802
                version_id, update_statistics_ancestors_depth)
1803
            self.store.map_delete(hash)
1804
            return size
1805
        elif self.free_versioning:
1806
            return self.node.version_get_properties(
1807
                version_id, keys=('size',))[0]
1808
        return 0
1809

    
1810
    # Access control functions.
1811

    
1812
    def _check_groups(self, groups):
1813
        # raise ValueError('Bad characters in groups')
1814
        pass
1815

    
1816
    def _check_permissions(self, path, permissions):
1817
        # raise ValueError('Bad characters in permissions')
1818
        pass
1819

    
1820
    def _get_formatted_paths(self, paths):
1821
        formatted = []
1822
        if len(paths) == 0:
1823
            return formatted
1824
        props = self.node.get_props(paths)
1825
        if props:
1826
            for prop in props:
1827
                if prop[1].split(';', 1)[0].strip() in (
1828
                        'application/directory', 'application/folder'):
1829
                    formatted.append((prop[0].rstrip('/') + '/',
1830
                                      self.MATCH_PREFIX))
1831
                formatted.append((prop[0], self.MATCH_EXACT))
1832
        return formatted
1833

    
1834
    def _get_permissions_path(self, account, container, name):
1835
        path = '/'.join((account, container, name))
1836
        permission_paths = self.permissions.access_inherit(path)
1837
        permission_paths.sort()
1838
        permission_paths.reverse()
1839
        for p in permission_paths:
1840
            if p == path:
1841
                return p
1842
            else:
1843
                if p.count('/') < 2:
1844
                    continue
1845
                node = self.node.node_lookup(p)
1846
                props = None
1847
                if node is not None:
1848
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1849
                if props is not None:
1850
                    if props[self.TYPE].split(';', 1)[0].strip() in (
1851
                            'application/directory', 'application/folder'):
1852
                        return p
1853
        return None
1854

    
1855
    def _get_permissions_path_bulk(self, account, container, names):
1856
        formatted_paths = []
1857
        for name in names:
1858
            path = '/'.join((account, container, name))
1859
            formatted_paths.append(path)
1860
        permission_paths = self.permissions.access_inherit_bulk(
1861
            formatted_paths)
1862
        permission_paths.sort()
1863
        permission_paths.reverse()
1864
        permission_paths_list = []
1865
        lookup_list = []
1866
        for p in permission_paths:
1867
            if p in formatted_paths:
1868
                permission_paths_list.append(p)
1869
            else:
1870
                if p.count('/') < 2:
1871
                    continue
1872
                lookup_list.append(p)
1873

    
1874
        if len(lookup_list) > 0:
1875
            props = self.node.get_props(lookup_list)
1876
            if props:
1877
                for prop in props:
1878
                    if prop[1].split(';', 1)[0].strip() in (
1879
                            'application/directory', 'application/folder'):
1880
                        permission_paths_list.append(prop[0])
1881

    
1882
        if len(permission_paths_list) > 0:
1883
            return permission_paths_list
1884

    
1885
        return None
1886

    
1887
    def _can_read(self, user, account, container, name):
1888
        if user == account:
1889
            return True
1890
        path = '/'.join((account, container, name))
1891
        if self.permissions.public_get(path) is not None:
1892
            return True
1893
        path = self._get_permissions_path(account, container, name)
1894
        if not path:
1895
            raise NotAllowedError
1896
        if (not self.permissions.access_check(path, self.READ, user) and not
1897
                self.permissions.access_check(path, self.WRITE, user)):
1898
            raise NotAllowedError
1899

    
1900
    def _can_write(self, user, account, container, name):
1901
        if user == account:
1902
            return True
1903
        path = '/'.join((account, container, name))
1904
        path = self._get_permissions_path(account, container, name)
1905
        if not path:
1906
            raise NotAllowedError
1907
        if not self.permissions.access_check(path, self.WRITE, user):
1908
            raise NotAllowedError
1909

    
1910
    def _allowed_accounts(self, user):
1911
        allow = set()
1912
        for path in self.permissions.access_list_paths(user):
1913
            allow.add(path.split('/', 1)[0])
1914
        return sorted(allow)
1915

    
1916
    def _allowed_containers(self, user, account):
1917
        allow = set()
1918
        for path in self.permissions.access_list_paths(user, account):
1919
            allow.add(path.split('/', 2)[1])
1920
        return sorted(allow)
1921

    
1922
    # Domain functions
1923

    
1924
    @debug_method
1925
    @backend_method
1926
    def get_domain_objects(self, domain, user=None):
1927
        allowed_paths = self.permissions.access_list_paths(
1928
            user, include_owned=user is not None, include_containers=False)
1929
        if not allowed_paths:
1930
            return []
1931
        obj_list = self.node.domain_object_list(
1932
            domain, allowed_paths, CLUSTER_NORMAL)
1933
        return [(path,
1934
                 self._build_metadata(props, user_defined_meta),
1935
                 self.permissions.access_get(path)) for
1936
                path, props, user_defined_meta in obj_list]
1937

    
1938
    # util functions
1939

    
1940
    def _build_metadata(self, props, user_defined=None,
1941
                        include_user_defined=True):
1942
        meta = {'bytes': props[self.SIZE],
1943
                'type': props[self.TYPE],
1944
                'hash': props[self.HASH],
1945
                'version': props[self.SERIAL],
1946
                'version_timestamp': props[self.MTIME],
1947
                'modified_by': props[self.MUSER],
1948
                'uuid': props[self.UUID],
1949
                'checksum': props[self.CHECKSUM]}
1950
        if include_user_defined and user_defined is not None:
1951
            meta.update(user_defined)
1952
        return meta
1953

    
1954
    def _exists(self, node):
1955
        try:
1956
            self._get_version(node)
1957
        except ItemNotExists:
1958
            return False
1959
        else:
1960
            return True
1961

    
1962
    def _unhexlify_hash(self, hash):
1963
        try:
1964
            return binascii.unhexlify(hash)
1965
        except TypeError:
1966
            raise InvalidHash(hash)