1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42

    
43
try:
44
    from astakosclient import AstakosClient
45
except ImportError:
46
    AstakosClient = None
47

    
48
from pithos.backends.base import (
49
    DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
50
    DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
51
    BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
52
    ContainerNotEmpty, ItemNotExists, VersionNotExists,
53
    InvalidHash, IllegalOperationError)
54

    
55

    
56
class DisabledAstakosClient(object):
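    """Stand-in used when Astakos integration is not configured.

    It stores the constructor arguments and raises AssertionError on any
    attribute access, so that accidental use is detected immediately.
    """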
57
    def __init__(self, *args, **kwargs):
58
        self.args = args
59
        self.kwargs = kwargs
60

    
61
    def __getattr__(self, name):
62
        m = ("AstakosClient has been disabled, "
63
             "yet an attempt to access it was made")
64
        raise AssertionError(m)
65

    
66

    
67
# Stripped-down version of the HashMap class found in tools.
68

    
69
class HashMap(list):
70

    
71
    def __init__(self, blocksize, blockhash):
72
        super(HashMap, self).__init__()
73
        self.blocksize = blocksize
74
        self.blockhash = blockhash
75

    
76
    def _hash_raw(self, v):
77
        h = hashlib.new(self.blockhash)
78
        h.update(v)
79
        return h.digest()
80

    
81
    def hash(self):
82
        if len(self) == 0:
83
            return self._hash_raw('')
84
        if len(self) == 1:
85
            return self.__getitem__(0)
86

    
87
        h = list(self)
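        # Pad the list of block hashes with zero-filled entries up to the next
        # power of two (at least 2), then hash adjacent pairs repeatedly until
        # a single Merkle-tree root digest remains.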
88
        s = 2
89
        while s < len(h):
90
            s = s * 2
91
        h += [('\x00' * len(h[0]))] * (s - len(h))
92
        while len(h) > 1:
93
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
94
        return h[0]
95

    
96
# Default modules and settings.
97
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
98
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
99
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
100
DEFAULT_BLOCK_PATH = 'data/'
101
DEFAULT_BLOCK_UMASK = 0o022
102
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
103
DEFAULT_HASH_ALGORITHM = 'sha256'
104
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
105
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
106
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
107
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
108
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
109
                               'abcdefghijklmnopqrstuvwxyz'
110
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
111
DEFAULT_PUBLIC_URL_SECURITY = 16
112

    
113
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
114
QUEUE_CLIENT_ID = 'pithos'
115
QUEUE_INSTANCE_ID = '1'
116

    
117
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
118

    
119
inf = float('inf')
120

    
121
ULTIMATE_ANSWER = 42
122

    
123
DEFAULT_SOURCE = 'system'
124
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
125

    
126
logger = logging.getLogger(__name__)
127

    
128

    
129
def backend_method(func):
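    """Run the wrapped backend call inside a database transaction.

    If a transaction is already active the call proceeds as-is; otherwise
    pre_exec() opens one and post_exec() commits on success or rolls it
    back on failure.
    """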
130
    @wraps(func)
131
    def wrapper(self, *args, **kw):
132
        # if we are inside a database transaction
133
        # just proceed with the method execution
134
        # otherwise manage a new transaction
135
        if self.in_transaction:
136
            return func(self, *args, **kw)
137

    
138
        try:
139
            self.pre_exec()
140
            result = func(self, *args, **kw)
141
            success_status = True
142
            return result
143
        except:
144
            success_status = False
145
            raise
146
        finally:
147
            self.post_exec(success_status)
148
    return wrapper
149

    
150

    
151
def debug_method(func):
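    """Log each call's arguments and result (or traceback) at DEBUG level."""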
152
    @wraps(func)
153
    def wrapper(self, *args, **kw):
154
        try:
155
            result = func(self, *args, **kw)
156
            return result
157
        except:
158
            result = format_exc()
159
            raise
160
        finally:
161
            all_args = map(repr, args)
162
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
163
            logger.debug(">>> %s(%s) <<< %s" % (
164
                func.__name__, ', '.join(all_args).rstrip(', '), result))
165
    return wrapper
166

    
167

    
168
def list_method(func):
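    """Apply marker/limit pagination to the wrapped listing call.

    The full result is computed first; _list_limits() determines the start
    offset and page size used to slice it.
    """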
169
    @wraps(func)
170
    def wrapper(self, *args, **kw):
171
        marker = kw.get('marker')
172
        limit = kw.get('limit')
173
        result = func(self, *args, **kw)
174
        start, limit = self._list_limits(result, marker, limit)
175
        return result[start:start + limit]
176
    return wrapper
177

    
178

    
179
class ModularBackend(BaseBackend):
180
    """A modular backend.
181

182
    Uses modules for SQL functions and storage.
183
    """
184

    
185
    def __init__(self, db_module=None, db_connection=None,
186
                 block_module=None, block_path=None, block_umask=None,
187
                 block_size=None, hash_algorithm=None,
188
                 queue_module=None, queue_hosts=None, queue_exchange=None,
189
                 astakos_auth_url=None, service_token=None,
190
                 astakosclient_poolsize=None,
191
                 free_versioning=True, block_params=None,
192
                 public_url_security=None,
193
                 public_url_alphabet=None,
194
                 account_quota_policy=None,
195
                 container_quota_policy=None,
196
                 container_versioning_policy=None):
197
        db_module = db_module or DEFAULT_DB_MODULE
198
        db_connection = db_connection or DEFAULT_DB_CONNECTION
199
        block_module = block_module or DEFAULT_BLOCK_MODULE
200
        block_path = block_path or DEFAULT_BLOCK_PATH
201
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
202
        block_params = block_params or DEFAULT_BLOCK_PARAMS
203
        block_size = block_size or DEFAULT_BLOCK_SIZE
204
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
205
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
206
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
207
        container_quota_policy = container_quota_policy \
208
            or DEFAULT_CONTAINER_QUOTA
209
        container_versioning_policy = container_versioning_policy \
210
            or DEFAULT_CONTAINER_VERSIONING
211

    
212
        self.default_account_policy = {'quota': account_quota_policy}
213
        self.default_container_policy = {
214
            'quota': container_quota_policy,
215
            'versioning': container_versioning_policy
216
        }
217
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
218
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
219

    
220
        self.public_url_security = (public_url_security or
221
                                    DEFAULT_PUBLIC_URL_SECURITY)
222
        self.public_url_alphabet = (public_url_alphabet or
223
                                    DEFAULT_PUBLIC_URL_ALPHABET)
224

    
225
        self.hash_algorithm = hash_algorithm
226
        self.block_size = block_size
227
        self.free_versioning = free_versioning
228

    
229
        def load_module(m):
230
            __import__(m)
231
            return sys.modules[m]
232

    
233
        self.db_module = load_module(db_module)
234
        self.wrapper = self.db_module.DBWrapper(db_connection)
235
        params = {'wrapper': self.wrapper}
236
        self.permissions = self.db_module.Permissions(**params)
237
        self.config = self.db_module.Config(**params)
238
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
239
        for x in ['READ', 'WRITE']:
240
            setattr(self, x, getattr(self.db_module, x))
241
        self.node = self.db_module.Node(**params)
242
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
243
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
244
                  'MATCH_PREFIX', 'MATCH_EXACT']:
245
            setattr(self, x, getattr(self.db_module, x))
246

    
247
        self.ALLOWED = ['read', 'write']
248

    
249
        self.block_module = load_module(block_module)
250
        self.block_params = block_params
251
        params = {'path': block_path,
252
                  'block_size': self.block_size,
253
                  'hash_algorithm': self.hash_algorithm,
254
                  'umask': block_umask}
255
        params.update(self.block_params)
256
        self.store = self.block_module.Store(**params)
257

    
258
        if queue_module and queue_hosts:
259
            self.queue_module = load_module(queue_module)
260
            params = {'hosts': queue_hosts,
261
                      'exchange': queue_exchange,
262
                      'client_id': QUEUE_CLIENT_ID}
263
            self.queue = self.queue_module.Queue(**params)
264
        else:
265
            class NoQueue:
266
                def send(self, *args):
267
                    pass
268

    
269
                def close(self):
270
                    pass
271

    
272
            self.queue = NoQueue()
273

    
274
        self.astakos_auth_url = astakos_auth_url
275
        self.service_token = service_token
276

    
277
        if not astakos_auth_url or not AstakosClient:
278
            self.astakosclient = DisabledAstakosClient(
279
                service_token, astakos_auth_url,
280
                use_pool=True,
281
                pool_size=astakosclient_poolsize)
282
        else:
283
            self.astakosclient = AstakosClient(
284
                service_token, astakos_auth_url,
285
                use_pool=True,
286
                pool_size=astakosclient_poolsize)
287

    
288
        self.serials = []
289
        self.messages = []
290

    
291
        self._move_object = partial(self._copy_object, is_move=True)
292

    
293
        self.lock_container_path = False
294

    
295
        self.in_transaction = False
296

    
297
    def pre_exec(self, lock_container_path=False):
298
        self.lock_container_path = lock_container_path
299
        self.wrapper.execute()
300
        self.serials = []
301
        self.in_transaction = True
302

    
303
    def post_exec(self, success_status=True):
304
        if success_status:
305
            # send messages produced
306
            for m in self.messages:
307
                self.queue.send(*m)
308

    
309
            # register serials
310
            if self.serials:
311
                self.commission_serials.insert_many(
312
                    self.serials)
313

    
314
                # commit to ensure that the serials are registered
315
                # even if resolve commission fails
316
                self.wrapper.commit()
317

    
318
                # start new transaction
319
                self.wrapper.execute()
320

    
321
                r = self.astakosclient.resolve_commissions(
322
                    accept_serials=self.serials,
323
                    reject_serials=[])
324
                self.commission_serials.delete_many(
325
                    r['accepted'])
326

    
327
            self.wrapper.commit()
328
        else:
329
            if self.serials:
330
                r = self.astakosclient.resolve_commissions(
331
                    accept_serials=[],
332
                    reject_serials=self.serials)
333
                self.commission_serials.delete_many(
334
                    r['rejected'])
335
            self.wrapper.rollback()
336
        self.in_transaction = False
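    # post_exec() persists pending commission serials and commits before asking
    # Astakos to resolve them, so the serials survive even if the resolve call
    # fails; on failure the serials are rejected and the transaction is rolled
    # back instead.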
337

    
338
    def close(self):
339
        self.wrapper.close()
340
        self.queue.close()
341

    
342
    @property
343
    def using_external_quotaholder(self):
344
        return not isinstance(self.astakosclient, DisabledAstakosClient)
345

    
346
    @debug_method
347
    @backend_method
348
    @list_method
349
    def list_accounts(self, user, marker=None, limit=10000):
350
        """Return a list of accounts the user can access."""
351

    
352
        return self._allowed_accounts(user)
353

    
354
    def _get_account_quotas(self, account):
355
        """Get account usage from astakos."""
356

    
357
        quotas = self.astakosclient.service_get_quotas(account)[account]
358
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
359
                                                  {})
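    # The quotas dict returned by Astakos is keyed by source and resource,
    # e.g. {'system': {'pithos.diskspace': {'usage': ..., 'limit': ...}}},
    # matching DEFAULT_SOURCE and DEFAULT_DISKSPACE_RESOURCE above.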
360

    
368
    @debug_method
369
    @backend_method
370
    def get_account_meta(self, user, account, domain, until=None,
371
                         include_user_defined=True):
372
        """Return a dictionary with the account metadata for the domain."""
373

    
374
        path, node = self._lookup_account(account, user == account)
375
        if user != account:
376
            if until or (node is None) or (account not
377
                                           in self._allowed_accounts(user)):
378
                raise NotAllowedError
379
        try:
380
            props = self._get_properties(node, until)
381
            mtime = props[self.MTIME]
382
        except NameError:
383
            props = None
384
            mtime = until
385
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
386
        tstamp = max(tstamp, mtime)
387
        if until is None:
388
            modified = tstamp
389
        else:
390
            modified = self._get_statistics(
391
                node, compute=True)[2]  # Overall last modification.
392
            modified = max(modified, mtime)
393

    
394
        if user != account:
395
            meta = {'name': account}
396
        else:
397
            meta = {}
398
            if props is not None and include_user_defined:
399
                meta.update(
400
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
401
            if until is not None:
402
                meta.update({'until_timestamp': tstamp})
403
            meta.update({'name': account, 'count': count, 'bytes': bytes})
404
            if self.using_external_quotaholder:
405
                external_quota = self._get_account_quotas(account)
406
                meta['bytes'] = external_quota.get('usage', 0)
407
        meta.update({'modified': modified})
408
        return meta
409

    
410
    @debug_method
411
    @backend_method
412
    def update_account_meta(self, user, account, domain, meta, replace=False):
413
        """Update the metadata associated with the account for the domain."""
414

    
415
        if user != account:
416
            raise NotAllowedError
417
        path, node = self._lookup_account(account, True)
418
        self._put_metadata(user, node, domain, meta, replace,
419
                           update_statistics_ancestors_depth=-1)
420

    
421
    @debug_method
422
    @backend_method
423
    def get_account_groups(self, user, account):
424
        """Return a dictionary with the user groups defined for the account."""
425

    
426
        if user != account:
427
            if account not in self._allowed_accounts(user):
428
                raise NotAllowedError
429
            return {}
430
        self._lookup_account(account, True)
431
        return self.permissions.group_dict(account)
432

    
433
    @debug_method
434
    @backend_method
435
    def update_account_groups(self, user, account, groups, replace=False):
436
        """Update the groups associated with the account."""
437

    
438
        if user != account:
439
            raise NotAllowedError
440
        self._lookup_account(account, True)
441
        self._check_groups(groups)
442
        if replace:
443
            self.permissions.group_destroy(account)
444
        for k, v in groups.iteritems():
445
            if not replace:  # If not already deleted.
446
                self.permissions.group_delete(account, k)
447
            if v:
448
                self.permissions.group_addmany(account, k, v)
449

    
450
    @debug_method
451
    @backend_method
452
    def get_account_policy(self, user, account):
453
        """Return a dictionary with the account policy."""
454

    
455
        if user != account:
456
            if account not in self._allowed_accounts(user):
457
                raise NotAllowedError
458
            return {}
459
        path, node = self._lookup_account(account, True)
460
        policy = self._get_policy(node, is_account_policy=True)
461
        if self.using_external_quotaholder:
462
            external_quota = self._get_account_quotas(account)
463
            policy['quota'] = external_quota.get('limit', 0)
464
        return policy
465

    
466
    @debug_method
467
    @backend_method
468
    def update_account_policy(self, user, account, policy, replace=False):
469
        """Update the policy associated with the account."""
470

    
471
        if user != account:
472
            raise NotAllowedError
473
        path, node = self._lookup_account(account, True)
474
        self._check_policy(policy, is_account_policy=True)
475
        self._put_policy(node, policy, replace, is_account_policy=True)
476

    
477
    @debug_method
478
    @backend_method
479
    def put_account(self, user, account, policy=None):
480
        """Create a new account with the given name."""
481

    
482
        policy = policy or {}
483
        if user != account:
484
            raise NotAllowedError
485
        node = self.node.node_lookup(account)
486
        if node is not None:
487
            raise AccountExists('Account already exists')
488
        if policy:
489
            self._check_policy(policy, is_account_policy=True)
490
        node = self._put_path(user, self.ROOTNODE, account,
491
                              update_statistics_ancestors_depth=-1)
492
        self._put_policy(node, policy, True, is_account_policy=True)
493

    
494
    @debug_method
495
    @backend_method
496
    def delete_account(self, user, account):
497
        """Delete the account with the given name."""
498

    
499
        if user != account:
500
            raise NotAllowedError
501
        node = self.node.node_lookup(account)
502
        if node is None:
503
            return
504
        if not self.node.node_remove(node,
505
                                     update_statistics_ancestors_depth=-1):
506
            raise AccountNotEmpty('Account is not empty')
507
        self.permissions.group_destroy(account)
508

    
509
    @debug_method
510
    @backend_method
511
    @list_method
512
    def list_containers(self, user, account, marker=None, limit=10000,
513
                        shared=False, until=None, public=False):
514
        """Return a list of containers existing under an account."""
515

    
516
        if user != account:
517
            if until or account not in self._allowed_accounts(user):
518
                raise NotAllowedError
519
            return self._allowed_containers(user, account)
520
        if shared or public:
521
            allowed = set()
522
            if shared:
523
                allowed.update([x.split('/', 2)[1] for x in
524
                               self.permissions.access_list_shared(account)])
525
            if public:
526
                allowed.update([x[0].split('/', 2)[1] for x in
527
                               self.permissions.public_list(account)])
528
            return sorted(allowed)
529
        node = self.node.node_lookup(account)
530
        return [x[0] for x in self._list_object_properties(
531
            node, account, '', '/', marker, limit, False, None, [], until)]
532

    
533
    @debug_method
534
    @backend_method
535
    def list_container_meta(self, user, account, container, domain,
536
                            until=None):
537
        """Return a list of the container's object meta keys for a domain."""
538

    
539
        allowed = []
540
        if user != account:
541
            if until:
542
                raise NotAllowedError
543
            allowed = self.permissions.access_list_paths(
544
                user, '/'.join((account, container)))
545
            if not allowed:
546
                raise NotAllowedError
547
        path, node = self._lookup_container(account, container)
548
        before = until if until is not None else inf
549
        allowed = self._get_formatted_paths(allowed)
550
        return self.node.latest_attribute_keys(node, domain, before,
551
                                               CLUSTER_DELETED, allowed)
552

    
553
    @debug_method
554
    @backend_method
555
    def get_container_meta(self, user, account, container, domain, until=None,
556
                           include_user_defined=True):
557
        """Return a dictionary with the container metadata for the domain."""
558

    
559
        if user != account:
560
            if until or container not in self._allowed_containers(user,
561
                                                                  account):
562
                raise NotAllowedError
563
        path, node = self._lookup_container(account, container)
564
        props = self._get_properties(node, until)
565
        mtime = props[self.MTIME]
566
        count, bytes, tstamp = self._get_statistics(node, until)
567
        tstamp = max(tstamp, mtime)
568
        if until is None:
569
            modified = tstamp
570
        else:
571
            modified = self._get_statistics(
572
                node)[2]  # Overall last modification.
573
            modified = max(modified, mtime)
574

    
575
        if user != account:
576
            meta = {'name': container}
577
        else:
578
            meta = {}
579
            if include_user_defined:
580
                meta.update(
581
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
582
            if until is not None:
583
                meta.update({'until_timestamp': tstamp})
584
            meta.update({'name': container, 'count': count, 'bytes': bytes})
585
        meta.update({'modified': modified})
586
        return meta
587

    
588
    @debug_method
589
    @backend_method
590
    def update_container_meta(self, user, account, container, domain, meta,
591
                              replace=False):
592
        """Update the metadata associated with the container for the domain."""
593

    
594
        if user != account:
595
            raise NotAllowedError
596
        path, node = self._lookup_container(account, container)
597
        src_version_id, dest_version_id = self._put_metadata(
598
            user, node, domain, meta, replace,
599
            update_statistics_ancestors_depth=0)
600
        if src_version_id is not None:
601
            versioning = self._get_policy(
602
                node, is_account_policy=False)['versioning']
603
            if versioning != 'auto':
604
                self.node.version_remove(src_version_id,
605
                                         update_statistics_ancestors_depth=0)
606

    
607
    @debug_method
608
    @backend_method
609
    def get_container_policy(self, user, account, container):
610
        """Return a dictionary with the container policy."""
611

    
612
        if user != account:
613
            if container not in self._allowed_containers(user, account):
614
                raise NotAllowedError
615
            return {}
616
        path, node = self._lookup_container(account, container)
617
        return self._get_policy(node, is_account_policy=False)
618

    
619
    @debug_method
620
    @backend_method
621
    def update_container_policy(self, user, account, container, policy,
622
                                replace=False):
623
        """Update the policy associated with the container."""
624

    
625
        if user != account:
626
            raise NotAllowedError
627
        path, node = self._lookup_container(account, container)
628
        self._check_policy(policy, is_account_policy=False)
629
        self._put_policy(node, policy, replace, is_account_policy=False)
630

    
631
    @debug_method
632
    @backend_method
633
    def put_container(self, user, account, container, policy=None):
634
        """Create a new container with the given name."""
635

    
636
        policy = policy or {}
637
        if user != account:
638
            raise NotAllowedError
639
        try:
640
            path, node = self._lookup_container(account, container)
641
        except NameError:
642
            pass
643
        else:
644
            raise ContainerExists('Container already exists')
645
        if policy:
646
            self._check_policy(policy, is_account_policy=False)
647
        path = '/'.join((account, container))
648
        node = self._put_path(
649
            user, self._lookup_account(account, True)[1], path,
650
            update_statistics_ancestors_depth=-1)
651
        self._put_policy(node, policy, True, is_account_policy=False)
652

    
653
    @debug_method
654
    @backend_method
655
    def delete_container(self, user, account, container, until=None, prefix='',
656
                         delimiter=None):
657
        """Delete/purge the container with the given name."""
658

    
659
        if user != account:
660
            raise NotAllowedError
661
        path, node = self._lookup_container(account, container)
662

    
663
        if until is not None:
664
            hashes, size, serials = self.node.node_purge_children(
665
                node, until, CLUSTER_HISTORY,
666
                update_statistics_ancestors_depth=0)
667
            for h in hashes:
668
                self.store.map_delete(h)
669
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
670
                                          update_statistics_ancestors_depth=0)
671
            if not self.free_versioning:
672
                self._report_size_change(
673
                    user, account, -size, {
674
                        'action': 'container purge',
675
                        'path': path,
676
                        'versions': ','.join(str(i) for i in serials)
677
                    }
678
                )
679
            return
680

    
681
        if not delimiter:
682
            if self._get_statistics(node)[0] > 0:
683
                raise ContainerNotEmpty('Container is not empty')
684
            hashes, size, serials = self.node.node_purge_children(
685
                node, inf, CLUSTER_HISTORY,
686
                update_statistics_ancestors_depth=0)
687
            for h in hashes:
688
                self.store.map_delete(h)
689
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
690
                                          update_statistics_ancestors_depth=0)
691
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
692
            if not self.free_versioning:
693
                self._report_size_change(
694
                    user, account, -size, {
695
                        'action': 'container purge',
696
                        'path': path,
697
                        'versions': ','.join(str(i) for i in serials)
698
                    }
699
                )
700
        else:
701
            # remove only contents
702
            src_names = self._list_objects_no_limit(
703
                user, account, container, prefix='', delimiter=None,
704
                virtual=False, domain=None, keys=[], shared=False, until=None,
705
                size_range=None, all_props=True, public=False)
706
            paths = []
707
            for t in src_names:
708
                path = '/'.join((account, container, t[0]))
709
                node = t[2]
710
                if not self._exists(node):
711
                    continue
712
                src_version_id, dest_version_id = self._put_version_duplicate(
713
                    user, node, size=0, type='', hash=None, checksum='',
714
                    cluster=CLUSTER_DELETED,
715
                    update_statistics_ancestors_depth=1)
716
                del_size = self._apply_versioning(
717
                    account, container, src_version_id,
718
                    update_statistics_ancestors_depth=1)
719
                self._report_size_change(
720
                    user, account, -del_size, {
721
                        'action': 'object delete',
722
                        'path': path,
723
                        'versions': ','.join([str(dest_version_id)])})
724
                self._report_object_change(
725
                    user, account, path, details={'action': 'object delete'})
726
                paths.append(path)
727
            self.permissions.access_clear_bulk(paths)
728

    
729
    def _list_objects(self, user, account, container, prefix, delimiter,
730
                      marker, limit, virtual, domain, keys, shared, until,
731
                      size_range, all_props, public):
732
        if user != account and until:
733
            raise NotAllowedError
734

    
735
        objects = []
736
        if shared and public:
737
            # get shared first
738
            shared_paths = self._list_object_permissions(
739
                user, account, container, prefix, shared=True, public=False)
740
            if shared_paths:
741
                path, node = self._lookup_container(account, container)
742
                shared_paths = self._get_formatted_paths(shared_paths)
743
                objects = set(self._list_object_properties(
744
                    node, path, prefix, delimiter, marker, limit, virtual,
745
                    domain, keys, until, size_range, shared_paths, all_props))
746

    
747
            # get public
748
            objects |= set(self._list_public_object_properties(
749
                user, account, container, prefix, all_props))
750
            objects = list(objects)
751

    
752
            objects.sort(key=lambda x: x[0])
753
        elif public:
754
            objects = self._list_public_object_properties(
755
                user, account, container, prefix, all_props)
756
        else:
757
            allowed = self._list_object_permissions(
758
                user, account, container, prefix, shared, public=False)
759
            if shared and not allowed:
760
                return []
761
            path, node = self._lookup_container(account, container)
762
            allowed = self._get_formatted_paths(allowed)
763
            objects = self._list_object_properties(
764
                node, path, prefix, delimiter, marker, limit, virtual, domain,
765
                keys, until, size_range, allowed, all_props)
766

    
767
        # apply limits
768
        start, limit = self._list_limits(objects, marker, limit)
769
        return objects[start:start + limit]
770

    
771
    def _list_public_object_properties(self, user, account, container, prefix,
772
                                       all_props):
773
        public = self._list_object_permissions(
774
            user, account, container, prefix, shared=False, public=True)
775
        paths, nodes = self._lookup_objects(public)
776
        path = '/'.join((account, container))
777
        cont_prefix = path + '/'
778
        paths = [x[len(cont_prefix):] for x in paths]
779
        objects = [(p,) + props for p, props in
780
                   zip(paths, self.node.version_lookup_bulk(
781
                       nodes, all_props=all_props, order_by_path=True))]
782
        return objects
783

    
784
    def _list_objects_no_limit(self, user, account, container, prefix,
785
                               delimiter, virtual, domain, keys, shared, until,
786
                               size_range, all_props, public):
787
        objects = []
788
        while True:
789
            marker = objects[-1] if objects else None
790
            limit = 10000
791
            l = self._list_objects(
792
                user, account, container, prefix, delimiter, marker, limit,
793
                virtual, domain, keys, shared, until, size_range, all_props,
794
                public)
795
            objects.extend(l)
796
            if not l or len(l) < limit:
797
                break
798
        return objects
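    # Pages through _list_objects() in chunks of 10000, using the last item of
    # the previous page as the marker, until a short or empty page signals that
    # everything has been fetched.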
799

    
800
    def _list_object_permissions(self, user, account, container, prefix,
801
                                 shared, public):
802
        allowed = []
803
        path = '/'.join((account, container, prefix)).rstrip('/')
804
        if user != account:
805
            allowed = self.permissions.access_list_paths(user, path)
806
            if not allowed:
807
                raise NotAllowedError
808
        else:
809
            allowed = set()
810
            if shared:
811
                allowed.update(self.permissions.access_list_shared(path))
812
            if public:
813
                allowed.update(
814
                    [x[0] for x in self.permissions.public_list(path)])
815
            allowed = sorted(allowed)
816
            if not allowed:
817
                return []
818
        return allowed
819

    
820
    @debug_method
821
    @backend_method
822
    def list_objects(self, user, account, container, prefix='', delimiter=None,
823
                     marker=None, limit=10000, virtual=True, domain=None,
824
                     keys=None, shared=False, until=None, size_range=None,
825
                     public=False):
826
        """List (object name, object version_id) under a container."""
827

    
828
        keys = keys or []
829
        return self._list_objects(
830
            user, account, container, prefix, delimiter, marker, limit,
831
            virtual, domain, keys, shared, until, size_range, False, public)
832

    
833
    @debug_method
834
    @backend_method
835
    def list_object_meta(self, user, account, container, prefix='',
836
                         delimiter=None, marker=None, limit=10000,
837
                         virtual=True, domain=None, keys=None, shared=False,
838
                         until=None, size_range=None, public=False):
839
        """Return a list of metadata dicts of objects under a container."""
840

    
841
        keys = keys or []
842
        props = self._list_objects(
843
            user, account, container, prefix, delimiter, marker, limit,
844
            virtual, domain, keys, shared, until, size_range, True, public)
845
        objects = []
846
        for p in props:
847
            if len(p) == 2:
848
                objects.append({'subdir': p[0]})
849
            else:
850
                objects.append({
851
                    'name': p[0],
852
                    'bytes': p[self.SIZE + 1],
853
                    'type': p[self.TYPE + 1],
854
                    'hash': p[self.HASH + 1],
855
                    'version': p[self.SERIAL + 1],
856
                    'version_timestamp': p[self.MTIME + 1],
857
                    'modified': p[self.MTIME + 1] if until is None else None,
858
                    'modified_by': p[self.MUSER + 1],
859
                    'uuid': p[self.UUID + 1],
860
                    'checksum': p[self.CHECKSUM + 1]})
861
        return objects
862

    
863
    @debug_method
864
    @backend_method
865
    def list_object_permissions(self, user, account, container, prefix=''):
866
        """Return a list of paths enforce permissions under a container."""
867

    
868
        return self._list_object_permissions(user, account, container, prefix,
869
                                             True, False)
870

    
871
    @debug_method
872
    @backend_method
873
    def list_object_public(self, user, account, container, prefix=''):
874
        """Return a mapping of object paths to public ids under a container."""
875

    
876
        public = {}
877
        for path, p in self.permissions.public_list('/'.join((account,
878
                                                              container,
879
                                                              prefix))):
880
            public[path] = p
881
        return public
882

    
883
    @debug_method
884
    @backend_method
885
    def get_object_meta(self, user, account, container, name, domain,
886
                        version=None, include_user_defined=True):
887
        """Return a dictionary with the object metadata for the domain."""
888

    
889
        self._can_read(user, account, container, name)
890
        path, node = self._lookup_object(account, container, name)
891
        props = self._get_version(node, version)
892
        if version is None:
893
            modified = props[self.MTIME]
894
        else:
895
            try:
896
                modified = self._get_version(
897
                    node)[self.MTIME]  # Overall last modification.
898
            except NameError:  # Object may be deleted.
899
                del_props = self.node.version_lookup(
900
                    node, inf, CLUSTER_DELETED)
901
                if del_props is None:
902
                    raise ItemNotExists('Object does not exist')
903
                modified = del_props[self.MTIME]
904

    
905
        meta = {}
906
        if include_user_defined:
907
            meta.update(
908
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
909
        meta.update({'name': name,
910
                     'bytes': props[self.SIZE],
911
                     'type': props[self.TYPE],
912
                     'hash': props[self.HASH],
913
                     'version': props[self.SERIAL],
914
                     'version_timestamp': props[self.MTIME],
915
                     'modified': modified,
916
                     'modified_by': props[self.MUSER],
917
                     'uuid': props[self.UUID],
918
                     'checksum': props[self.CHECKSUM]})
919
        return meta
920

    
921
    @debug_method
922
    @backend_method
923
    def update_object_meta(self, user, account, container, name, domain, meta,
924
                           replace=False):
925
        """Update object metadata for a domain and return the new version."""
926

    
927
        self._can_write(user, account, container, name)
928

    
929
        path, node = self._lookup_object(account, container, name,
930
                                         lock_container=True)
931
        src_version_id, dest_version_id = self._put_metadata(
932
            user, node, domain, meta, replace,
933
            update_statistics_ancestors_depth=1)
934
        self._apply_versioning(account, container, src_version_id,
935
                               update_statistics_ancestors_depth=1)
936
        return dest_version_id
937

    
938
    @debug_method
939
    @backend_method
940
    def get_object_permissions_bulk(self, user, account, container, names):
941
        """Return the action allowed on the object, the path
942
        from which the object gets its permissions,
943
        along with a dictionary containing the permissions."""
944

    
945
        permissions_path = self._get_permissions_path_bulk(account, container,
946
                                                           names)
947
        access_objects = self.permissions.access_check_bulk(permissions_path,
948
                                                            user)
949
        #group_parents = access_objects['group_parents']
950
        nobject_permissions = {}
951
        cpath = '/'.join((account, container, ''))
952
        cpath_idx = len(cpath)
953
        for path in permissions_path:
954
            allowed = 1
955
            name = path[cpath_idx:]
956
            if user != account:
957
                try:
958
                    allowed = access_objects[path]
959
                except KeyError:
960
                    raise NotAllowedError
961
            access_dict, allowed = \
962
                self.permissions.access_get_for_bulk(access_objects[path])
963
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
964
                                         access_dict)
965
        self._lookup_objects(permissions_path)
966
        return nobject_permissions
967

    
968
    @debug_method
969
    @backend_method
970
    def get_object_permissions(self, user, account, container, name):
971
        """Return the action allowed on the object, the path
972
        from which the object gets its permissions,
973
        along with a dictionary containing the permissions."""
974

    
975
        allowed = 'write'
976
        permissions_path = self._get_permissions_path(account, container, name)
977
        if user != account:
978
            if self.permissions.access_check(permissions_path, self.WRITE,
979
                                             user):
980
                allowed = 'write'
981
            elif self.permissions.access_check(permissions_path, self.READ,
982
                                               user):
983
                allowed = 'read'
984
            else:
985
                raise NotAllowedError
986
        self._lookup_object(account, container, name)
987
        return (allowed,
988
                permissions_path,
989
                self.permissions.access_get(permissions_path))
990

    
991
    @debug_method
992
    @backend_method
993
    def update_object_permissions(self, user, account, container, name,
994
                                  permissions):
995
        """Update the permissions associated with the object."""
996

    
997
        if user != account:
998
            raise NotAllowedError
999
        path = self._lookup_object(account, container, name,
1000
                                   lock_container=True)[0]
1001
        self._check_permissions(path, permissions)
1002
        try:
1003
            self.permissions.access_set(path, permissions)
1004
        except:
1005
            raise ValueError
1006
        else:
1007
            self._report_sharing_change(user, account, path, {'members':
1008
                                        self.permissions.access_members(path)})
1009

    
1010
    @debug_method
1011
    @backend_method
1012
    def get_object_public(self, user, account, container, name):
1013
        """Return the public id of the object if applicable."""
1014

    
1015
        self._can_read(user, account, container, name)
1016
        path = self._lookup_object(account, container, name)[0]
1017
        p = self.permissions.public_get(path)
1018
        return p
1019

    
1020
    @debug_method
1021
    @backend_method
1022
    def update_object_public(self, user, account, container, name, public):
1023
        """Update the public status of the object."""
1024

    
1025
        self._can_write(user, account, container, name)
1026
        path = self._lookup_object(account, container, name,
1027
                                   lock_container=True)[0]
1028
        if not public:
1029
            self.permissions.public_unset(path)
1030
        else:
1031
            self.permissions.public_set(
1032
                path, self.public_url_security, self.public_url_alphabet)
1033

    
1034
    @debug_method
1035
    @backend_method
1036
    def get_object_hashmap(self, user, account, container, name, version=None):
1037
        """Return the object's size and a list with partial hashes."""
1038

    
1039
        self._can_read(user, account, container, name)
1040
        path, node = self._lookup_object(account, container, name)
1041
        props = self._get_version(node, version)
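        # Objects stored on Archipelago volumes carry an 'archip:'-prefixed
        # hash and their mapfile entries are returned as-is; regular objects
        # return the hex-encoded block hashes of their hashmap.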
1042
        if props[self.HASH] is None:
1043
            return 0, ()
1044
        if props[self.HASH].startswith('archip:'):
1045
            hashmap = self.store.map_get_archipelago(props[self.HASH],
1046
                                                     props[self.SIZE])
1047
            return props[self.SIZE], [x for x in hashmap]
1048
        else:
1049
            hashmap = self.store.map_get(self._unhexlify_hash(
1050
                props[self.HASH]))
1051
            return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
1052

    
1053
    def _update_object_hash(self, user, account, container, name, size, type,
1054
                            hash, checksum, domain, meta, replace_meta,
1055
                            permissions, src_node=None, src_version_id=None,
1056
                            is_copy=False, report_size_change=True):
1057
        if permissions is not None and user != account:
1058
            raise NotAllowedError
1059
        self._can_write(user, account, container, name)
1060
        if permissions is not None:
1061
            path = '/'.join((account, container, name))
1062
            self._check_permissions(path, permissions)
1063

    
1064
        account_path, account_node = self._lookup_account(account, True)
1065
        container_path, container_node = self._lookup_container(
1066
            account, container)
1067

    
1068
        path, node = self._put_object_node(
1069
            container_path, container_node, name)
1070
        pre_version_id, dest_version_id = self._put_version_duplicate(
1071
            user, node, src_node=src_node, size=size, type=type, hash=hash,
1072
            checksum=checksum, is_copy=is_copy,
1073
            update_statistics_ancestors_depth=1)
1074

    
1075
        # Handle meta.
1076
        if src_version_id is None:
1077
            src_version_id = pre_version_id
1078
        self._put_metadata_duplicate(
1079
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
1080

    
1081
        del_size = self._apply_versioning(account, container, pre_version_id,
1082
                                          update_statistics_ancestors_depth=1)
1083
        size_delta = size - del_size
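        # del_size is the space reclaimed from the version superseded by this
        # update (per the versioning policy), so size_delta is the net change
        # checked against quotas and reported to the quotaholder below.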
1084
        if size_delta > 0:
1085
            # Check account quota.
1086
            if not self.using_external_quotaholder:
1087
                account_quota = long(self._get_policy(
1088
                    account_node, is_account_policy=True)['quota'])
1089
                account_usage = self._get_statistics(account_node,
1090
                                                     compute=True)[1]
1091
                if (account_quota > 0 and account_usage > account_quota):
1092
                    raise QuotaError(
1093
                        'Account quota exceeded: limit: %s, usage: %s' % (
1094
                            account_quota, account_usage))
1095

    
1096
            # Check container quota.
1097
            container_quota = long(self._get_policy(
1098
                container_node, is_account_policy=False)['quota'])
1099
            container_usage = self._get_statistics(container_node)[1]
1100
            if (container_quota > 0 and container_usage > container_quota):
1101
                # This must be executed in a transaction, so the version is
1102
                # never created if it fails.
1103
                raise QuotaError(
1104
                    'Container quota exceeded: limit: %s, usage: %s' % (
1105
                        container_quota, container_usage
1106
                    )
1107
                )
1108

    
1109
        if report_size_change:
1110
            self._report_size_change(
1111
                user, account, size_delta,
1112
                {'action': 'object update', 'path': path,
1113
                 'versions': ','.join([str(dest_version_id)])})
1114
        if permissions is not None:
1115
            self.permissions.access_set(path, permissions)
1116
            self._report_sharing_change(
1117
                user, account, path,
1118
                {'members': self.permissions.access_members(path)})
1119

    
1120
        self._report_object_change(
1121
            user, account, path,
1122
            details={'version': dest_version_id, 'action': 'object update'})
1123
        return dest_version_id
1124

    
1125
    @debug_method
1126
    @backend_method
1127
    def register_object_map(self, user, account, container, name, size, type,
1128
                            mapfile, checksum='', domain='pithos', meta=None,
1129
                            replace_meta=False, permissions=None):
1130
        """Register an object mapfile without providing any data.
1131

1132
        Lock the container path, create a node pointing to the object path,
1133
        create a version pointing to the mapfile
1134
        and issue the size change in the quotaholder.
1135

1136
        :param user: the user account which performs the action
1137

1138
        :param account: the account under which the object resides
1139

1140
        :param container: the container under which the object resides
1141

1142
        :param name: the object name
1143

1144
        :param size: the object size
1145

1146
        :param type: the object mimetype
1147

1148
        :param mapfile: the mapfile pointing to the object data
1149

1150
        :param checksum: the md5 checksum (optional)
1151

1152
        :param domain: the object domain
1153

1154
        :param meta: a dict with custom object metadata
1155

1156
        :param replace_meta: replace existing metadata or not
1157

1158
        :param permissions: a dict with the read and write object permissions
1159

1160
        :returns: the new object uuid
1161

1162
        :raises: ItemNotExists, NotAllowedError, QuotaError
1163
        """
1164

    
1165
        meta = meta or {}
1166
        try:
1167
            self.lock_container_path = True
1168
            self.put_container(user, account, container, policy=None)
1169
        except ContainerExists:
1170
            pass
1171
        finally:
1172
            self.lock_container_path = False
1173
        dest_version_id = self._update_object_hash(
1174
            user, account, container, name, size, type, mapfile, checksum,
1175
            domain, meta, replace_meta, permissions)
1176
        return self.node.version_get_properties(dest_version_id,
1177
                                                keys=('uuid',))[0]
1178

    
1179
    @debug_method
1180
    def update_object_hashmap(self, user, account, container, name, size, type,
1181
                              hashmap, checksum, domain, meta=None,
1182
                              replace_meta=False, permissions=None):
1183
        """Create/update an object's hashmap and return the new version."""
1184

    
1185
        for h in hashmap:
1186
            if h.startswith('archip_'):
1187
                raise IllegalOperationError(
1188
                    'Cannot update Archipelago Volume hashmap.')
1189
        meta = meta or {}
1190
        if size == 0:  # No such thing as an empty hashmap.
1191
            hashmap = [self.put_block('')]
1192
        map = HashMap(self.block_size, self.hash_algorithm)
1193
        map.extend([self._unhexlify_hash(x) for x in hashmap])
1194
        missing = self.store.block_search(map)
1195
        if missing:
1196
            ie = IndexError()
1197
            ie.data = [binascii.hexlify(x) for x in missing]
1198
            raise ie
1199

    
1200
        hash = map.hash()
1201
        hexlified = binascii.hexlify(hash)
1202
        # _update_object_hash() locks destination path
1203
        dest_version_id = self._update_object_hash(
1204
            user, account, container, name, size, type, hexlified, checksum,
1205
            domain, meta, replace_meta, permissions)
1206
        self.store.map_put(hash, map)
1207
        return dest_version_id, hexlified
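    # Illustrative call pattern (the `data` variable and its splitting into
    # blocks are assumptions, not part of this module):
    #
    #   hashes = [backend.put_block(data[i:i + backend.block_size])
    #             for i in range(0, len(data), backend.block_size)]
    #   version_id, obj_hash = backend.update_object_hashmap(
    #       user, account, container, name, len(data),
    #       'application/octet-stream', hashes, checksum='', domain='pithos')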
1208

    
1209
    @debug_method
1210
    @backend_method
1211
    def update_object_checksum(self, user, account, container, name, version,
1212
                               checksum):
1213
        """Update an object's checksum."""
1214

    
1215
        # Update objects with greater version and same hashmap
1216
        # and size (fix metadata updates).
1217
        self._can_write(user, account, container, name)
1218
        path, node = self._lookup_object(account, container, name,
1219
                                         lock_container=True)
1220
        props = self._get_version(node, version)
1221
        versions = self.node.node_get_versions(node)
1222
        for x in versions:
1223
            if (x[self.SERIAL] >= int(version) and
1224
                x[self.HASH] == props[self.HASH] and
1225
                    x[self.SIZE] == props[self.SIZE]):
1226
                self.node.version_put_property(
1227
                    x[self.SERIAL], 'checksum', checksum)
1228

    
1229
    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):

        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)

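    # copy_object() below is a thin wrapper around _copy_object() with
    # is_move=False; when a delimiter is given, _copy_object() may return a
    # list of destination version ids (one per transferred object) instead of
    # a single id.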
    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        meta = meta or {}
        dest_version_id = self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, src_version, False, delimiter)
        return dest_version_id

    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        meta = meta or {}
        if user != src_account:
            raise NotAllowedError
        dest_version_id = self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, None, delimiter=delimiter)
        return dest_version_id

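    # _delete_object() handles two distinct cases.  With `until` it purges
    # every version recorded up to that timestamp, dropping their hashmaps and
    # reporting the freed size; otherwise it "deletes" the object by recording
    # a new, empty version in CLUSTER_DELETED, leaving history to the
    # container's versioning policy.  With a delimiter, objects sharing the
    # name prefix are deleted as well.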
    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except NameError:
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

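    # delete_object() marks the latest version as deleted; passing `until`
    # instead purges the versions recorded up to that timestamp.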
    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        self._delete_object(user, account, container, name, until, delimiter)

    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
                x[self.CLUSTER] != CLUSTER_DELETED]

    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read(user, account, container, name)
        return (account, container, name)

    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

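    # Block-level API: block hashes are exchanged as hex strings.  Hashes
    # prefixed with 'archip_' refer to Archipelago volume blocks, which are
    # fetched through a dedicated store call and cannot be updated in place.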
    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        if hash.startswith('archip_'):
            block = self.store.block_get_archipelago(hash)
        else:
            block = self.store.block_get(self._unhexlify_hash(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block

    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        return binascii.hexlify(self.store.block_put(data))

    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if hash.startswith('archip_'):
            raise IllegalOperationError(
                'Cannot update an Archipelago Volume block.')
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
        return binascii.hexlify(h)

    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

    def _put_object_node(self, path, parent, name):
        path = '/'.join((path, name))
        node = self.node.node_lookup(path)
        if node is None:
            node = self.node.node_create(parent, path)
        return path, node

    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node

    def _lookup_account(self, account, create=True):
        node = self.node.node_lookup(account)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node

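    # _lookup_container() resolves "account/container" to its node; when
    # lock_container_path is enabled the lookup is performed for update,
    # which (assuming row-level locking in the node layer) serializes
    # concurrent writers on the same container.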
    def _lookup_container(self, account, container):
        for_update = True if self.lock_container_path else False
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name, lock_container=False):
        if lock_container:
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes

    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats

    def _get_version(self, node, version=None):
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version, node=node)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

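    # _put_version_duplicate() is the core of the copy-on-write versioning
    # scheme: the current latest version, if any, is reclustered into
    # CLUSTER_HISTORY and a new version is created in the requested cluster,
    # inheriting hash, size, type and checksum from the source version for
    # any value the caller does not supply.  The object UUID is preserved
    # across versions and regenerated only for copies and brand-new objects.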
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node."""

        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id

    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))

    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=
            update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id

    def _list_limits(self, listing, marker, limit):
        start = 0
        if marker:
            try:
                start = listing.index(marker) + 1
            except ValueError:
                pass
        if not limit or limit > 10000:
            limit = 10000
        return start, limit

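    # _list_object_properties() backs the object listings: names are joined
    # with the container path, paging uses marker/limit (capped at 10000 by
    # the _list_limits helper above), and `until` turns the call into a
    # point-in-time view.  With virtual=True, common delimiter prefixes are
    # reported next to real objects, carrying no properties.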
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects

    # Reporting functions.

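    # _report_size_change() queues a 'diskspace' message carrying the
    # account's recomputed total and, when an external quotaholder is in use,
    # issues a commission for the delta through astakosclient; the returned
    # serial is collected in self.serials, presumably to be resolved when the
    # enclosing request finishes.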
    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)

    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))

    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))

    # Policy functions.

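    # Policies are validated before being stored: only 'quota' (a
    # non-negative integer) and 'versioning' ('auto' or 'none') are accepted,
    # and empty values fall back to the account or container defaults.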
    def _check_policy(self, policy, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError

    def _put_policy(self, node, policy, replace, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)

    def _get_policy(self, node, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        policy = default_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy

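    # _apply_versioning() decides the fate of the version that was just
    # superseded: under the 'auto' policy it stays in history (its size is
    # still returned when free_versioning is set, apparently because
    # historical versions are not charged), otherwise it is removed together
    # with its hashmap and its size is returned so the caller can release the
    # corresponding quota.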
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0

    # Access control functions.

    def _check_groups(self, groups):
        # raise ValueError('Bad characters in groups')
        pass

    def _check_permissions(self, path, permissions):
        # raise ValueError('Bad characters in permissions')
        pass

    def _get_formatted_paths(self, paths):
        formatted = []
        if len(paths) == 0:
            return formatted
        props = self.node.get_props(paths)
        if props:
            for prop in props:
                if prop[1].split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    formatted.append((prop[0].rstrip('/') + '/',
                                      self.MATCH_PREFIX))
                formatted.append((prop[0], self.MATCH_EXACT))
        return formatted

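    # Permissions may be granted on the object itself or inherited from an
    # ancestor path that is an actual directory object (an
    # 'application/directory' or 'application/folder' content type).
    # _get_permissions_path() and its bulk variant return the path(s) whose
    # grants apply to the requested object(s), or None when nothing matches.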
    def _get_permissions_path(self, account, container, name):
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            else:
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None

    def _get_permissions_path_bulk(self, account, container, names):
        formatted_paths = []
        for name in names:
            path = '/'.join((account, container, name))
            formatted_paths.append(path)
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        permission_paths.sort()
        permission_paths.reverse()
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                permission_paths_list.append(p)
            else:
                if p.count('/') < 2:
                    continue
                lookup_list.append(p)

        if len(lookup_list) > 0:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        if len(permission_paths_list) > 0:
            return permission_paths_list

        return None

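    # Read access is granted to the owning account, to anyone for published
    # objects, and to users holding a read or write grant on the resolved
    # permissions path; write access requires ownership or an explicit write
    # grant.  Both helpers raise NotAllowedError instead of returning False.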
    def _can_read(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if (not self.permissions.access_check(path, self.READ, user) and not
                self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError

    def _can_write(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError

    def _allowed_accounts(self, user):
        allow = set()
        for path in self.permissions.access_list_paths(user):
            allow.add(path.split('/', 1)[0])
        return sorted(allow)

    def _allowed_containers(self, user, account):
        allow = set()
        for path in self.permissions.access_list_paths(user, account):
            allow.add(path.split('/', 2)[1])
        return sorted(allow)

    # Domain functions

    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]

    # util functions

    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        meta = {'bytes': props[self.SIZE],
                'type': props[self.TYPE],
                'hash': props[self.HASH],
                'version': props[self.SERIAL],
                'version_timestamp': props[self.MTIME],
                'modified_by': props[self.MUSER],
                'uuid': props[self.UUID],
                'checksum': props[self.CHECKSUM]}
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta

    def _exists(self, node):
        try:
            self._get_version(node)
        except ItemNotExists:
            return False
        else:
            return True

    def _unhexlify_hash(self, hash):
        try:
            return binascii.unhexlify(hash)
        except TypeError:
            raise InvalidHash(hash)