root / snf-pithos-backend / pithos / backends / modular.py @ e5b77cde

# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import sys
import uuid as uuidlib
import logging
import hashlib
import binascii

from functools import wraps, partial
from traceback import format_exc

try:
    from astakosclient import AstakosClient
except ImportError:
    AstakosClient = None

from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
                  InvalidHash)


class DisabledAstakosClient(object):
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        m = ("AstakosClient has been disabled, "
             "yet an attempt to access it was made")
        raise AssertionError(m)


# Stripped-down version of the HashMap class found in tools.

class HashMap(list):

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

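    # Reduce the list of block hashes to a single root hash, Merkle-tree
    # style: pad the list with zero-filled digests up to a power of two,
    # then repeatedly hash adjacent pairs until one digest remains.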
    def hash(self):
        if len(self) == 0:
            return self._hash_raw('')
        if len(self) == 1:
            return self.__getitem__(0)

        h = list(self)
        s = 2
        while s < len(h):
            s = s * 2
        h += [('\x00' * len(h[0]))] * (s - len(h))
        while len(h) > 1:
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
        return h[0]

# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16

QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

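# Version clusters, as used throughout the backend below: NORMAL holds the
# current version of a path, HISTORY its superseded versions, and DELETED
# the tombstone versions written when an object is removed.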
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

inf = float('inf')

ULTIMATE_ANSWER = 42

DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'

logger = logging.getLogger(__name__)


def backend_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        # if we are inside a database transaction
        # just proceed with the method execution
        # otherwise manage a new transaction
        if self.in_transaction:
            return func(self, *args, **kw)

        try:
            self.pre_exec()
            result = func(self, *args, **kw)
            success_status = True
            return result
        except:
            success_status = False
            raise
        finally:
            self.post_exec(success_status)
    return wrapper


def debug_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            result = format_exc()
            raise
        finally:
            all_args = map(repr, args)
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper


def list_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        marker = kw.get('marker')
        limit = kw.get('limit')
        result = func(self, *args, **kw)
        start, limit = self._list_limits(result, marker, limit)
        return result[start:start + limit]
    return wrapper


class ModularBackend(BaseBackend):
    """A modular backend.

    Uses modules for SQL functions and storage.
    """

    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        def load_module(m):
            __import__(m)
            return sys.modules[m]

        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        if not astakos_auth_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        self.serials = []
        self.messages = []

        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False

    def pre_exec(self, lock_container_path=False):
        self.lock_container_path = lock_container_path
        self.wrapper.execute()
        self.serials = []
        self.in_transaction = True

    def post_exec(self, success_status=True):
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    accept_serials=self.serials,
                    reject_serials=[])
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            if self.serials:
                r = self.astakosclient.resolve_commissions(
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(
                    r['rejected'])
            self.wrapper.rollback()
        self.in_transaction = False

    def close(self):
        self.wrapper.close()
        self.queue.close()

    @property
    def using_external_quotaholder(self):
        return not isinstance(self.astakosclient, DisabledAstakosClient)

    @debug_method
    @backend_method
    @list_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        return self._allowed_accounts(user)

    def _get_account_quotas(self, account):
        """Get account usage from astakos."""

        quotas = self.astakosclient.service_get_quotas(account)[account]
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
                                                  {})

    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain."""

        path, node = self._lookup_account(account, user == account)
        if user != account:
            if until or (node is None) or (account not
                                           in self._allowed_accounts(user)):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                external_quota = self._get_account_quotas(account)
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta

    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_account(account, True)
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)

    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            return {}
        self._lookup_account(account, True)
        return self.permissions.group_dict(account)

    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        for k, v in groups.iteritems():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, k)
            if v:
                self.permissions.group_addmany(account, k, v)

    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy."""

        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            return {}
        path, node = self._lookup_account(account, True)
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            external_quota = self._get_account_quotas(account)
            policy['quota'] = external_quota.get('limit', 0)
        return policy

    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_account(account, True)
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)

    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        policy = policy or {}
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True)

    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is None:
            return
        if not self.node.node_remove(node,
                                     update_statistics_ancestors_depth=-1):
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

    @debug_method
    @backend_method
    @list_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account."""

        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            return self._allowed_containers(user, account)
        if shared or public:
            allowed = set()
            if shared:
                allowed.update([x.split('/', 2)[1] for x in
                               self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in
                               self.permissions.public_list(account)])
            return sorted(allowed)
        node = self.node.node_lookup(account)
        return [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]

    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""

        allowed = []
        if user != account:
            if until:
                raise NotAllowedError
            allowed = self.permissions.access_list_paths(
                user, '/'.join((account, container)))
            if not allowed:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = until if until is not None else inf
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)

    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain."""

        if user != account:
            if until or container not in self._allowed_containers(user,
                                                                  account):
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta

    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is not None:
            versioning = self._get_policy(
                node, is_account_policy=False)['versioning']
            if versioning != 'auto':
                self.node.version_remove(src_version_id,
                                         update_statistics_ancestors_depth=0)

    @debug_method
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        if user != account:
            if container not in self._allowed_containers(user, account):
                raise NotAllowedError
            return {}
        path, node = self._lookup_container(account, container)
        return self._get_policy(node, is_account_policy=False)

    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)

    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        policy = policy or {}
        if user != account:
            raise NotAllowedError
        try:
            path, node = self._lookup_container(account, container)
        except NameError:
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        node = self._put_path(
            user, self._lookup_account(account, True)[1], path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)

    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        if until is not None:
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        if user != account and until:
            raise NotAllowedError
        if shared and public:
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            objects = set()
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects |= set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            objects.sort(key=lambda x: x[0])
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]

        allowed = self._list_object_permissions(
            user, account, container, prefix, shared, public)
        if shared and not allowed:
            return []
        path, node = self._lookup_container(account, container)
        allowed = self._get_formatted_paths(allowed)
        objects = self._list_object_properties(
            node, path, prefix, delimiter, marker, limit, virtual, domain,
            keys, until, size_range, allowed, all_props)
        start, limit = self._list_limits(
            [x[0] for x in objects], marker, limit)
        return objects[start:start + limit]

    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        public = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        paths, nodes = self._lookup_objects(public)
        path = '/'.join((account, container))
        cont_prefix = path + '/'
        paths = [x[len(cont_prefix):] for x in paths]
        objects = [(p,) + props for p, props in
                   zip(paths, self.node.version_lookup_bulk(
                       nodes, all_props=all_props, order_by_path=True))]
        return objects

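    # Page through _list_objects in batches of up to 10000 entries, using the
    # last entry returned as the next marker, until a short batch signals the
    # end of the listing.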
    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        objects = []
        while True:
            marker = objects[-1] if objects else None
            limit = 10000
            l = self._list_objects(
                user, account, container, prefix, delimiter, marker, limit,
                virtual, domain, keys, shared, until, size_range, all_props,
                public)
            objects.extend(l)
            if not l or len(l) < limit:
                break
        return objects

    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        allowed = []
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            allowed = self.permissions.access_list_paths(user, path)
            if not allowed:
                raise NotAllowedError
        else:
            allowed = set()
            if shared:
                allowed.update(self.permissions.access_list_shared(path))
            if public:
                allowed.update(
                    [x[0] for x in self.permissions.public_list(path)])
            allowed = sorted(allowed)
            if not allowed:
                return []
        return allowed

    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        keys = keys or []
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, False, public)

    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        keys = keys or []
        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, True, public)
        objects = []
        for p in props:
            if len(p) == 2:
                objects.append({'subdir': p[0]})
            else:
                objects.append({
                    'name': p[0],
                    'bytes': p[self.SIZE + 1],
                    'type': p[self.TYPE + 1],
                    'hash': p[self.HASH + 1],
                    'version': p[self.SERIAL + 1],
                    'version_timestamp': p[self.MTIME + 1],
                    'modified': p[self.MTIME + 1] if until is None else None,
                    'modified_by': p[self.MUSER + 1],
                    'uuid': p[self.UUID + 1],
                    'checksum': p[self.CHECKSUM + 1]})
        return objects

    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths that enforce permissions under a container."""

        return self._list_object_permissions(user, account, container, prefix,
                                             True, False)

    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        public = {}
        for path, p in self.permissions.public_list('/'.join((account,
                                                              container,
                                                              prefix))):
            public[path] = p
        return public

    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta

    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write(user, account, container, name)

        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        self._apply_versioning(account, container, src_version_id,
                               update_statistics_ancestors_depth=1)
        return dest_version_id

    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        #group_parents = access_objects['group_parents']
        nobject_permissions = {}
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            allowed = 1
            name = path[cpath_idx:]
            if user != account:
                try:
                    allowed = access_objects[path]
                except KeyError:
                    raise NotAllowedError
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        self._lookup_objects(permissions_path)
        return nobject_permissions

    @debug_method
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        allowed = 'write'
        permissions_path = self._get_permissions_path(account, container, name)
        if user != account:
            if self.permissions.access_check(permissions_path, self.WRITE,
                                             user):
                allowed = 'write'
            elif self.permissions.access_check(permissions_path, self.READ,
                                               user):
                allowed = 'read'
            else:
                raise NotAllowedError
        self._lookup_object(account, container, name)
        return (allowed,
                permissions_path,
                self.permissions.access_get(permissions_path))

    @debug_method
    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object."""

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        try:
            self.permissions.access_set(path, permissions)
        except:
            raise ValueError
        else:
            self._report_sharing_change(user, account, path, {'members':
                                        self.permissions.access_members(path)})

    @debug_method
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        p = self.permissions.public_get(path)
        return p

    @debug_method
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write(user, account, container, name)
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        if not public:
            self.permissions.public_unset(path)
        else:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet)

    @debug_method
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if props[self.HASH] is None:
            return 0, ()
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]

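    # Write a new version for the object: duplicate the version and metadata
    # records, apply the container versioning policy to the version being
    # replaced, enforce account/container quotas for any size increase, and
    # report size, sharing and object-change messages.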
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id

    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version."""

        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(map)
        if missing:
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = map.hash()
        hexlified = binascii.hexlify(hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        self.store.map_put(hash, map)
        return dest_version_id, hexlified

    @debug_method
    @backend_method
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum."""

        # Update objects with greater version and same hashmap
        # and size (fix metadata updates).
        self._can_write(user, account, container, name)
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        props = self._get_version(node, version)
        versions = self.node.node_get_versions(node)
        for x in versions:
            if (x[self.SERIAL] >= int(version) and
                x[self.HASH] == props[self.HASH] and
                    x[self.SIZE] == props[self.SIZE]):
                self.node.version_put_property(
                    x[self.SERIAL], 'checksum', checksum)

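    # Copy (or move, via the is_move partial bound in __init__) a single
    # object, and with a delimiter also every object under the source prefix.
    # Source and destination containers are looked up in lexicographic order
    # so that concurrent copies acquire container locks consistently.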
    def _copy_object(self, user, src_account, src_container, src_name,
1162
                     dest_account, dest_container, dest_name, type,
1163
                     dest_domain=None, dest_meta=None, replace_meta=False,
1164
                     permissions=None, src_version=None, is_move=False,
1165
                     delimiter=None):
1166

    
1167
        report_size_change = not is_move
1168
        dest_meta = dest_meta or {}
1169
        dest_version_ids = []
1170
        self._can_read(user, src_account, src_container, src_name)
1171

    
1172
        src_container_path = '/'.join((src_account, src_container))
1173
        dest_container_path = '/'.join((dest_account, dest_container))
1174
        # Lock container paths in alphabetical order
1175
        if src_container_path < dest_container_path:
1176
            self._lookup_container(src_account, src_container)
1177
            self._lookup_container(dest_account, dest_container)
1178
        else:
1179
            self._lookup_container(dest_account, dest_container)
1180
            self._lookup_container(src_account, src_container)
1181

    
1182
        path, node = self._lookup_object(src_account, src_container, src_name)
1183
        # TODO: Will do another fetch of the properties in duplicate version...
1184
        props = self._get_version(
1185
            node, src_version)  # Check to see if source exists.
1186
        src_version_id = props[self.SERIAL]
1187
        hash = props[self.HASH]
1188
        size = props[self.SIZE]
1189
        is_copy = not is_move and (src_account, src_container, src_name) != (
1190
            dest_account, dest_container, dest_name)  # New uuid.
1191
        dest_version_ids.append(self._update_object_hash(
1192
            user, dest_account, dest_container, dest_name, size, type, hash,
1193
            None, dest_domain, dest_meta, replace_meta, permissions,
1194
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1195
            report_size_change=report_size_change))
1196
        if is_move and ((src_account, src_container, src_name) !=
1197
                        (dest_account, dest_container, dest_name)):
1198
            self._delete_object(user, src_account, src_container, src_name,
1199
                                report_size_change=report_size_change)
1200

    
1201
        if delimiter:
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)

    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        meta = meta or {}
        dest_version_id = self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, src_version, False, delimiter)
        return dest_version_id

    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        meta = meta or {}
        if user != src_account:
            raise NotAllowedError
        dest_version_id = self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, None, delimiter=delimiter)
        return dest_version_id

    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

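        # With an 'until' timestamp this is a purge: physically remove every
        # version up to that point, delete the freed maps and report the
        # reclaimed space.  Otherwise it is a soft delete, recorded as a new
        # version in CLUSTER_DELETED.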
        if until is not None:
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except ItemNotExists:
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

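        # Pseudo-folder delete: with a delimiter, repeat the soft delete for
        # every object under the prefix and clear their permissions in bulk.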
        if delimiter:
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        self._delete_object(user, account, container, name, until, delimiter)

    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
                x[self.CLUSTER] != CLUSTER_DELETED]

    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read(user, account, container, name)
        return (account, container, name)

    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        block = self.store.block_get(self._unhexlify_hash(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block

    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        return binascii.hexlify(self.store.block_put(data))

    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
        return binascii.hexlify(h)

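    # Block API usage (illustrative sketch only, assuming a backend instance
    # ``b`` and some byte strings ``data``/``patch``): hashes are hex strings,
    # so a value returned by put_block() can be fed straight back to
    # get_block() or update_block().
    #
    #     h = b.put_block(data)                      # store, get hex hash
    #     assert b.get_block(h) == data              # read the block back
    #     h2 = b.update_block(h, patch, offset=16)   # hash of patched block
    #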
    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

    def _put_object_node(self, path, parent, name):
        path = '/'.join((path, name))
        node = self.node.node_lookup(path)
        if node is None:
            node = self.node.node_create(parent, path)
        return path, node

    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node

    def _lookup_account(self, account, create=True):
        node = self.node.node_lookup(account)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node

    def _lookup_container(self, account, container):
        for_update = True if self.lock_container_path else False
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name, lock_container=False):
        if lock_container:
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes

    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats

    def _get_version(self, node, version=None):
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version, node=node)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node."""

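        # Start from the latest version of src_node (or of node itself),
        # inherit whatever of hash/size/type/checksum was not given, keep the
        # object UUID unless this is a copy or a brand new object, move the
        # previous latest version of node to CLUSTER_HISTORY, and finally
        # create the new version in the requested cluster.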
        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id

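    # Meta semantics below: when not replacing, a key mapped to '' removes
    # that attribute and any other key is upserted; when replacing, all
    # attributes in the domain are dropped first and then set anew.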
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))

    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=
            update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id

    def _list_limits(self, listing, marker, limit):
        start = 0
        if marker:
            try:
                start = listing.index(marker) + 1
            except ValueError:
                pass
        if not limit or limit > 10000:
            limit = 10000
        return start, limit

    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects

    # Reporting functions.

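    # Queue a 'diskspace' message with the new account total and, when an
    # external quotaholder is configured, issue a commission for the size
    # delta; the returned serial is collected in self.serials (presumably
    # resolved when the request completes).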
    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)

    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))

    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))

    # Policy functions.

    def _check_policy(self, policy, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError

    def _put_policy(self, node, policy, replace, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)

    def _get_policy(self, node, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        policy = default_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy

    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

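        # Return value is the number of bytes to charge back to the account:
        # if the container's versioning policy is not 'auto', the old version
        # is physically removed and its size freed; under 'auto' versioning
        # the version is kept, and its size only counts as freed when
        # versions are not charged (free_versioning).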
        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0

    # Access control functions.

    def _check_groups(self, groups):
        # raise ValueError('Bad characters in groups')
        pass

    def _check_permissions(self, path, permissions):
        # raise ValueError('Bad characters in permissions')
        pass

    def _get_formatted_paths(self, paths):
        formatted = []
        if len(paths) == 0:
            return formatted
        props = self.node.get_props(paths)
        if props:
            for prop in props:
                if prop[1].split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    formatted.append((prop[0].rstrip('/') + '/',
                                      self.MATCH_PREFIX))
                formatted.append((prop[0], self.MATCH_EXACT))
        return formatted

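    # Resolve the path that actually carries permissions for an object:
    # either the object's own path or an ancestor 'directory'/'folder'
    # object from which sharing is inherited (via access_inherit()).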
    def _get_permissions_path(self, account, container, name):
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            else:
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None

    def _get_permissions_path_bulk(self, account, container, names):
        formatted_paths = []
        for name in names:
            path = '/'.join((account, container, name))
            formatted_paths.append(path)
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        permission_paths.sort()
        permission_paths.reverse()
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                permission_paths_list.append(p)
            else:
                if p.count('/') < 2:
                    continue
                lookup_list.append(p)

        if len(lookup_list) > 0:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        if len(permission_paths_list) > 0:
            return permission_paths_list

        return None

    def _can_read(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if (not self.permissions.access_check(path, self.READ, user) and not
                self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError

    def _can_write(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError

    def _allowed_accounts(self, user):
        allow = set()
        for path in self.permissions.access_list_paths(user):
            allow.add(path.split('/', 1)[0])
        return sorted(allow)

    def _allowed_containers(self, user, account):
        allow = set()
        for path in self.permissions.access_list_paths(user, account):
            allow.add(path.split('/', 2)[1])
        return sorted(allow)

    # Domain functions

    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]

    # util functions

    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        meta = {'bytes': props[self.SIZE],
                'type': props[self.TYPE],
                'hash': props[self.HASH],
                'version': props[self.SERIAL],
                'version_timestamp': props[self.MTIME],
                'modified_by': props[self.MUSER],
                'uuid': props[self.UUID],
                'checksum': props[self.CHECKSUM]}
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta

    def _exists(self, node):
        try:
            self._get_version(node)
        except ItemNotExists:
            return False
        else:
            return True

    def _unhexlify_hash(self, hash):
        try:
            return binascii.unhexlify(hash)
        except TypeError:
            raise InvalidHash(hash)