Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 9c6ddb46

History | View | Annotate | Download (74.5 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42

    
43
try:
44
    from astakosclient import AstakosClient
45
except ImportError:
46
    AstakosClient = None
47

    
48
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
49
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
50
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
51
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
52
                  InvalidHash)
53

    
54

    
55
class DisabledAstakosClient(object):
56
    def __init__(self, *args, **kwargs):
57
        self.args = args
58
        self.kwargs = kwargs
59

    
60
    def __getattr__(self, name):
61
        m = ("AstakosClient has been disabled, "
62
             "yet an attempt to access it was made")
63
        raise AssertionError(m)
64

    
65

    
66
# Stripped-down version of the HashMap class found in tools.
67

    
68
class HashMap(list):
69

    
70
    def __init__(self, blocksize, blockhash):
71
        super(HashMap, self).__init__()
72
        self.blocksize = blocksize
73
        self.blockhash = blockhash
74

    
75
    def _hash_raw(self, v):
76
        h = hashlib.new(self.blockhash)
77
        h.update(v)
78
        return h.digest()
79

    
80
    def hash(self):
81
        if len(self) == 0:
82
            return self._hash_raw('')
83
        if len(self) == 1:
84
            return self.__getitem__(0)
85

    
86
        h = list(self)
87
        s = 2
88
        while s < len(h):
89
            s = s * 2
90
        h += [('\x00' * len(h[0]))] * (s - len(h))
91
        while len(h) > 1:
92
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
93
        return h[0]
94

    
95
# Default modules and settings.
96
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
97
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
98
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
99
DEFAULT_BLOCK_PATH = 'data/'
100
DEFAULT_BLOCK_UMASK = 0o022
101
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
102
DEFAULT_HASH_ALGORITHM = 'sha256'
103
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
104
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
105
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
106
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
107
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
108
                               'abcdefghijklmnopqrstuvwxyz'
109
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
110
DEFAULT_PUBLIC_URL_SECURITY = 16
111

    
112
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
113
QUEUE_CLIENT_ID = 'pithos'
114
QUEUE_INSTANCE_ID = '1'
115

    
116
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
117

    
118
inf = float('inf')
119

    
120
ULTIMATE_ANSWER = 42
121

    
122
DEFAULT_SOURCE = 'system'
123
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
124

    
125
logger = logging.getLogger(__name__)
126

    
127

    
128
def backend_method(func):
129
    @wraps(func)
130
    def wrapper(self, *args, **kw):
131
        # if we are inside a database transaction
132
        # just proceed with the method execution
133
        # otherwise manage a new transaction
134
        if self.in_transaction:
135
            return func(self, *args, **kw)
136

    
137
        try:
138
            self.pre_exec()
139
            result = func(self, *args, **kw)
140
            success_status = True
141
            return result
142
        except:
143
            success_status = False
144
            raise
145
        finally:
146
            self.post_exec(success_status)
147
    return wrapper
148

    
149

    
150
def debug_method(func):
151
    @wraps(func)
152
    def wrapper(self, *args, **kw):
153
        try:
154
            result = func(self, *args, **kw)
155
            return result
156
        except:
157
            result = format_exc()
158
            raise
159
        finally:
160
            all_args = map(repr, args)
161
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
162
            logger.debug(">>> %s(%s) <<< %s" % (
163
                func.__name__, ', '.join(all_args).rstrip(', '), result))
164
    return wrapper
165

    
166

    
167
def list_method(func):
168
    @wraps(func)
169
    def wrapper(self, *args, **kw):
170
        marker = kw.get('marker')
171
        limit = kw.get('limit')
172
        result = func(self, *args, **kw)
173
        start, limit = self._list_limits(result, marker, limit)
174
        return result[start:start + limit]
175
    return wrapper
176

    
177

    
178
class ModularBackend(BaseBackend):
179
    """A modular backend.
180

181
    Uses modules for SQL functions and storage.
182
    """
183

    
184
    def __init__(self, db_module=None, db_connection=None,
185
                 block_module=None, block_path=None, block_umask=None,
186
                 block_size=None, hash_algorithm=None,
187
                 queue_module=None, queue_hosts=None, queue_exchange=None,
188
                 astakos_auth_url=None, service_token=None,
189
                 astakosclient_poolsize=None,
190
                 free_versioning=True, block_params=None,
191
                 public_url_security=None,
192
                 public_url_alphabet=None,
193
                 account_quota_policy=None,
194
                 container_quota_policy=None,
195
                 container_versioning_policy=None):
196
        db_module = db_module or DEFAULT_DB_MODULE
197
        db_connection = db_connection or DEFAULT_DB_CONNECTION
198
        block_module = block_module or DEFAULT_BLOCK_MODULE
199
        block_path = block_path or DEFAULT_BLOCK_PATH
200
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
201
        block_params = block_params or DEFAULT_BLOCK_PARAMS
202
        block_size = block_size or DEFAULT_BLOCK_SIZE
203
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
204
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
205
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
206
        container_quota_policy = container_quota_policy \
207
            or DEFAULT_CONTAINER_QUOTA
208
        container_versioning_policy = container_versioning_policy \
209
            or DEFAULT_CONTAINER_VERSIONING
210

    
211
        self.default_account_policy = {'quota': account_quota_policy}
212
        self.default_container_policy = {
213
            'quota': container_quota_policy,
214
            'versioning': container_versioning_policy
215
        }
216
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
217
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
218

    
219
        self.public_url_security = (public_url_security or
220
                                    DEFAULT_PUBLIC_URL_SECURITY)
221
        self.public_url_alphabet = (public_url_alphabet or
222
                                    DEFAULT_PUBLIC_URL_ALPHABET)
223

    
224
        self.hash_algorithm = hash_algorithm
225
        self.block_size = block_size
226
        self.free_versioning = free_versioning
227

    
228
        def load_module(m):
229
            __import__(m)
230
            return sys.modules[m]
231

    
232
        self.db_module = load_module(db_module)
233
        self.wrapper = self.db_module.DBWrapper(db_connection)
234
        params = {'wrapper': self.wrapper}
235
        self.permissions = self.db_module.Permissions(**params)
236
        self.config = self.db_module.Config(**params)
237
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
238
        for x in ['READ', 'WRITE']:
239
            setattr(self, x, getattr(self.db_module, x))
240
        self.node = self.db_module.Node(**params)
241
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
242
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
243
                  'MATCH_PREFIX', 'MATCH_EXACT']:
244
            setattr(self, x, getattr(self.db_module, x))
245

    
246
        self.ALLOWED = ['read', 'write']
247

    
248
        self.block_module = load_module(block_module)
249
        self.block_params = block_params
250
        params = {'path': block_path,
251
                  'block_size': self.block_size,
252
                  'hash_algorithm': self.hash_algorithm,
253
                  'umask': block_umask}
254
        params.update(self.block_params)
255
        self.store = self.block_module.Store(**params)
256

    
257
        if queue_module and queue_hosts:
258
            self.queue_module = load_module(queue_module)
259
            params = {'hosts': queue_hosts,
260
                      'exchange': queue_exchange,
261
                      'client_id': QUEUE_CLIENT_ID}
262
            self.queue = self.queue_module.Queue(**params)
263
        else:
264
            class NoQueue:
265
                def send(self, *args):
266
                    pass
267

    
268
                def close(self):
269
                    pass
270

    
271
            self.queue = NoQueue()
272

    
273
        self.astakos_auth_url = astakos_auth_url
274
        self.service_token = service_token
275

    
276
        if not astakos_auth_url or not AstakosClient:
277
            self.astakosclient = DisabledAstakosClient(
278
                service_token, astakos_auth_url,
279
                use_pool=True,
280
                pool_size=astakosclient_poolsize)
281
        else:
282
            self.astakosclient = AstakosClient(
283
                service_token, astakos_auth_url,
284
                use_pool=True,
285
                pool_size=astakosclient_poolsize)
286

    
287
        self.serials = []
288
        self.messages = []
289

    
290
        self._move_object = partial(self._copy_object, is_move=True)
291

    
292
        self.lock_container_path = False
293

    
294
        self.in_transaction = False
295

    
296
    def pre_exec(self, lock_container_path=False):
297
        self.lock_container_path = lock_container_path
298
        self.wrapper.execute()
299
        self.serials = []
300
        self.in_transaction = True
301

    
302
    def post_exec(self, success_status=True):
303
        if success_status:
304
            # send messages produced
305
            for m in self.messages:
306
                self.queue.send(*m)
307

    
308
            # register serials
309
            if self.serials:
310
                self.commission_serials.insert_many(
311
                    self.serials)
312

    
313
                # commit to ensure that the serials are registered
314
                # even if resolve commission fails
315
                self.wrapper.commit()
316

    
317
                # start new transaction
318
                self.wrapper.execute()
319

    
320
                r = self.astakosclient.resolve_commissions(
321
                    accept_serials=self.serials,
322
                    reject_serials=[])
323
                self.commission_serials.delete_many(
324
                    r['accepted'])
325

    
326
            self.wrapper.commit()
327
        else:
328
            if self.serials:
329
                r = self.astakosclient.resolve_commissions(
330
                    accept_serials=[],
331
                    reject_serials=self.serials)
332
                self.commission_serials.delete_many(
333
                    r['rejected'])
334
            self.wrapper.rollback()
335
        self.in_transaction = False
336

    
337
    def close(self):
338
        self.wrapper.close()
339
        self.queue.close()
340

    
341
    @property
342
    def using_external_quotaholder(self):
343
        return not isinstance(self.astakosclient, DisabledAstakosClient)
344

    
345
    @debug_method
346
    @backend_method
347
    @list_method
348
    def list_accounts(self, user, marker=None, limit=10000):
349
        """Return a list of accounts the user can access."""
350

    
351
        return self._allowed_accounts(user)
352

    
353
    def _get_account_quotas(self, account):
354
        """Get account usage from astakos."""
355

    
356
        quotas = self.astakosclient.service_get_quotas(account)[account]
357
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
358
                                                  {})
359

    
360
    @debug_method
361
    @backend_method
362
    def get_account_meta(self, user, account, domain, until=None,
363
                         include_user_defined=True):
364
        """Return a dictionary with the account metadata for the domain."""
365

    
366
        path, node = self._lookup_account(account, user == account)
367
        if user != account:
368
            if until or (node is None) or (account not
369
                                           in self._allowed_accounts(user)):
370
                raise NotAllowedError
371
        try:
372
            props = self._get_properties(node, until)
373
            mtime = props[self.MTIME]
374
        except NameError:
375
            props = None
376
            mtime = until
377
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
378
        tstamp = max(tstamp, mtime)
379
        if until is None:
380
            modified = tstamp
381
        else:
382
            modified = self._get_statistics(
383
                node, compute=True)[2]  # Overall last modification.
384
            modified = max(modified, mtime)
385

    
386
        if user != account:
387
            meta = {'name': account}
388
        else:
389
            meta = {}
390
            if props is not None and include_user_defined:
391
                meta.update(
392
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
393
            if until is not None:
394
                meta.update({'until_timestamp': tstamp})
395
            meta.update({'name': account, 'count': count, 'bytes': bytes})
396
            if self.using_external_quotaholder:
397
                external_quota = self._get_account_quotas(account)
398
                meta['bytes'] = external_quota.get('usage', 0)
399
        meta.update({'modified': modified})
400
        return meta
401

    
402
    @debug_method
403
    @backend_method
404
    def update_account_meta(self, user, account, domain, meta, replace=False):
405
        """Update the metadata associated with the account for the domain."""
406

    
407
        if user != account:
408
            raise NotAllowedError
409
        path, node = self._lookup_account(account, True)
410
        self._put_metadata(user, node, domain, meta, replace,
411
                           update_statistics_ancestors_depth=-1)
412

    
413
    @debug_method
414
    @backend_method
415
    def get_account_groups(self, user, account):
416
        """Return a dictionary with the user groups defined for the account."""
417

    
418
        if user != account:
419
            if account not in self._allowed_accounts(user):
420
                raise NotAllowedError
421
            return {}
422
        self._lookup_account(account, True)
423
        return self.permissions.group_dict(account)
424

    
425
    @debug_method
426
    @backend_method
427
    def update_account_groups(self, user, account, groups, replace=False):
428
        """Update the groups associated with the account."""
429

    
430
        if user != account:
431
            raise NotAllowedError
432
        self._lookup_account(account, True)
433
        self._check_groups(groups)
434
        if replace:
435
            self.permissions.group_destroy(account)
436
        for k, v in groups.iteritems():
437
            if not replace:  # If not already deleted.
438
                self.permissions.group_delete(account, k)
439
            if v:
440
                self.permissions.group_addmany(account, k, v)
441

    
442
    @debug_method
443
    @backend_method
444
    def get_account_policy(self, user, account):
445
        """Return a dictionary with the account policy."""
446

    
447
        if user != account:
448
            if account not in self._allowed_accounts(user):
449
                raise NotAllowedError
450
            return {}
451
        path, node = self._lookup_account(account, True)
452
        policy = self._get_policy(node, is_account_policy=True)
453
        if self.using_external_quotaholder:
454
            external_quota = self._get_account_quotas(account)
455
            policy['quota'] = external_quota.get('limit', 0)
456
        return policy
457

    
458
    @debug_method
459
    @backend_method
460
    def update_account_policy(self, user, account, policy, replace=False):
461
        """Update the policy associated with the account."""
462

    
463
        if user != account:
464
            raise NotAllowedError
465
        path, node = self._lookup_account(account, True)
466
        self._check_policy(policy, is_account_policy=True)
467
        self._put_policy(node, policy, replace, is_account_policy=True)
468

    
469
    @debug_method
470
    @backend_method
471
    def put_account(self, user, account, policy=None):
472
        """Create a new account with the given name."""
473

    
474
        policy = policy or {}
475
        if user != account:
476
            raise NotAllowedError
477
        node = self.node.node_lookup(account)
478
        if node is not None:
479
            raise AccountExists('Account already exists')
480
        if policy:
481
            self._check_policy(policy, is_account_policy=True)
482
        node = self._put_path(user, self.ROOTNODE, account,
483
                              update_statistics_ancestors_depth=-1)
484
        self._put_policy(node, policy, True, is_account_policy=True)
485

    
486
    @debug_method
487
    @backend_method
488
    def delete_account(self, user, account):
489
        """Delete the account with the given name."""
490

    
491
        if user != account:
492
            raise NotAllowedError
493
        node = self.node.node_lookup(account)
494
        if node is None:
495
            return
496
        if not self.node.node_remove(node,
497
                                     update_statistics_ancestors_depth=-1):
498
            raise AccountNotEmpty('Account is not empty')
499
        self.permissions.group_destroy(account)
500

    
501
    @debug_method
502
    @backend_method
503
    @list_method
504
    def list_containers(self, user, account, marker=None, limit=10000,
505
                        shared=False, until=None, public=False):
506
        """Return a list of containers existing under an account."""
507

    
508
        if user != account:
509
            if until or account not in self._allowed_accounts(user):
510
                raise NotAllowedError
511
            return self._allowed_containers(user, account)
512
        if shared or public:
513
            allowed = set()
514
            if shared:
515
                allowed.update([x.split('/', 2)[1] for x in
516
                               self.permissions.access_list_shared(account)])
517
            if public:
518
                allowed.update([x[0].split('/', 2)[1] for x in
519
                               self.permissions.public_list(account)])
520
            return sorted(allowed)
521
        node = self.node.node_lookup(account)
522
        return [x[0] for x in self._list_object_properties(
523
            node, account, '', '/', marker, limit, False, None, [], until)]
524

    
525
    @debug_method
526
    @backend_method
527
    def list_container_meta(self, user, account, container, domain,
528
                            until=None):
529
        """Return a list of the container's object meta keys for a domain."""
530

    
531
        allowed = []
532
        if user != account:
533
            if until:
534
                raise NotAllowedError
535
            allowed = self.permissions.access_list_paths(
536
                user, '/'.join((account, container)))
537
            if not allowed:
538
                raise NotAllowedError
539
        path, node = self._lookup_container(account, container)
540
        before = until if until is not None else inf
541
        allowed = self._get_formatted_paths(allowed)
542
        return self.node.latest_attribute_keys(node, domain, before,
543
                                               CLUSTER_DELETED, allowed)
544

    
545
    @debug_method
546
    @backend_method
547
    def get_container_meta(self, user, account, container, domain, until=None,
548
                           include_user_defined=True):
549
        """Return a dictionary with the container metadata for the domain."""
550

    
551
        if user != account:
552
            if until or container not in self._allowed_containers(user,
553
                                                                  account):
554
                raise NotAllowedError
555
        path, node = self._lookup_container(account, container)
556
        props = self._get_properties(node, until)
557
        mtime = props[self.MTIME]
558
        count, bytes, tstamp = self._get_statistics(node, until)
559
        tstamp = max(tstamp, mtime)
560
        if until is None:
561
            modified = tstamp
562
        else:
563
            modified = self._get_statistics(
564
                node)[2]  # Overall last modification.
565
            modified = max(modified, mtime)
566

    
567
        if user != account:
568
            meta = {'name': container}
569
        else:
570
            meta = {}
571
            if include_user_defined:
572
                meta.update(
573
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
574
            if until is not None:
575
                meta.update({'until_timestamp': tstamp})
576
            meta.update({'name': container, 'count': count, 'bytes': bytes})
577
        meta.update({'modified': modified})
578
        return meta
579

    
580
    @debug_method
581
    @backend_method
582
    def update_container_meta(self, user, account, container, domain, meta,
583
                              replace=False):
584
        """Update the metadata associated with the container for the domain."""
585

    
586
        if user != account:
587
            raise NotAllowedError
588
        path, node = self._lookup_container(account, container)
589
        src_version_id, dest_version_id = self._put_metadata(
590
            user, node, domain, meta, replace,
591
            update_statistics_ancestors_depth=0)
592
        if src_version_id is not None:
593
            versioning = self._get_policy(
594
                node, is_account_policy=False)['versioning']
595
            if versioning != 'auto':
596
                self.node.version_remove(src_version_id,
597
                                         update_statistics_ancestors_depth=0)
598

    
599
    @debug_method
600
    @backend_method
601
    def get_container_policy(self, user, account, container):
602
        """Return a dictionary with the container policy."""
603

    
604
        if user != account:
605
            if container not in self._allowed_containers(user, account):
606
                raise NotAllowedError
607
            return {}
608
        path, node = self._lookup_container(account, container)
609
        return self._get_policy(node, is_account_policy=False)
610

    
611
    @debug_method
612
    @backend_method
613
    def update_container_policy(self, user, account, container, policy,
614
                                replace=False):
615
        """Update the policy associated with the container."""
616

    
617
        if user != account:
618
            raise NotAllowedError
619
        path, node = self._lookup_container(account, container)
620
        self._check_policy(policy, is_account_policy=False)
621
        self._put_policy(node, policy, replace, is_account_policy=False)
622

    
623
    @debug_method
624
    @backend_method
625
    def put_container(self, user, account, container, policy=None):
626
        """Create a new container with the given name."""
627

    
628
        policy = policy or {}
629
        if user != account:
630
            raise NotAllowedError
631
        try:
632
            path, node = self._lookup_container(account, container)
633
        except NameError:
634
            pass
635
        else:
636
            raise ContainerExists('Container already exists')
637
        if policy:
638
            self._check_policy(policy, is_account_policy=False)
639
        path = '/'.join((account, container))
640
        node = self._put_path(
641
            user, self._lookup_account(account, True)[1], path,
642
            update_statistics_ancestors_depth=-1)
643
        self._put_policy(node, policy, True, is_account_policy=False)
644

    
645
    @debug_method
646
    @backend_method
647
    def delete_container(self, user, account, container, until=None, prefix='',
648
                         delimiter=None):
649
        """Delete/purge the container with the given name."""
650

    
651
        if user != account:
652
            raise NotAllowedError
653
        path, node = self._lookup_container(account, container)
654

    
655
        if until is not None:
656
            hashes, size, serials = self.node.node_purge_children(
657
                node, until, CLUSTER_HISTORY,
658
                update_statistics_ancestors_depth=0)
659
            for h in hashes:
660
                self.store.map_delete(h)
661
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
662
                                          update_statistics_ancestors_depth=0)
663
            if not self.free_versioning:
664
                self._report_size_change(
665
                    user, account, -size, {
666
                        'action': 'container purge',
667
                        'path': path,
668
                        'versions': ','.join(str(i) for i in serials)
669
                    }
670
                )
671
            return
672

    
673
        if not delimiter:
674
            if self._get_statistics(node)[0] > 0:
675
                raise ContainerNotEmpty('Container is not empty')
676
            hashes, size, serials = self.node.node_purge_children(
677
                node, inf, CLUSTER_HISTORY,
678
                update_statistics_ancestors_depth=0)
679
            for h in hashes:
680
                self.store.map_delete(h)
681
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
682
                                          update_statistics_ancestors_depth=0)
683
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
684
            if not self.free_versioning:
685
                self._report_size_change(
686
                    user, account, -size, {
687
                        'action': 'container purge',
688
                        'path': path,
689
                        'versions': ','.join(str(i) for i in serials)
690
                    }
691
                )
692
        else:
693
            # remove only contents
694
            src_names = self._list_objects_no_limit(
695
                user, account, container, prefix='', delimiter=None,
696
                virtual=False, domain=None, keys=[], shared=False, until=None,
697
                size_range=None, all_props=True, public=False)
698
            paths = []
699
            for t in src_names:
700
                path = '/'.join((account, container, t[0]))
701
                node = t[2]
702
                if not self._exists(node):
703
                    continue
704
                src_version_id, dest_version_id = self._put_version_duplicate(
705
                    user, node, size=0, type='', hash=None, checksum='',
706
                    cluster=CLUSTER_DELETED,
707
                    update_statistics_ancestors_depth=1)
708
                del_size = self._apply_versioning(
709
                    account, container, src_version_id,
710
                    update_statistics_ancestors_depth=1)
711
                self._report_size_change(
712
                    user, account, -del_size, {
713
                        'action': 'object delete',
714
                        'path': path,
715
                        'versions': ','.join([str(dest_version_id)])})
716
                self._report_object_change(
717
                    user, account, path, details={'action': 'object delete'})
718
                paths.append(path)
719
            self.permissions.access_clear_bulk(paths)
720

    
721
    def _list_objects(self, user, account, container, prefix, delimiter,
722
                      marker, limit, virtual, domain, keys, shared, until,
723
                      size_range, all_props, public):
724
        if user != account and until:
725
            raise NotAllowedError
726

    
727
        objects = []
728
        if shared and public:
729
            # get shared first
730
            shared_paths = self._list_object_permissions(
731
                user, account, container, prefix, shared=True, public=False)
732
            if shared_paths:
733
                path, node = self._lookup_container(account, container)
734
                shared_paths = self._get_formatted_paths(shared_paths)
735
                objects = set(self._list_object_properties(
736
                    node, path, prefix, delimiter, marker, limit, virtual,
737
                    domain, keys, until, size_range, shared_paths, all_props))
738

    
739
            # get public
740
            objects |= set(self._list_public_object_properties(
741
                user, account, container, prefix, all_props))
742
            objects = list(objects)
743

    
744
            objects.sort(key=lambda x: x[0])
745
        elif public:
746
            objects = self._list_public_object_properties(
747
                user, account, container, prefix, all_props)
748
        else:
749
            allowed = self._list_object_permissions(
750
                user, account, container, prefix, shared, public=False)
751
            if shared and not allowed:
752
                return []
753
            path, node = self._lookup_container(account, container)
754
            allowed = self._get_formatted_paths(allowed)
755
            objects = self._list_object_properties(
756
                node, path, prefix, delimiter, marker, limit, virtual, domain,
757
                keys, until, size_range, allowed, all_props)
758

    
759
        # apply limits
760
        start, limit = self._list_limits(objects, marker, limit)
761
        return objects[start:start + limit]
762

    
763
    def _list_public_object_properties(self, user, account, container, prefix,
764
                                       all_props):
765
        public = self._list_object_permissions(
766
            user, account, container, prefix, shared=False, public=True)
767
        paths, nodes = self._lookup_objects(public)
768
        path = '/'.join((account, container))
769
        cont_prefix = path + '/'
770
        paths = [x[len(cont_prefix):] for x in paths]
771
        objects = [(p,) + props for p, props in
772
                   zip(paths, self.node.version_lookup_bulk(
773
                       nodes, all_props=all_props, order_by_path=True))]
774
        return objects
775

    
776
    def _list_objects_no_limit(self, user, account, container, prefix,
777
                               delimiter, virtual, domain, keys, shared, until,
778
                               size_range, all_props, public):
779
        objects = []
780
        while True:
781
            marker = objects[-1] if objects else None
782
            limit = 10000
783
            l = self._list_objects(
784
                user, account, container, prefix, delimiter, marker, limit,
785
                virtual, domain, keys, shared, until, size_range, all_props,
786
                public)
787
            objects.extend(l)
788
            if not l or len(l) < limit:
789
                break
790
        return objects
791

    
792
    def _list_object_permissions(self, user, account, container, prefix,
793
                                 shared, public):
794
        allowed = []
795
        path = '/'.join((account, container, prefix)).rstrip('/')
796
        if user != account:
797
            allowed = self.permissions.access_list_paths(user, path)
798
            if not allowed:
799
                raise NotAllowedError
800
        else:
801
            allowed = set()
802
            if shared:
803
                allowed.update(self.permissions.access_list_shared(path))
804
            if public:
805
                allowed.update(
806
                    [x[0] for x in self.permissions.public_list(path)])
807
            allowed = sorted(allowed)
808
            if not allowed:
809
                return []
810
        return allowed
811

    
812
    @debug_method
813
    @backend_method
814
    def list_objects(self, user, account, container, prefix='', delimiter=None,
815
                     marker=None, limit=10000, virtual=True, domain=None,
816
                     keys=None, shared=False, until=None, size_range=None,
817
                     public=False):
818
        """List (object name, object version_id) under a container."""
819

    
820
        keys = keys or []
821
        return self._list_objects(
822
            user, account, container, prefix, delimiter, marker, limit,
823
            virtual, domain, keys, shared, until, size_range, False, public)
824

    
825
    @debug_method
826
    @backend_method
827
    def list_object_meta(self, user, account, container, prefix='',
828
                         delimiter=None, marker=None, limit=10000,
829
                         virtual=True, domain=None, keys=None, shared=False,
830
                         until=None, size_range=None, public=False):
831
        """Return a list of metadata dicts of objects under a container."""
832

    
833
        keys = keys or []
834
        props = self._list_objects(
835
            user, account, container, prefix, delimiter, marker, limit,
836
            virtual, domain, keys, shared, until, size_range, True, public)
837
        objects = []
838
        for p in props:
839
            if len(p) == 2:
840
                objects.append({'subdir': p[0]})
841
            else:
842
                objects.append({
843
                    'name': p[0],
844
                    'bytes': p[self.SIZE + 1],
845
                    'type': p[self.TYPE + 1],
846
                    'hash': p[self.HASH + 1],
847
                    'version': p[self.SERIAL + 1],
848
                    'version_timestamp': p[self.MTIME + 1],
849
                    'modified': p[self.MTIME + 1] if until is None else None,
850
                    'modified_by': p[self.MUSER + 1],
851
                    'uuid': p[self.UUID + 1],
852
                    'checksum': p[self.CHECKSUM + 1]})
853
        return objects
854

    
855
    @debug_method
856
    @backend_method
857
    def list_object_permissions(self, user, account, container, prefix=''):
858
        """Return a list of paths enforce permissions under a container."""
859

    
860
        return self._list_object_permissions(user, account, container, prefix,
861
                                             True, False)
862

    
863
    @debug_method
864
    @backend_method
865
    def list_object_public(self, user, account, container, prefix=''):
866
        """Return a mapping of object paths to public ids under a container."""
867

    
868
        public = {}
869
        for path, p in self.permissions.public_list('/'.join((account,
870
                                                              container,
871
                                                              prefix))):
872
            public[path] = p
873
        return public
874

    
875
    @debug_method
876
    @backend_method
877
    def get_object_meta(self, user, account, container, name, domain,
878
                        version=None, include_user_defined=True):
879
        """Return a dictionary with the object metadata for the domain."""
880

    
881
        self._can_read(user, account, container, name)
882
        path, node = self._lookup_object(account, container, name)
883
        props = self._get_version(node, version)
884
        if version is None:
885
            modified = props[self.MTIME]
886
        else:
887
            try:
888
                modified = self._get_version(
889
                    node)[self.MTIME]  # Overall last modification.
890
            except NameError:  # Object may be deleted.
891
                del_props = self.node.version_lookup(
892
                    node, inf, CLUSTER_DELETED)
893
                if del_props is None:
894
                    raise ItemNotExists('Object does not exist')
895
                modified = del_props[self.MTIME]
896

    
897
        meta = {}
898
        if include_user_defined:
899
            meta.update(
900
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
901
        meta.update({'name': name,
902
                     'bytes': props[self.SIZE],
903
                     'type': props[self.TYPE],
904
                     'hash': props[self.HASH],
905
                     'version': props[self.SERIAL],
906
                     'version_timestamp': props[self.MTIME],
907
                     'modified': modified,
908
                     'modified_by': props[self.MUSER],
909
                     'uuid': props[self.UUID],
910
                     'checksum': props[self.CHECKSUM]})
911
        return meta
912

    
913
    @debug_method
914
    @backend_method
915
    def update_object_meta(self, user, account, container, name, domain, meta,
916
                           replace=False):
917
        """Update object metadata for a domain and return the new version."""
918

    
919
        self._can_write(user, account, container, name)
920

    
921
        path, node = self._lookup_object(account, container, name,
922
                                         lock_container=True)
923
        src_version_id, dest_version_id = self._put_metadata(
924
            user, node, domain, meta, replace,
925
            update_statistics_ancestors_depth=1)
926
        self._apply_versioning(account, container, src_version_id,
927
                               update_statistics_ancestors_depth=1)
928
        return dest_version_id
929

    
930
    @debug_method
931
    @backend_method
932
    def get_object_permissions_bulk(self, user, account, container, names):
933
        """Return the action allowed on the object, the path
934
        from which the object gets its permissions from,
935
        along with a dictionary containing the permissions."""
936

    
937
        permissions_path = self._get_permissions_path_bulk(account, container,
938
                                                           names)
939
        access_objects = self.permissions.access_check_bulk(permissions_path,
940
                                                            user)
941
        #group_parents = access_objects['group_parents']
942
        nobject_permissions = {}
943
        cpath = '/'.join((account, container, ''))
944
        cpath_idx = len(cpath)
945
        for path in permissions_path:
946
            allowed = 1
947
            name = path[cpath_idx:]
948
            if user != account:
949
                try:
950
                    allowed = access_objects[path]
951
                except KeyError:
952
                    raise NotAllowedError
953
            access_dict, allowed = \
954
                self.permissions.access_get_for_bulk(access_objects[path])
955
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
956
                                         access_dict)
957
        self._lookup_objects(permissions_path)
958
        return nobject_permissions
959

    
960
    @debug_method
961
    @backend_method
962
    def get_object_permissions(self, user, account, container, name):
963
        """Return the action allowed on the object, the path
964
        from which the object gets its permissions from,
965
        along with a dictionary containing the permissions."""
966

    
967
        allowed = 'write'
968
        permissions_path = self._get_permissions_path(account, container, name)
969
        if user != account:
970
            if self.permissions.access_check(permissions_path, self.WRITE,
971
                                             user):
972
                allowed = 'write'
973
            elif self.permissions.access_check(permissions_path, self.READ,
974
                                               user):
975
                allowed = 'read'
976
            else:
977
                raise NotAllowedError
978
        self._lookup_object(account, container, name)
979
        return (allowed,
980
                permissions_path,
981
                self.permissions.access_get(permissions_path))
982

    
983
    @debug_method
984
    @backend_method
985
    def update_object_permissions(self, user, account, container, name,
986
                                  permissions):
987
        """Update the permissions associated with the object."""
988

    
989
        if user != account:
990
            raise NotAllowedError
991
        path = self._lookup_object(account, container, name,
992
                                   lock_container=True)[0]
993
        self._check_permissions(path, permissions)
994
        try:
995
            self.permissions.access_set(path, permissions)
996
        except:
997
            raise ValueError
998
        else:
999
            self._report_sharing_change(user, account, path, {'members':
1000
                                        self.permissions.access_members(path)})
1001

    
1002
    @debug_method
1003
    @backend_method
1004
    def get_object_public(self, user, account, container, name):
1005
        """Return the public id of the object if applicable."""
1006

    
1007
        self._can_read(user, account, container, name)
1008
        path = self._lookup_object(account, container, name)[0]
1009
        p = self.permissions.public_get(path)
1010
        return p
1011

    
1012
    @debug_method
1013
    @backend_method
1014
    def update_object_public(self, user, account, container, name, public):
1015
        """Update the public status of the object."""
1016

    
1017
        self._can_write(user, account, container, name)
1018
        path = self._lookup_object(account, container, name,
1019
                                   lock_container=True)[0]
1020
        if not public:
1021
            self.permissions.public_unset(path)
1022
        else:
1023
            self.permissions.public_set(
1024
                path, self.public_url_security, self.public_url_alphabet)
1025

    
1026
    @debug_method
1027
    @backend_method
1028
    def get_object_hashmap(self, user, account, container, name, version=None):
1029
        """Return the object's size and a list with partial hashes."""
1030

    
1031
        self._can_read(user, account, container, name)
1032
        path, node = self._lookup_object(account, container, name)
1033
        props = self._get_version(node, version)
1034
        if props[self.HASH] is None:
1035
            return 0, ()
1036
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
1037
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
1038

    
1039
    def _update_object_hash(self, user, account, container, name, size, type,
1040
                            hash, checksum, domain, meta, replace_meta,
1041
                            permissions, src_node=None, src_version_id=None,
1042
                            is_copy=False, report_size_change=True):
1043
        if permissions is not None and user != account:
1044
            raise NotAllowedError
1045
        self._can_write(user, account, container, name)
1046
        if permissions is not None:
1047
            path = '/'.join((account, container, name))
1048
            self._check_permissions(path, permissions)
1049

    
1050
        account_path, account_node = self._lookup_account(account, True)
1051
        container_path, container_node = self._lookup_container(
1052
            account, container)
1053

    
1054
        path, node = self._put_object_node(
1055
            container_path, container_node, name)
1056
        pre_version_id, dest_version_id = self._put_version_duplicate(
1057
            user, node, src_node=src_node, size=size, type=type, hash=hash,
1058
            checksum=checksum, is_copy=is_copy,
1059
            update_statistics_ancestors_depth=1)
1060

    
1061
        # Handle meta.
1062
        if src_version_id is None:
1063
            src_version_id = pre_version_id
1064
        self._put_metadata_duplicate(
1065
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
1066

    
1067
        del_size = self._apply_versioning(account, container, pre_version_id,
1068
                                          update_statistics_ancestors_depth=1)
1069
        size_delta = size - del_size
1070
        if size_delta > 0:
1071
            # Check account quota.
1072
            if not self.using_external_quotaholder:
1073
                account_quota = long(self._get_policy(
1074
                    account_node, is_account_policy=True)['quota'])
1075
                account_usage = self._get_statistics(account_node,
1076
                                                     compute=True)[1]
1077
                if (account_quota > 0 and account_usage > account_quota):
1078
                    raise QuotaError(
1079
                        'Account quota exceeded: limit: %s, usage: %s' % (
1080
                            account_quota, account_usage))
1081

    
1082
            # Check container quota.
1083
            container_quota = long(self._get_policy(
1084
                container_node, is_account_policy=False)['quota'])
1085
            container_usage = self._get_statistics(container_node)[1]
1086
            if (container_quota > 0 and container_usage > container_quota):
1087
                # This must be executed in a transaction, so the version is
1088
                # never created if it fails.
1089
                raise QuotaError(
1090
                    'Container quota exceeded: limit: %s, usage: %s' % (
1091
                        container_quota, container_usage
1092
                    )
1093
                )
1094

    
1095
        if report_size_change:
1096
            self._report_size_change(
1097
                user, account, size_delta,
1098
                {'action': 'object update', 'path': path,
1099
                 'versions': ','.join([str(dest_version_id)])})
1100
        if permissions is not None:
1101
            self.permissions.access_set(path, permissions)
1102
            self._report_sharing_change(
1103
                user, account, path,
1104
                {'members': self.permissions.access_members(path)})
1105

    
1106
        self._report_object_change(
1107
            user, account, path,
1108
            details={'version': dest_version_id, 'action': 'object update'})
1109
        return dest_version_id
1110

    
1111
    @debug_method
1112
    def update_object_hashmap(self, user, account, container, name, size, type,
1113
                              hashmap, checksum, domain, meta=None,
1114
                              replace_meta=False, permissions=None):
1115
        """Create/update an object's hashmap and return the new version."""
1116

    
1117
        meta = meta or {}
1118
        if size == 0:  # No such thing as an empty hashmap.
1119
            hashmap = [self.put_block('')]
1120
        map = HashMap(self.block_size, self.hash_algorithm)
1121
        map.extend([self._unhexlify_hash(x) for x in hashmap])
1122
        missing = self.store.block_search(map)
1123
        if missing:
1124
            ie = IndexError()
1125
            ie.data = [binascii.hexlify(x) for x in missing]
1126
            raise ie
1127

    
1128
        hash = map.hash()
1129
        hexlified = binascii.hexlify(hash)
1130
        # _update_object_hash() locks destination path
1131
        dest_version_id = self._update_object_hash(
1132
            user, account, container, name, size, type, hexlified, checksum,
1133
            domain, meta, replace_meta, permissions)
1134
        self.store.map_put(hash, map)
1135
        return dest_version_id, hexlified
1136

    
1137
    @debug_method
1138
    @backend_method
1139
    def update_object_checksum(self, user, account, container, name, version,
1140
                               checksum):
1141
        """Update an object's checksum."""
1142

    
1143
        # Update objects with greater version and same hashmap
1144
        # and size (fix metadata updates).
1145
        self._can_write(user, account, container, name)
1146
        path, node = self._lookup_object(account, container, name,
1147
                                         lock_container=True)
1148
        props = self._get_version(node, version)
1149
        versions = self.node.node_get_versions(node)
1150
        for x in versions:
1151
            if (x[self.SERIAL] >= int(version) and
1152
                x[self.HASH] == props[self.HASH] and
1153
                    x[self.SIZE] == props[self.SIZE]):
1154
                self.node.version_put_property(
1155
                    x[self.SERIAL], 'checksum', checksum)
1156

    
1157
    def _copy_object(self, user, src_account, src_container, src_name,
1158
                     dest_account, dest_container, dest_name, type,
1159
                     dest_domain=None, dest_meta=None, replace_meta=False,
1160
                     permissions=None, src_version=None, is_move=False,
1161
                     delimiter=None):
1162

    
1163
        report_size_change = not is_move
1164
        dest_meta = dest_meta or {}
1165
        dest_version_ids = []
1166
        self._can_read(user, src_account, src_container, src_name)
1167

    
1168
        src_container_path = '/'.join((src_account, src_container))
1169
        dest_container_path = '/'.join((dest_account, dest_container))
1170
        # Lock container paths in alphabetical order
1171
        if src_container_path < dest_container_path:
1172
            self._lookup_container(src_account, src_container)
1173
            self._lookup_container(dest_account, dest_container)
1174
        else:
1175
            self._lookup_container(dest_account, dest_container)
1176
            self._lookup_container(src_account, src_container)
1177

    
1178
        path, node = self._lookup_object(src_account, src_container, src_name)
1179
        # TODO: Will do another fetch of the properties in duplicate version...
1180
        props = self._get_version(
1181
            node, src_version)  # Check to see if source exists.
1182
        src_version_id = props[self.SERIAL]
1183
        hash = props[self.HASH]
1184
        size = props[self.SIZE]
1185
        is_copy = not is_move and (src_account, src_container, src_name) != (
1186
            dest_account, dest_container, dest_name)  # New uuid.
1187
        dest_version_ids.append(self._update_object_hash(
1188
            user, dest_account, dest_container, dest_name, size, type, hash,
1189
            None, dest_domain, dest_meta, replace_meta, permissions,
1190
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1191
            report_size_change=report_size_change))
1192
        if is_move and ((src_account, src_container, src_name) !=
1193
                        (dest_account, dest_container, dest_name)):
1194
            self._delete_object(user, src_account, src_container, src_name,
1195
                                report_size_change=report_size_change)
1196

    
1197
        if delimiter:
1198
            prefix = (src_name + delimiter if not
1199
                      src_name.endswith(delimiter) else src_name)
1200
            src_names = self._list_objects_no_limit(
1201
                user, src_account, src_container, prefix, delimiter=None,
1202
                virtual=False, domain=None, keys=[], shared=False, until=None,
1203
                size_range=None, all_props=True, public=False)
1204
            src_names.sort(key=lambda x: x[2])  # order by nodes
1205
            paths = [elem[0] for elem in src_names]
1206
            nodes = [elem[2] for elem in src_names]
1207
            # TODO: Will do another fetch of the properties
1208
            # in duplicate version...
1209
            props = self._get_versions(nodes)  # Check to see if source exists.
1210

    
1211
            for prop, path, node in zip(props, paths, nodes):
1212
                src_version_id = prop[self.SERIAL]
1213
                hash = prop[self.HASH]
1214
                vtype = prop[self.TYPE]
1215
                size = prop[self.SIZE]
1216
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1217
                    delimiter) else dest_name
1218
                vdest_name = path.replace(prefix, dest_prefix, 1)
1219
                # _update_object_hash() locks destination path
1220
                dest_version_ids.append(self._update_object_hash(
1221
                    user, dest_account, dest_container, vdest_name, size,
1222
                    vtype, hash, None, dest_domain, meta={},
1223
                    replace_meta=False, permissions=None, src_node=node,
1224
                    src_version_id=src_version_id, is_copy=is_copy,
1225
                    report_size_change=report_size_change))
1226
                if is_move and ((src_account, src_container, src_name) !=
1227
                                (dest_account, dest_container, dest_name)):
1228
                    self._delete_object(user, src_account, src_container, path,
1229
                                        report_size_change=report_size_change)
1230
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
1231
                dest_version_ids)
1232

    
1233
    @debug_method
1234
    @backend_method
1235
    def copy_object(self, user, src_account, src_container, src_name,
1236
                    dest_account, dest_container, dest_name, type, domain,
1237
                    meta=None, replace_meta=False, permissions=None,
1238
                    src_version=None, delimiter=None):
1239
        """Copy an object's data and metadata."""
1240

    
1241
        meta = meta or {}
1242
        dest_version_id = self._copy_object(
1243
            user, src_account, src_container, src_name, dest_account,
1244
            dest_container, dest_name, type, domain, meta, replace_meta,
1245
            permissions, src_version, False, delimiter)
1246
        return dest_version_id
1247

    
1248
    @debug_method
1249
    @backend_method
1250
    def move_object(self, user, src_account, src_container, src_name,
1251
                    dest_account, dest_container, dest_name, type, domain,
1252
                    meta=None, replace_meta=False, permissions=None,
1253
                    delimiter=None):
1254
        """Move an object's data and metadata."""
1255

    
1256
        meta = meta or {}
1257
        if user != src_account:
1258
            raise NotAllowedError
1259
        dest_version_id = self._move_object(
1260
            user, src_account, src_container, src_name, dest_account,
1261
            dest_container, dest_name, type, domain, meta, replace_meta,
1262
            permissions, None, delimiter=delimiter)
1263
        return dest_version_id
1264

    
1265
    def _delete_object(self, user, account, container, name, until=None,
1266
                       delimiter=None, report_size_change=True):
1267
        if user != account:
1268
            raise NotAllowedError
1269

    
1270
        # lookup object and lock container path also
1271
        path, node = self._lookup_object(account, container, name,
1272
                                         lock_container=True)
1273

    
1274
        if until is not None:
1275
            if node is None:
1276
                return
1277
            hashes = []
1278
            size = 0
1279
            serials = []
1280
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1281
                                           update_statistics_ancestors_depth=1)
1282
            hashes += h
1283
            size += s
1284
            serials += v
1285
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1286
                                           update_statistics_ancestors_depth=1)
1287
            hashes += h
1288
            if not self.free_versioning:
1289
                size += s
1290
            serials += v
1291
            for h in hashes:
1292
                self.store.map_delete(h)
1293
            self.node.node_purge(node, until, CLUSTER_DELETED,
1294
                                 update_statistics_ancestors_depth=1)
1295
            try:
1296
                self._get_version(node)
1297
            except NameError:
1298
                self.permissions.access_clear(path)
1299
            self._report_size_change(
1300
                user, account, -size, {
1301
                    'action': 'object purge',
1302
                    'path': path,
1303
                    'versions': ','.join(str(i) for i in serials)
1304
                }
1305
            )
1306
            return
1307

    
1308
        if not self._exists(node):
1309
            raise ItemNotExists('Object is deleted.')
1310

    
1311
        src_version_id, dest_version_id = self._put_version_duplicate(
1312
            user, node, size=0, type='', hash=None, checksum='',
1313
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1314
        del_size = self._apply_versioning(account, container, src_version_id,
1315
                                          update_statistics_ancestors_depth=1)
1316
        if report_size_change:
1317
            self._report_size_change(
1318
                user, account, -del_size,
1319
                {'action': 'object delete',
1320
                 'path': path,
1321
                 'versions': ','.join([str(dest_version_id)])})
1322
        self._report_object_change(
1323
            user, account, path, details={'action': 'object delete'})
1324
        self.permissions.access_clear(path)
1325

    
1326
        if delimiter:
1327
            prefix = name + delimiter if not name.endswith(delimiter) else name
1328
            src_names = self._list_objects_no_limit(
1329
                user, account, container, prefix, delimiter=None,
1330
                virtual=False, domain=None, keys=[], shared=False, until=None,
1331
                size_range=None, all_props=True, public=False)
1332
            paths = []
1333
            for t in src_names:
1334
                path = '/'.join((account, container, t[0]))
1335
                node = t[2]
1336
                if not self._exists(node):
1337
                    continue
1338
                src_version_id, dest_version_id = self._put_version_duplicate(
1339
                    user, node, size=0, type='', hash=None, checksum='',
1340
                    cluster=CLUSTER_DELETED,
1341
                    update_statistics_ancestors_depth=1)
1342
                del_size = self._apply_versioning(
1343
                    account, container, src_version_id,
1344
                    update_statistics_ancestors_depth=1)
1345
                if report_size_change:
1346
                    self._report_size_change(
1347
                        user, account, -del_size,
1348
                        {'action': 'object delete',
1349
                         'path': path,
1350
                         'versions': ','.join([str(dest_version_id)])})
1351
                self._report_object_change(
1352
                    user, account, path, details={'action': 'object delete'})
1353
                paths.append(path)
1354
            self.permissions.access_clear_bulk(paths)
1355

    
1356
    @debug_method
1357
    @backend_method
1358
    def delete_object(self, user, account, container, name, until=None,
1359
                      prefix='', delimiter=None):
1360
        """Delete/purge an object."""
1361

    
1362
        self._delete_object(user, account, container, name, until, delimiter)
1363

    
1364
    @debug_method
1365
    @backend_method
1366
    def list_versions(self, user, account, container, name):
1367
        """Return a list of all object (version, version_timestamp) tuples."""
1368

    
1369
        self._can_read(user, account, container, name)
1370
        path, node = self._lookup_object(account, container, name)
1371
        versions = self.node.node_get_versions(node)
1372
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
1373
                x[self.CLUSTER] != CLUSTER_DELETED]
1374

    
1375
    @debug_method
1376
    @backend_method
1377
    def get_uuid(self, user, uuid, check_permissions=True):
1378
        """Return the (account, container, name) for the UUID given."""
1379

    
1380
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1381
        if info is None:
1382
            raise NameError
1383
        path, serial = info
1384
        account, container, name = path.split('/', 2)
1385
        if check_permissions:
1386
            self._can_read(user, account, container, name)
1387
        return (account, container, name)
1388

    
1389
    @debug_method
1390
    @backend_method
1391
    def get_public(self, user, public):
1392
        """Return the (account, container, name) for the public id given."""
1393

    
1394
        path = self.permissions.public_path(public)
1395
        if path is None:
1396
            raise NameError
1397
        account, container, name = path.split('/', 2)
1398
        self._can_read(user, account, container, name)
1399
        return (account, container, name)
1400

    
1401
    def get_block(self, hash):
1402
        """Return a block's data."""
1403

    
1404
        logger.debug("get_block: %s", hash)
1405
        block = self.store.block_get(self._unhexlify_hash(hash))
1406
        if not block:
1407
            raise ItemNotExists('Block does not exist')
1408
        return block
1409

    
1410
    def put_block(self, data):
1411
        """Store a block and return the hash."""
1412

    
1413
        logger.debug("put_block: %s", len(data))
1414
        return binascii.hexlify(self.store.block_put(data))
1415

    
1416
    def update_block(self, hash, data, offset=0):
1417
        """Update a known block and return the hash."""
1418

    
1419
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1420
        if offset == 0 and len(data) == self.block_size:
1421
            return self.put_block(data)
1422
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
1423
        return binascii.hexlify(h)
1424

    
1425
    # Path functions.
1426

    
1427
    def _generate_uuid(self):
1428
        return str(uuidlib.uuid4())
1429

    
1430
    def _put_object_node(self, path, parent, name):
1431
        path = '/'.join((path, name))
1432
        node = self.node.node_lookup(path)
1433
        if node is None:
1434
            node = self.node.node_create(parent, path)
1435
        return path, node
1436

    
1437
    def _put_path(self, user, parent, path,
1438
                  update_statistics_ancestors_depth=None):
1439
        node = self.node.node_create(parent, path)
1440
        self.node.version_create(node, None, 0, '', None, user,
1441
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
1442
                                 update_statistics_ancestors_depth)
1443
        return node
1444

    
1445
    def _lookup_account(self, account, create=True):
1446
        node = self.node.node_lookup(account)
1447
        if node is None and create:
1448
            node = self._put_path(
1449
                account, self.ROOTNODE, account,
1450
                update_statistics_ancestors_depth=-1)  # User is account.
1451
        return account, node
1452

    
1453
    def _lookup_container(self, account, container):
1454
        for_update = True if self.lock_container_path else False
1455
        path = '/'.join((account, container))
1456
        node = self.node.node_lookup(path, for_update)
1457
        if node is None:
1458
            raise ItemNotExists('Container does not exist')
1459
        return path, node
1460

    
1461
    def _lookup_object(self, account, container, name, lock_container=False):
1462
        if lock_container:
1463
            self._lookup_container(account, container)
1464

    
1465
        path = '/'.join((account, container, name))
1466
        node = self.node.node_lookup(path)
1467
        if node is None:
1468
            raise ItemNotExists('Object does not exist')
1469
        return path, node
1470

    
1471
    def _lookup_objects(self, paths):
1472
        nodes = self.node.node_lookup_bulk(paths)
1473
        return paths, nodes
1474

    
1475
    def _get_properties(self, node, until=None):
1476
        """Return properties until the timestamp given."""
1477

    
1478
        before = until if until is not None else inf
1479
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1480
        if props is None and until is not None:
1481
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1482
        if props is None:
1483
            raise ItemNotExists('Path does not exist')
1484
        return props
1485

    
1486
    def _get_statistics(self, node, until=None, compute=False):
1487
        """Return (count, sum of size, timestamp) of everything under node."""
1488

    
1489
        if until is not None:
1490
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1491
        elif compute:
1492
            stats = self.node.statistics_latest(node,
1493
                                                except_cluster=CLUSTER_DELETED)
1494
        else:
1495
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1496
        if stats is None:
1497
            stats = (0, 0, 0)
1498
        return stats
1499

    
1500
    def _get_version(self, node, version=None):
1501
        if version is None:
1502
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1503
            if props is None:
1504
                raise ItemNotExists('Object does not exist')
1505
        else:
1506
            try:
1507
                version = int(version)
1508
            except ValueError:
1509
                raise VersionNotExists('Version does not exist')
1510
            props = self.node.version_get_properties(version, node=node)
1511
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1512
                raise VersionNotExists('Version does not exist')
1513
        return props
1514

    
1515
    def _get_versions(self, nodes):
1516
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1517

    
1518
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
1519
                               type=None, hash=None, checksum=None,
1520
                               cluster=CLUSTER_NORMAL, is_copy=False,
1521
                               update_statistics_ancestors_depth=None):
1522
        """Create a new version of the node."""
1523

    
1524
        props = self.node.version_lookup(
1525
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1526
        if props is not None:
1527
            src_version_id = props[self.SERIAL]
1528
            src_hash = props[self.HASH]
1529
            src_size = props[self.SIZE]
1530
            src_type = props[self.TYPE]
1531
            src_checksum = props[self.CHECKSUM]
1532
        else:
1533
            src_version_id = None
1534
            src_hash = None
1535
            src_size = 0
1536
            src_type = ''
1537
            src_checksum = ''
1538
        if size is None:  # Set metadata.
1539
            hash = src_hash  # This way hash can be set to None
1540
                             # (account or container).
1541
            size = src_size
1542
        if type is None:
1543
            type = src_type
1544
        if checksum is None:
1545
            checksum = src_checksum
1546
        uuid = self._generate_uuid(
1547
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1548

    
1549
        if src_node is None:
1550
            pre_version_id = src_version_id
1551
        else:
1552
            pre_version_id = None
1553
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1554
            if props is not None:
1555
                pre_version_id = props[self.SERIAL]
1556
        if pre_version_id is not None:
1557
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
1558
                                        update_statistics_ancestors_depth)
1559

    
1560
        dest_version_id, mtime = self.node.version_create(
1561
            node, hash, size, type, src_version_id, user, uuid, checksum,
1562
            cluster, update_statistics_ancestors_depth)
1563

    
1564
        self.node.attribute_unset_is_latest(node, dest_version_id)
1565

    
1566
        return pre_version_id, dest_version_id
1567

    
1568
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
1569
                                node, meta, replace=False):
1570
        if src_version_id is not None:
1571
            self.node.attribute_copy(src_version_id, dest_version_id)
1572
        if not replace:
1573
            self.node.attribute_del(dest_version_id, domain, (
1574
                k for k, v in meta.iteritems() if v == ''))
1575
            self.node.attribute_set(dest_version_id, domain, node, (
1576
                (k, v) for k, v in meta.iteritems() if v != ''))
1577
        else:
1578
            self.node.attribute_del(dest_version_id, domain)
1579
            self.node.attribute_set(dest_version_id, domain, node, ((
1580
                k, v) for k, v in meta.iteritems()))
1581

    
1582
    def _put_metadata(self, user, node, domain, meta, replace=False,
1583
                      update_statistics_ancestors_depth=None):
1584
        """Create a new version and store metadata."""
1585

    
1586
        src_version_id, dest_version_id = self._put_version_duplicate(
1587
            user, node,
1588
            update_statistics_ancestors_depth=
1589
            update_statistics_ancestors_depth)
1590
        self._put_metadata_duplicate(
1591
            src_version_id, dest_version_id, domain, node, meta, replace)
1592
        return src_version_id, dest_version_id
1593

    
1594
    def _list_limits(self, listing, marker, limit):
1595
        start = 0
1596
        if marker:
1597
            try:
1598
                start = listing.index(marker) + 1
1599
            except ValueError:
1600
                pass
1601
        if not limit or limit > 10000:
1602
            limit = 10000
1603
        return start, limit
1604

    
1605
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
1606
                                marker=None, limit=10000, virtual=True,
1607
                                domain=None, keys=None, until=None,
1608
                                size_range=None, allowed=None,
1609
                                all_props=False):
1610
        keys = keys or []
1611
        allowed = allowed or []
1612
        cont_prefix = path + '/'
1613
        prefix = cont_prefix + prefix
1614
        start = cont_prefix + marker if marker else None
1615
        before = until if until is not None else inf
1616
        filterq = keys if domain else []
1617
        sizeq = size_range
1618

    
1619
        objects, prefixes = self.node.latest_version_list(
1620
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
1621
            allowed, domain, filterq, sizeq, all_props)
1622
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1623
        objects.sort(key=lambda x: x[0])
1624
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1625
        return objects
1626

    
1627
    # Reporting functions.
1628

    
1629
    @debug_method
1630
    @backend_method
1631
    def _report_size_change(self, user, account, size, details=None):
1632
        details = details or {}
1633

    
1634
        if size == 0:
1635
            return
1636

    
1637
        account_node = self._lookup_account(account, True)[1]
1638
        total = self._get_statistics(account_node, compute=True)[1]
1639
        details.update({'user': user, 'total': total})
1640
        self.messages.append(
1641
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1642
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1643

    
1644
        if not self.using_external_quotaholder:
1645
            return
1646

    
1647
        try:
1648
            name = details['path'] if 'path' in details else ''
1649
            serial = self.astakosclient.issue_one_commission(
1650
                holder=account,
1651
                source=DEFAULT_SOURCE,
1652
                provisions={'pithos.diskspace': size},
1653
                name=name)
1654
        except BaseException, e:
1655
            raise QuotaError(e)
1656
        else:
1657
            self.serials.append(serial)
1658

    
1659
    @debug_method
1660
    @backend_method
1661
    def _report_object_change(self, user, account, path, details=None):
1662
        details = details or {}
1663
        details.update({'user': user})
1664
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1665
                              account, QUEUE_INSTANCE_ID, 'object', path,
1666
                              details))
1667

    
1668
    @debug_method
1669
    @backend_method
1670
    def _report_sharing_change(self, user, account, path, details=None):
1671
        details = details or {}
1672
        details.update({'user': user})
1673
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1674
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
1675
                              details))
1676

    
1677
    # Policy functions.
1678

    
1679
    def _check_policy(self, policy, is_account_policy=True):
1680
        default_policy = self.default_account_policy \
1681
            if is_account_policy else self.default_container_policy
1682
        for k in policy.keys():
1683
            if policy[k] == '':
1684
                policy[k] = default_policy.get(k)
1685
        for k, v in policy.iteritems():
1686
            if k == 'quota':
1687
                q = int(v)  # May raise ValueError.
1688
                if q < 0:
1689
                    raise ValueError
1690
            elif k == 'versioning':
1691
                if v not in ['auto', 'none']:
1692
                    raise ValueError
1693
            else:
1694
                raise ValueError
1695

    
1696
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1697
        default_policy = self.default_account_policy \
1698
            if is_account_policy else self.default_container_policy
1699
        if replace:
1700
            for k, v in default_policy.iteritems():
1701
                if k not in policy:
1702
                    policy[k] = v
1703
        self.node.policy_set(node, policy)
1704

    
1705
    def _get_policy(self, node, is_account_policy=True):
1706
        default_policy = self.default_account_policy \
1707
            if is_account_policy else self.default_container_policy
1708
        policy = default_policy.copy()
1709
        policy.update(self.node.policy_get(node))
1710
        return policy
1711

    
1712
    def _apply_versioning(self, account, container, version_id,
1713
                          update_statistics_ancestors_depth=None):
1714
        """Delete the provided version if such is the policy.
1715
           Return size of object removed.
1716
        """
1717

    
1718
        if version_id is None:
1719
            return 0
1720
        path, node = self._lookup_container(account, container)
1721
        versioning = self._get_policy(
1722
            node, is_account_policy=False)['versioning']
1723
        if versioning != 'auto':
1724
            hash, size = self.node.version_remove(
1725
                version_id, update_statistics_ancestors_depth)
1726
            self.store.map_delete(hash)
1727
            return size
1728
        elif self.free_versioning:
1729
            return self.node.version_get_properties(
1730
                version_id, keys=('size',))[0]
1731
        return 0
1732

    
1733
    # Access control functions.
1734

    
1735
    def _check_groups(self, groups):
1736
        # raise ValueError('Bad characters in groups')
1737
        pass
1738

    
1739
    def _check_permissions(self, path, permissions):
1740
        # raise ValueError('Bad characters in permissions')
1741
        pass
1742

    
1743
    def _get_formatted_paths(self, paths):
1744
        formatted = []
1745
        if len(paths) == 0:
1746
            return formatted
1747
        props = self.node.get_props(paths)
1748
        if props:
1749
            for prop in props:
1750
                if prop[1].split(';', 1)[0].strip() in (
1751
                        'application/directory', 'application/folder'):
1752
                    formatted.append((prop[0].rstrip('/') + '/',
1753
                                      self.MATCH_PREFIX))
1754
                formatted.append((prop[0], self.MATCH_EXACT))
1755
        return formatted
1756

    
1757
    def _get_permissions_path(self, account, container, name):
1758
        path = '/'.join((account, container, name))
1759
        permission_paths = self.permissions.access_inherit(path)
1760
        permission_paths.sort()
1761
        permission_paths.reverse()
1762
        for p in permission_paths:
1763
            if p == path:
1764
                return p
1765
            else:
1766
                if p.count('/') < 2:
1767
                    continue
1768
                node = self.node.node_lookup(p)
1769
                props = None
1770
                if node is not None:
1771
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1772
                if props is not None:
1773
                    if props[self.TYPE].split(';', 1)[0].strip() in (
1774
                            'application/directory', 'application/folder'):
1775
                        return p
1776
        return None
1777

    
1778
    def _get_permissions_path_bulk(self, account, container, names):
1779
        formatted_paths = []
1780
        for name in names:
1781
            path = '/'.join((account, container, name))
1782
            formatted_paths.append(path)
1783
        permission_paths = self.permissions.access_inherit_bulk(
1784
            formatted_paths)
1785
        permission_paths.sort()
1786
        permission_paths.reverse()
1787
        permission_paths_list = []
1788
        lookup_list = []
1789
        for p in permission_paths:
1790
            if p in formatted_paths:
1791
                permission_paths_list.append(p)
1792
            else:
1793
                if p.count('/') < 2:
1794
                    continue
1795
                lookup_list.append(p)
1796

    
1797
        if len(lookup_list) > 0:
1798
            props = self.node.get_props(lookup_list)
1799
            if props:
1800
                for prop in props:
1801
                    if prop[1].split(';', 1)[0].strip() in (
1802
                            'application/directory', 'application/folder'):
1803
                        permission_paths_list.append(prop[0])
1804

    
1805
        if len(permission_paths_list) > 0:
1806
            return permission_paths_list
1807

    
1808
        return None
1809

    
1810
    def _can_read(self, user, account, container, name):
1811
        if user == account:
1812
            return True
1813
        path = '/'.join((account, container, name))
1814
        if self.permissions.public_get(path) is not None:
1815
            return True
1816
        path = self._get_permissions_path(account, container, name)
1817
        if not path:
1818
            raise NotAllowedError
1819
        if (not self.permissions.access_check(path, self.READ, user) and not
1820
                self.permissions.access_check(path, self.WRITE, user)):
1821
            raise NotAllowedError
1822

    
1823
    def _can_write(self, user, account, container, name):
1824
        if user == account:
1825
            return True
1826
        path = '/'.join((account, container, name))
1827
        path = self._get_permissions_path(account, container, name)
1828
        if not path:
1829
            raise NotAllowedError
1830
        if not self.permissions.access_check(path, self.WRITE, user):
1831
            raise NotAllowedError
1832

    
1833
    def _allowed_accounts(self, user):
1834
        allow = set()
1835
        for path in self.permissions.access_list_paths(user):
1836
            allow.add(path.split('/', 1)[0])
1837
        return sorted(allow)
1838

    
1839
    def _allowed_containers(self, user, account):
1840
        allow = set()
1841
        for path in self.permissions.access_list_paths(user, account):
1842
            allow.add(path.split('/', 2)[1])
1843
        return sorted(allow)
1844

    
1845
    # Domain functions
1846

    
1847
    @debug_method
1848
    @backend_method
1849
    def get_domain_objects(self, domain, user=None):
1850
        allowed_paths = self.permissions.access_list_paths(
1851
            user, include_owned=user is not None, include_containers=False)
1852
        if not allowed_paths:
1853
            return []
1854
        obj_list = self.node.domain_object_list(
1855
            domain, allowed_paths, CLUSTER_NORMAL)
1856
        return [(path,
1857
                 self._build_metadata(props, user_defined_meta),
1858
                 self.permissions.access_get(path)) for
1859
                path, props, user_defined_meta in obj_list]
1860

    
1861
    # util functions
1862

    
1863
    def _build_metadata(self, props, user_defined=None,
1864
                        include_user_defined=True):
1865
        meta = {'bytes': props[self.SIZE],
1866
                'type': props[self.TYPE],
1867
                'hash': props[self.HASH],
1868
                'version': props[self.SERIAL],
1869
                'version_timestamp': props[self.MTIME],
1870
                'modified_by': props[self.MUSER],
1871
                'uuid': props[self.UUID],
1872
                'checksum': props[self.CHECKSUM]}
1873
        if include_user_defined and user_defined is not None:
1874
            meta.update(user_defined)
1875
        return meta
1876

    
1877
    def _exists(self, node):
1878
        try:
1879
            self._get_version(node)
1880
        except ItemNotExists:
1881
            return False
1882
        else:
1883
            return True
1884

    
1885
    def _unhexlify_hash(self, hash):
1886
        try:
1887
            return binascii.unhexlify(hash)
1888
        except TypeError:
1889
            raise InvalidHash(hash)