snf-pithos-backend/pithos/backends/modular.py @ 135f864e
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42

    
43
try:
44
    from astakosclient import AstakosClient
45
except ImportError:
46
    AstakosClient = None
47

    
48
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
49
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
50
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
51
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
52
                  InvalidHash)
53

    
54

    
55
class DisabledAstakosClient(object):
56
    def __init__(self, *args, **kwargs):
57
        self.args = args
58
        self.kwargs = kwargs
59

    
60
    def __getattr__(self, name):
61
        m = ("AstakosClient has been disabled, "
62
             "yet an attempt to access it was made")
63
        raise AssertionError(m)
64

    
65

    
66
# Stripped-down version of the HashMap class found in tools.
67

    
68
class HashMap(list):
69

    
70
    def __init__(self, blocksize, blockhash):
71
        super(HashMap, self).__init__()
72
        self.blocksize = blocksize
73
        self.blockhash = blockhash
74

    
75
    def _hash_raw(self, v):
76
        h = hashlib.new(self.blockhash)
77
        h.update(v)
78
        return h.digest()
79

    
80
    def hash(self):
81
        if len(self) == 0:
82
            return self._hash_raw('')
83
        if len(self) == 1:
84
            return self.__getitem__(0)
85

    
86
        h = list(self)
87
        s = 2
88
        while s < len(h):
89
            s = s * 2
90
        h += [('\x00' * len(h[0]))] * (s - len(h))
91
        while len(h) > 1:
92
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
93
        return h[0]
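    # Usage sketch ('blocks' below is a hypothetical iterable of block data):
    # hash() pads the collected block hashes to the next power of two with
    # zero-byte leaves and folds them pairwise into a single top hash.
    #
    #   hm = HashMap(4 * 1024 * 1024, 'sha256')
    #   hm.extend(hashlib.sha256(block).digest() for block in blocks)
    #   top_hash = hm.hash()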
94

    
95
# Default modules and settings.
96
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
97
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
98
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
99
DEFAULT_BLOCK_PATH = 'data/'
100
DEFAULT_BLOCK_UMASK = 0o022
101
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
102
DEFAULT_HASH_ALGORITHM = 'sha256'
103
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
104
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
105
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
106
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
107
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
108
                               'abcdefghijklmnopqrstuvwxyz'
109
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
110
DEFAULT_PUBLIC_URL_SECURITY = 16
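# Construction sketch (assumes the default SQLite connection and a writable
# 'data/' block directory): instantiating ModularBackend below with no
# arguments picks up the defaults above.
#
#   b = ModularBackend()   # sqlite:///backend.db, 4MB blocks, sha256
#   b.put_account('user@example.org', 'user@example.org')
#   b.close()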
111

    
112
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
113
QUEUE_CLIENT_ID = 'pithos'
114
QUEUE_INSTANCE_ID = '1'
115

    
116
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
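# The clusters partition object versions: CLUSTER_NORMAL holds the current
# version, CLUSTER_HISTORY the superseded ones and CLUSTER_DELETED the
# deletion markers (see the node_purge*/version handling further below).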
117

    
118
inf = float('inf')
119

    
120
ULTIMATE_ANSWER = 42
121

    
122
DEFAULT_SOURCE = 'system'
123

    
124
logger = logging.getLogger(__name__)
125

    
126

    
127
def backend_method(func):
128
    @wraps(func)
129
    def wrapper(self, *args, **kw):
130
        # if we are inside a database transaction
131
        # just proceed with the method execution
132
        # otherwise manage a new transaction
133
        if self.in_transaction:
134
            return func(self, *args, **kw)
135

    
136
        try:
137
            self.pre_exec()
138
            result = func(self, *args, **kw)
139
            success_status = True
140
            return result
141
        except:
142
            success_status = False
143
            raise
144
        finally:
145
            self.post_exec(success_status)
146
    return wrapper
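# Call-flow sketch for a decorated method (account name is illustrative):
#
#   backend = ModularBackend()
#   backend.put_account('user@example.org', 'user@example.org')
#   # -> pre_exec(); put_account(...); post_exec(True)   [commit]
#   # an exception inside the method yields post_exec(False)   [rollback]
#
# Nested calls made while self.in_transaction is True run inside the outer
# transaction instead of opening a new one.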
147

    
148

    
149
def debug_method(func):
150
    @wraps(func)
151
    def wrapper(self, *args, **kw):
152
        try:
153
            result = func(self, *args, **kw)
154
            return result
155
        except:
156
            result = format_exc()
157
            raise
158
        finally:
159
            all_args = map(repr, args)
160
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
161
            logger.debug(">>> %s(%s) <<< %s" % (
162
                func.__name__, ', '.join(all_args).rstrip(', '), result))
163
    return wrapper
164

    
165

    
166
class ModularBackend(BaseBackend):
167
    """A modular backend.
168

169
    Uses modules for SQL functions and storage.
170
    """
171

    
172
    def __init__(self, db_module=None, db_connection=None,
173
                 block_module=None, block_path=None, block_umask=None,
174
                 block_size=None, hash_algorithm=None,
175
                 queue_module=None, queue_hosts=None, queue_exchange=None,
176
                 astakos_auth_url=None, service_token=None,
177
                 astakosclient_poolsize=None,
178
                 free_versioning=True, block_params=None,
179
                 public_url_security=None,
180
                 public_url_alphabet=None,
181
                 account_quota_policy=None,
182
                 container_quota_policy=None,
183
                 container_versioning_policy=None):
184
        db_module = db_module or DEFAULT_DB_MODULE
185
        db_connection = db_connection or DEFAULT_DB_CONNECTION
186
        block_module = block_module or DEFAULT_BLOCK_MODULE
187
        block_path = block_path or DEFAULT_BLOCK_PATH
188
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
189
        block_params = block_params or DEFAULT_BLOCK_PARAMS
190
        block_size = block_size or DEFAULT_BLOCK_SIZE
191
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
192
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
193
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
194
        container_quota_policy = container_quota_policy \
195
            or DEFAULT_CONTAINER_QUOTA
196
        container_versioning_policy = container_versioning_policy \
197
            or DEFAULT_CONTAINER_VERSIONING
198

    
199
        self.default_account_policy = {'quota': account_quota_policy}
200
        self.default_container_policy = {
201
            'quota': container_quota_policy,
202
            'versioning': container_versioning_policy
203
        }
204
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
205
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
206

    
207
        self.public_url_security = (public_url_security or
208
                                    DEFAULT_PUBLIC_URL_SECURITY)
209
        self.public_url_alphabet = (public_url_alphabet or
210
                                    DEFAULT_PUBLIC_URL_ALPHABET)
211

    
212
        self.hash_algorithm = hash_algorithm
213
        self.block_size = block_size
214
        self.free_versioning = free_versioning
215

    
216
        def load_module(m):
217
            __import__(m)
218
            return sys.modules[m]
219

    
220
        self.db_module = load_module(db_module)
221
        self.wrapper = self.db_module.DBWrapper(db_connection)
222
        params = {'wrapper': self.wrapper}
223
        self.permissions = self.db_module.Permissions(**params)
224
        self.config = self.db_module.Config(**params)
225
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
226
        for x in ['READ', 'WRITE']:
227
            setattr(self, x, getattr(self.db_module, x))
228
        self.node = self.db_module.Node(**params)
229
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
230
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
231
                  'MATCH_PREFIX', 'MATCH_EXACT']:
232
            setattr(self, x, getattr(self.db_module, x))
233

    
234
        self.ALLOWED = ['read', 'write']
235

    
236
        self.block_module = load_module(block_module)
237
        self.block_params = block_params
238
        params = {'path': block_path,
239
                  'block_size': self.block_size,
240
                  'hash_algorithm': self.hash_algorithm,
241
                  'umask': block_umask}
242
        params.update(self.block_params)
243
        self.store = self.block_module.Store(**params)
244

    
245
        if queue_module and queue_hosts:
246
            self.queue_module = load_module(queue_module)
247
            params = {'hosts': queue_hosts,
248
                      'exchange': queue_exchange,
249
                      'client_id': QUEUE_CLIENT_ID}
250
            self.queue = self.queue_module.Queue(**params)
251
        else:
252
            class NoQueue:
253
                def send(self, *args):
254
                    pass
255

    
256
                def close(self):
257
                    pass
258

    
259
            self.queue = NoQueue()
260

    
261
        self.astakos_auth_url = astakos_auth_url
262
        self.service_token = service_token
263

    
264
        if not astakos_auth_url or not AstakosClient:
265
            self.astakosclient = DisabledAstakosClient(
266
                service_token, astakos_auth_url,
267
                use_pool=True,
268
                pool_size=astakosclient_poolsize)
269
        else:
270
            self.astakosclient = AstakosClient(
271
                service_token, astakos_auth_url,
272
                use_pool=True,
273
                pool_size=astakosclient_poolsize)
274

    
275
        self.serials = []
276
        self.messages = []
277

    
278
        self._move_object = partial(self._copy_object, is_move=True)
279

    
280
        self.lock_container_path = False
281

    
282
        self.in_transaction = False
283

    
284
    def pre_exec(self, lock_container_path=False):
285
        self.lock_container_path = lock_container_path
286
        self.wrapper.execute()
287
        self.serials = []
288
        self.in_transaction = True
289

    
290
    def post_exec(self, success_status=True):
291
        if success_status:
292
            # send messages produced
293
            for m in self.messages:
294
                self.queue.send(*m)
295

    
296
            # register serials
297
            if self.serials:
298
                self.commission_serials.insert_many(
299
                    self.serials)
300

    
301
                # commit to ensure that the serials are registered
302
                # even if resolve commission fails
303
                self.wrapper.commit()
304

    
305
                # start new transaction
306
                self.wrapper.execute()
307

    
308
                r = self.astakosclient.resolve_commissions(
309
                    accept_serials=self.serials,
310
                    reject_serials=[])
311
                self.commission_serials.delete_many(
312
                    r['accepted'])
313

    
314
            self.wrapper.commit()
315
        else:
316
            if self.serials:
317
                r = self.astakosclient.resolve_commissions(
318
                    accept_serials=[],
319
                    reject_serials=self.serials)
320
                self.commission_serials.delete_many(
321
                    r['rejected'])
322
            self.wrapper.rollback()
323
        self.in_transaction = False
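    # Commission handling summary: on success the pending serials are first
    # persisted and committed locally, then accepted in Astakos through
    # resolve_commissions(); on failure they are rejected and the local
    # transaction is rolled back.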
324

    
325
    def close(self):
326
        self.wrapper.close()
327
        self.queue.close()
328

    
329
    @property
330
    def using_external_quotaholder(self):
331
        return not isinstance(self.astakosclient, DisabledAstakosClient)
332

    
333
    @debug_method
334
    @backend_method
335
    def list_accounts(self, user, marker=None, limit=10000):
336
        """Return a list of accounts the user can access."""
337

    
338
        allowed = self._allowed_accounts(user)
339
        start, limit = self._list_limits(allowed, marker, limit)
340
        return allowed[start:start + limit]
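    # Paging sketch (illustrative): pass the last entry of the previous page
    # as 'marker' to continue the listing.
    #
    #   page = backend.list_accounts(user, limit=100)
    #   more = backend.list_accounts(user, marker=page[-1], limit=100)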
341

    
342
    @debug_method
343
    @backend_method
344
    def get_account_meta(
345
            self, user, account, domain, until=None, include_user_defined=True,
346
            external_quota=None):
347
        """Return a dictionary with the account metadata for the domain."""
348

    
349
        path, node = self._lookup_account(account, user == account)
350
        if user != account:
351
            if until or (node is None) or (account not
352
                                           in self._allowed_accounts(user)):
353
                raise NotAllowedError
354
        try:
355
            props = self._get_properties(node, until)
356
            mtime = props[self.MTIME]
357
        except NameError:
358
            props = None
359
            mtime = until
360
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
361
        tstamp = max(tstamp, mtime)
362
        if until is None:
363
            modified = tstamp
364
        else:
365
            modified = self._get_statistics(
366
                node, compute=True)[2]  # Overall last modification.
367
            modified = max(modified, mtime)
368

    
369
        if user != account:
370
            meta = {'name': account}
371
        else:
372
            meta = {}
373
            if props is not None and include_user_defined:
374
                meta.update(
375
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
376
            if until is not None:
377
                meta.update({'until_timestamp': tstamp})
378
            meta.update({'name': account, 'count': count, 'bytes': bytes})
379
            if self.using_external_quotaholder:
380
                external_quota = external_quota or {}
381
                meta['bytes'] = external_quota.get('usage', 0)
382
        meta.update({'modified': modified})
383
        return meta
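    # Shape of the returned dictionary for the owner (values illustrative):
    #   {'name': 'user@example.org', 'count': 2, 'bytes': 1048576,
    #    'modified': 1362399999.99, ...}
    # plus any user-defined keys stored under the requested domain.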
384

    
385
    @debug_method
386
    @backend_method
387
    def update_account_meta(self, user, account, domain, meta, replace=False):
388
        """Update the metadata associated with the account for the domain."""
389

    
390
        if user != account:
391
            raise NotAllowedError
392
        path, node = self._lookup_account(account, True)
393
        self._put_metadata(user, node, domain, meta, replace,
394
                           update_statistics_ancestors_depth=-1)
395

    
396
    @debug_method
397
    @backend_method
398
    def get_account_groups(self, user, account):
399
        """Return a dictionary with the user groups defined for the account."""
400

    
401
        if user != account:
402
            if account not in self._allowed_accounts(user):
403
                raise NotAllowedError
404
            return {}
405
        self._lookup_account(account, True)
406
        return self.permissions.group_dict(account)
407

    
408
    @debug_method
409
    @backend_method
410
    def update_account_groups(self, user, account, groups, replace=False):
411
        """Update the groups associated with the account."""
412

    
413
        if user != account:
414
            raise NotAllowedError
415
        self._lookup_account(account, True)
416
        self._check_groups(groups)
417
        if replace:
418
            self.permissions.group_destroy(account)
419
        for k, v in groups.iteritems():
420
            if not replace:  # If not already deleted.
421
                self.permissions.group_delete(account, k)
422
            if v:
423
                self.permissions.group_addmany(account, k, v)
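    # 'groups' maps a group name to a list of member identifiers, e.g.
    # (illustrative values):
    #   {'devs': ['alice@example.org', 'bob@example.org']}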
424

    
425
    @debug_method
426
    @backend_method
427
    def get_account_policy(self, user, account, external_quota=None):
428
        """Return a dictionary with the account policy."""
429

    
430
        if user != account:
431
            if account not in self._allowed_accounts(user):
432
                raise NotAllowedError
433
            return {}
434
        path, node = self._lookup_account(account, True)
435
        policy = self._get_policy(node, is_account_policy=True)
436
        if self.using_external_quotaholder:
437
            external_quota = external_quota or {}
438
            policy['quota'] = external_quota.get('limit', 0)
439
        return policy
440

    
441
    @debug_method
442
    @backend_method
443
    def update_account_policy(self, user, account, policy, replace=False):
444
        """Update the policy associated with the account."""
445

    
446
        if user != account:
447
            raise NotAllowedError
448
        path, node = self._lookup_account(account, True)
449
        self._check_policy(policy, is_account_policy=True)
450
        self._put_policy(node, policy, replace, is_account_policy=True)
451

    
452
    @debug_method
453
    @backend_method
454
    def put_account(self, user, account, policy=None):
455
        """Create a new account with the given name."""
456

    
457
        policy = policy or {}
458
        if user != account:
459
            raise NotAllowedError
460
        node = self.node.node_lookup(account)
461
        if node is not None:
462
            raise AccountExists('Account already exists')
463
        if policy:
464
            self._check_policy(policy, is_account_policy=True)
465
        node = self._put_path(user, self.ROOTNODE, account,
466
                              update_statistics_ancestors_depth=-1)
467
        self._put_policy(node, policy, True, is_account_policy=True)
468

    
469
    @debug_method
470
    @backend_method
471
    def delete_account(self, user, account):
472
        """Delete the account with the given name."""
473

    
474
        if user != account:
475
            raise NotAllowedError
476
        node = self.node.node_lookup(account)
477
        if node is None:
478
            return
479
        if not self.node.node_remove(node,
480
                                     update_statistics_ancestors_depth=-1):
481
            raise AccountNotEmpty('Account is not empty')
482
        self.permissions.group_destroy(account)
483

    
484
    @debug_method
485
    @backend_method
486
    def list_containers(self, user, account, marker=None, limit=10000,
487
                        shared=False, until=None, public=False):
488
        """Return a list of containers existing under an account."""
489

    
490
        if user != account:
491
            if until or account not in self._allowed_accounts(user):
492
                raise NotAllowedError
493
            allowed = self._allowed_containers(user, account)
494
            start, limit = self._list_limits(allowed, marker, limit)
495
            return allowed[start:start + limit]
496
        if shared or public:
497
            allowed = set()
498
            if shared:
499
                allowed.update([x.split('/', 2)[1] for x in
500
                               self.permissions.access_list_shared(account)])
501
            if public:
502
                allowed.update([x[0].split('/', 2)[1] for x in
503
                               self.permissions.public_list(account)])
504
            allowed = sorted(allowed)
505
            start, limit = self._list_limits(allowed, marker, limit)
506
            return allowed[start:start + limit]
507
        node = self.node.node_lookup(account)
508
        containers = [x[0] for x in self._list_object_properties(
509
            node, account, '', '/', marker, limit, False, None, [], until)]
510
        start, limit = self._list_limits(
511
            [x[0] for x in containers], marker, limit)
512
        return containers[start:start + limit]
513

    
514
    @debug_method
515
    @backend_method
516
    def list_container_meta(self, user, account, container, domain,
517
                            until=None):
518
        """Return a list of the container's object meta keys for a domain."""
519

    
520
        allowed = []
521
        if user != account:
522
            if until:
523
                raise NotAllowedError
524
            allowed = self.permissions.access_list_paths(
525
                user, '/'.join((account, container)))
526
            if not allowed:
527
                raise NotAllowedError
528
        path, node = self._lookup_container(account, container)
529
        before = until if until is not None else inf
530
        allowed = self._get_formatted_paths(allowed)
531
        return self.node.latest_attribute_keys(node, domain, before,
532
                                               CLUSTER_DELETED, allowed)
533

    
534
    @debug_method
535
    @backend_method
536
    def get_container_meta(self, user, account, container, domain, until=None,
537
                           include_user_defined=True):
538
        """Return a dictionary with the container metadata for the domain."""
539

    
540
        if user != account:
541
            if until or container not in self._allowed_containers(user,
542
                                                                  account):
543
                raise NotAllowedError
544
        path, node = self._lookup_container(account, container)
545
        props = self._get_properties(node, until)
546
        mtime = props[self.MTIME]
547
        count, bytes, tstamp = self._get_statistics(node, until)
548
        tstamp = max(tstamp, mtime)
549
        if until is None:
550
            modified = tstamp
551
        else:
552
            modified = self._get_statistics(
553
                node)[2]  # Overall last modification.
554
            modified = max(modified, mtime)
555

    
556
        if user != account:
557
            meta = {'name': container}
558
        else:
559
            meta = {}
560
            if include_user_defined:
561
                meta.update(
562
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
563
            if until is not None:
564
                meta.update({'until_timestamp': tstamp})
565
            meta.update({'name': container, 'count': count, 'bytes': bytes})
566
        meta.update({'modified': modified})
567
        return meta
568

    
569
    @debug_method
570
    @backend_method
571
    def update_container_meta(self, user, account, container, domain, meta,
572
                              replace=False):
573
        """Update the metadata associated with the container for the domain."""
574

    
575
        if user != account:
576
            raise NotAllowedError
577
        path, node = self._lookup_container(account, container)
578
        src_version_id, dest_version_id = self._put_metadata(
579
            user, node, domain, meta, replace,
580
            update_statistics_ancestors_depth=0)
581
        if src_version_id is not None:
582
            versioning = self._get_policy(
583
                node, is_account_policy=False)['versioning']
584
            if versioning != 'auto':
585
                self.node.version_remove(src_version_id,
586
                                         update_statistics_ancestors_depth=0)
587

    
588
    @debug_method
589
    @backend_method
590
    def get_container_policy(self, user, account, container):
591
        """Return a dictionary with the container policy."""
592

    
593
        if user != account:
594
            if container not in self._allowed_containers(user, account):
595
                raise NotAllowedError
596
            return {}
597
        path, node = self._lookup_container(account, container)
598
        return self._get_policy(node, is_account_policy=False)
599

    
600
    @debug_method
601
    @backend_method
602
    def update_container_policy(self, user, account, container, policy,
603
                                replace=False):
604
        """Update the policy associated with the container."""
605

    
606
        if user != account:
607
            raise NotAllowedError
608
        path, node = self._lookup_container(account, container)
609
        self._check_policy(policy, is_account_policy=False)
610
        self._put_policy(node, policy, replace, is_account_policy=False)
611

    
612
    @debug_method
613
    @backend_method
614
    def put_container(self, user, account, container, policy=None):
615
        """Create a new container with the given name."""
616

    
617
        policy = policy or {}
618
        if user != account:
619
            raise NotAllowedError
620
        try:
621
            path, node = self._lookup_container(account, container)
622
        except NameError:
623
            pass
624
        else:
625
            raise ContainerExists('Container already exists')
626
        if policy:
627
            self._check_policy(policy, is_account_policy=False)
628
        path = '/'.join((account, container))
629
        node = self._put_path(
630
            user, self._lookup_account(account, True)[1], path,
631
            update_statistics_ancestors_depth=-1)
632
        self._put_policy(node, policy, True, is_account_policy=False)
633

    
634
    @debug_method
635
    @backend_method
636
    def delete_container(self, user, account, container, until=None, prefix='',
637
                         delimiter=None):
638
        """Delete/purge the container with the given name."""
639

    
640
        if user != account:
641
            raise NotAllowedError
642
        path, node = self._lookup_container(account, container)
643

    
644
        if until is not None:
645
            hashes, size, serials = self.node.node_purge_children(
646
                node, until, CLUSTER_HISTORY,
647
                update_statistics_ancestors_depth=0)
648
            for h in hashes:
649
                self.store.map_delete(h)
650
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
651
                                          update_statistics_ancestors_depth=0)
652
            if not self.free_versioning:
653
                self._report_size_change(
654
                    user, account, -size, {
655
                        'action': 'container purge',
656
                        'path': path,
657
                        'versions': ','.join(str(i) for i in serials)
658
                    }
659
                )
660
            return
661

    
662
        if not delimiter:
663
            if self._get_statistics(node)[0] > 0:
664
                raise ContainerNotEmpty('Container is not empty')
665
            hashes, size, serials = self.node.node_purge_children(
666
                node, inf, CLUSTER_HISTORY,
667
                update_statistics_ancestors_depth=0)
668
            for h in hashes:
669
                self.store.map_delete(h)
670
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
671
                                          update_statistics_ancestors_depth=0)
672
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
673
            if not self.free_versioning:
674
                self._report_size_change(
675
                    user, account, -size, {
676
                        'action': 'container purge',
677
                        'path': path,
678
                        'versions': ','.join(str(i) for i in serials)
679
                    }
680
                )
681
        else:
682
            # remove only contents
683
            src_names = self._list_objects_no_limit(
684
                user, account, container, prefix='', delimiter=None,
685
                virtual=False, domain=None, keys=[], shared=False, until=None,
686
                size_range=None, all_props=True, public=False)
687
            paths = []
688
            for t in src_names:
689
                path = '/'.join((account, container, t[0]))
690
                node = t[2]
691
                if not self._exists(node):
692
                    continue
693
                src_version_id, dest_version_id = self._put_version_duplicate(
694
                    user, node, size=0, type='', hash=None, checksum='',
695
                    cluster=CLUSTER_DELETED,
696
                    update_statistics_ancestors_depth=1)
697
                del_size = self._apply_versioning(
698
                    account, container, src_version_id,
699
                    update_statistics_ancestors_depth=1)
700
                self._report_size_change(
701
                    user, account, -del_size, {
702
                        'action': 'object delete',
703
                        'path': path,
704
                        'versions': ','.join([str(dest_version_id)])})
705
                self._report_object_change(
706
                    user, account, path, details={'action': 'object delete'})
707
                paths.append(path)
708
            self.permissions.access_clear_bulk(paths)
709

    
710
    def _list_objects(self, user, account, container, prefix, delimiter,
711
                      marker, limit, virtual, domain, keys, shared, until,
712
                      size_range, all_props, public):
713
        if user != account and until:
714
            raise NotAllowedError
715
        if shared and public:
716
            # get shared first
717
            shared_paths = self._list_object_permissions(
718
                user, account, container, prefix, shared=True, public=False)
719
            objects = set()
720
            if shared_paths:
721
                path, node = self._lookup_container(account, container)
722
                shared_paths = self._get_formatted_paths(shared_paths)
723
                objects |= set(self._list_object_properties(
724
                    node, path, prefix, delimiter, marker, limit, virtual,
725
                    domain, keys, until, size_range, shared_paths, all_props))
726

    
727
            # get public
728
            objects |= set(self._list_public_object_properties(
729
                user, account, container, prefix, all_props))
730
            objects = list(objects)
731

    
732
            objects.sort(key=lambda x: x[0])
733
            start, limit = self._list_limits(
734
                [x[0] for x in objects], marker, limit)
735
            return objects[start:start + limit]
736
        elif public:
737
            objects = self._list_public_object_properties(
738
                user, account, container, prefix, all_props)
739
            start, limit = self._list_limits(
740
                [x[0] for x in objects], marker, limit)
741
            return objects[start:start + limit]
742

    
743
        allowed = self._list_object_permissions(
744
            user, account, container, prefix, shared, public)
745
        if shared and not allowed:
746
            return []
747
        path, node = self._lookup_container(account, container)
748
        allowed = self._get_formatted_paths(allowed)
749
        objects = self._list_object_properties(
750
            node, path, prefix, delimiter, marker, limit, virtual, domain,
751
            keys, until, size_range, allowed, all_props)
752
        start, limit = self._list_limits(
753
            [x[0] for x in objects], marker, limit)
754
        return objects[start:start + limit]
755

    
756
    def _list_public_object_properties(self, user, account, container, prefix,
757
                                       all_props):
758
        public = self._list_object_permissions(
759
            user, account, container, prefix, shared=False, public=True)
760
        paths, nodes = self._lookup_objects(public)
761
        path = '/'.join((account, container))
762
        cont_prefix = path + '/'
763
        paths = [x[len(cont_prefix):] for x in paths]
764
        objects = [(p,) + props for p, props in
765
                   zip(paths, self.node.version_lookup_bulk(
766
                       nodes, all_props=all_props))]
767
        return objects
768

    
769
    def _list_objects_no_limit(self, user, account, container, prefix,
770
                               delimiter, virtual, domain, keys, shared, until,
771
                               size_range, all_props, public):
772
        objects = []
773
        while True:
774
            marker = objects[-1] if objects else None
775
            limit = 10000
776
            l = self._list_objects(
777
                user, account, container, prefix, delimiter, marker, limit,
778
                virtual, domain, keys, shared, until, size_range, all_props,
779
                public)
780
            objects.extend(l)
781
            if not l or len(l) < limit:
782
                break
783
        return objects
784

    
785
    def _list_object_permissions(self, user, account, container, prefix,
786
                                 shared, public):
787
        allowed = []
788
        path = '/'.join((account, container, prefix)).rstrip('/')
789
        if user != account:
790
            allowed = self.permissions.access_list_paths(user, path)
791
            if not allowed:
792
                raise NotAllowedError
793
        else:
794
            allowed = set()
795
            if shared:
796
                allowed.update(self.permissions.access_list_shared(path))
797
            if public:
798
                allowed.update(
799
                    [x[0] for x in self.permissions.public_list(path)])
800
            allowed = sorted(allowed)
801
            if not allowed:
802
                return []
803
        return allowed
804

    
805
    @debug_method
806
    @backend_method
807
    def list_objects(self, user, account, container, prefix='', delimiter=None,
808
                     marker=None, limit=10000, virtual=True, domain=None,
809
                     keys=None, shared=False, until=None, size_range=None,
810
                     public=False):
811
        """List (object name, object version_id) under a container."""
812

    
813
        keys = keys or []
814
        return self._list_objects(
815
            user, account, container, prefix, delimiter, marker, limit,
816
            virtual, domain, keys, shared, until, size_range, False, public)
817

    
818
    @debug_method
819
    @backend_method
820
    def list_object_meta(self, user, account, container, prefix='',
821
                         delimiter=None, marker=None, limit=10000,
822
                         virtual=True, domain=None, keys=None, shared=False,
823
                         until=None, size_range=None, public=False):
824
        """Return a list of metadata dicts of objects under a container."""
825

    
826
        keys = keys or []
827
        props = self._list_objects(
828
            user, account, container, prefix, delimiter, marker, limit,
829
            virtual, domain, keys, shared, until, size_range, True, public)
830
        objects = []
831
        for p in props:
832
            if len(p) == 2:
833
                objects.append({'subdir': p[0]})
834
            else:
835
                objects.append({
836
                    'name': p[0],
837
                    'bytes': p[self.SIZE + 1],
838
                    'type': p[self.TYPE + 1],
839
                    'hash': p[self.HASH + 1],
840
                    'version': p[self.SERIAL + 1],
841
                    'version_timestamp': p[self.MTIME + 1],
842
                    'modified': p[self.MTIME + 1] if until is None else None,
843
                    'modified_by': p[self.MUSER + 1],
844
                    'uuid': p[self.UUID + 1],
845
                    'checksum': p[self.CHECKSUM + 1]})
846
        return objects
847

    
848
    @debug_method
849
    @backend_method
850
    def list_object_permissions(self, user, account, container, prefix=''):
851
        """Return a list of paths enforce permissions under a container."""
852

    
853
        return self._list_object_permissions(user, account, container, prefix,
854
                                             True, False)
855

    
856
    @debug_method
857
    @backend_method
858
    def list_object_public(self, user, account, container, prefix=''):
859
        """Return a mapping of object paths to public ids under a container."""
860

    
861
        public = {}
862
        for path, p in self.permissions.public_list('/'.join((account,
863
                                                              container,
864
                                                              prefix))):
865
            public[path] = p
866
        return public
867

    
868
    @debug_method
869
    @backend_method
870
    def get_object_meta(self, user, account, container, name, domain,
871
                        version=None, include_user_defined=True):
872
        """Return a dictionary with the object metadata for the domain."""
873

    
874
        self._can_read(user, account, container, name)
875
        path, node = self._lookup_object(account, container, name)
876
        props = self._get_version(node, version)
877
        if version is None:
878
            modified = props[self.MTIME]
879
        else:
880
            try:
881
                modified = self._get_version(
882
                    node)[self.MTIME]  # Overall last modification.
883
            except NameError:  # Object may be deleted.
884
                del_props = self.node.version_lookup(
885
                    node, inf, CLUSTER_DELETED)
886
                if del_props is None:
887
                    raise ItemNotExists('Object does not exist')
888
                modified = del_props[self.MTIME]
889

    
890
        meta = {}
891
        if include_user_defined:
892
            meta.update(
893
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
894
        meta.update({'name': name,
895
                     'bytes': props[self.SIZE],
896
                     'type': props[self.TYPE],
897
                     'hash': props[self.HASH],
898
                     'version': props[self.SERIAL],
899
                     'version_timestamp': props[self.MTIME],
900
                     'modified': modified,
901
                     'modified_by': props[self.MUSER],
902
                     'uuid': props[self.UUID],
903
                     'checksum': props[self.CHECKSUM]})
904
        return meta
905

    
906
    @debug_method
907
    @backend_method
908
    def update_object_meta(self, user, account, container, name, domain, meta,
909
                           replace=False):
910
        """Update object metadata for a domain and return the new version."""
911

    
912
        self._can_write(user, account, container, name)
913

    
914
        path, node = self._lookup_object(account, container, name,
915
                                         lock_container=True)
916
        src_version_id, dest_version_id = self._put_metadata(
917
            user, node, domain, meta, replace,
918
            update_statistics_ancestors_depth=1)
919
        self._apply_versioning(account, container, src_version_id,
920
                               update_statistics_ancestors_depth=1)
921
        return dest_version_id
922

    
923
    @debug_method
924
    @backend_method
925
    def get_object_permissions_bulk(self, user, account, container, names):
926
        """Return the action allowed on the object, the path
927
        from which the object gets its permissions,
928
        along with a dictionary containing the permissions."""
929

    
930
        permissions_path = self._get_permissions_path_bulk(account, container,
931
                                                           names)
932
        access_objects = self.permissions.access_check_bulk(permissions_path,
933
                                                            user)
934
        #group_parents = access_objects['group_parents']
935
        nobject_permissions = {}
936
        cpath = '/'.join((account, container, ''))
937
        cpath_idx = len(cpath)
938
        for path in permissions_path:
939
            allowed = 1
940
            name = path[cpath_idx:]
941
            if user != account:
942
                try:
943
                    allowed = access_objects[path]
944
                except KeyError:
945
                    raise NotAllowedError
946
            access_dict, allowed = \
947
                self.permissions.access_get_for_bulk(access_objects[path])
948
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
949
                                         access_dict)
950
        self._lookup_objects(permissions_path)
951
        return nobject_permissions
952

    
953
    @debug_method
954
    @backend_method
955
    def get_object_permissions(self, user, account, container, name):
956
        """Return the action allowed on the object, the path
957
        from which the object gets its permissions,
958
        along with a dictionary containing the permissions."""
959

    
960
        allowed = 'write'
961
        permissions_path = self._get_permissions_path(account, container, name)
962
        if user != account:
963
            if self.permissions.access_check(permissions_path, self.WRITE,
964
                                             user):
965
                allowed = 'write'
966
            elif self.permissions.access_check(permissions_path, self.READ,
967
                                               user):
968
                allowed = 'read'
969
            else:
970
                raise NotAllowedError
971
        self._lookup_object(account, container, name)
972
        return (allowed,
973
                permissions_path,
974
                self.permissions.access_get(permissions_path))
975

    
976
    @debug_method
977
    @backend_method
978
    def update_object_permissions(self, user, account, container, name,
979
                                  permissions):
980
        """Update the permissions associated with the object."""
981

    
982
        if user != account:
983
            raise NotAllowedError
984
        path = self._lookup_object(account, container, name,
985
                                   lock_container=True)[0]
986
        self._check_permissions(path, permissions)
987
        try:
988
            self.permissions.access_set(path, permissions)
989
        except:
990
            raise ValueError
991
        else:
992
            self._report_sharing_change(user, account, path, {'members':
993
                                        self.permissions.access_members(path)})
994

    
995
    @debug_method
996
    @backend_method
997
    def get_object_public(self, user, account, container, name):
998
        """Return the public id of the object if applicable."""
999

    
1000
        self._can_read(user, account, container, name)
1001
        path = self._lookup_object(account, container, name)[0]
1002
        p = self.permissions.public_get(path)
1003
        return p
1004

    
1005
    @debug_method
1006
    @backend_method
1007
    def update_object_public(self, user, account, container, name, public):
1008
        """Update the public status of the object."""
1009

    
1010
        self._can_write(user, account, container, name)
1011
        path = self._lookup_object(account, container, name,
1012
                                   lock_container=True)[0]
1013
        if not public:
1014
            self.permissions.public_unset(path)
1015
        else:
1016
            self.permissions.public_set(
1017
                path, self.public_url_security, self.public_url_alphabet)
1018

    
1019
    @debug_method
1020
    @backend_method
1021
    def get_object_hashmap(self, user, account, container, name, version=None):
1022
        """Return the object's size and a list with partial hashes."""
1023

    
1024
        self._can_read(user, account, container, name)
1025
        path, node = self._lookup_object(account, container, name)
1026
        props = self._get_version(node, version)
1027
        if props[self.HASH] is None:
1028
            return 0, ()
1029
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
1030
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
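    # Read-path sketch (assumes the get_block() counterpart of put_block()
    # defined later in this module): the returned hex hashes map back to the
    # stored block data.
    #
    #   size, hashes = backend.get_object_hashmap(user, account, container,
    #                                             name)
    #   data = ''.join(backend.get_block(h) for h in hashes)[:size]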
1031

    
1032
    def _update_object_hash(self, user, account, container, name, size, type,
1033
                            hash, checksum, domain, meta, replace_meta,
1034
                            permissions, src_node=None, src_version_id=None,
1035
                            is_copy=False, report_size_change=True):
1036
        if permissions is not None and user != account:
1037
            raise NotAllowedError
1038
        self._can_write(user, account, container, name)
1039
        if permissions is not None:
1040
            path = '/'.join((account, container, name))
1041
            self._check_permissions(path, permissions)
1042

    
1043
        account_path, account_node = self._lookup_account(account, True)
1044
        container_path, container_node = self._lookup_container(
1045
            account, container)
1046

    
1047
        path, node = self._put_object_node(
1048
            container_path, container_node, name)
1049
        pre_version_id, dest_version_id = self._put_version_duplicate(
1050
            user, node, src_node=src_node, size=size, type=type, hash=hash,
1051
            checksum=checksum, is_copy=is_copy,
1052
            update_statistics_ancestors_depth=1)
1053

    
1054
        # Handle meta.
1055
        if src_version_id is None:
1056
            src_version_id = pre_version_id
1057
        self._put_metadata_duplicate(
1058
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
1059

    
1060
        del_size = self._apply_versioning(account, container, pre_version_id,
1061
                                          update_statistics_ancestors_depth=1)
1062
        size_delta = size - del_size
1063
        if size_delta > 0:
1064
            # Check account quota.
1065
            if not self.using_external_quotaholder:
1066
                account_quota = long(self._get_policy(
1067
                    account_node, is_account_policy=True)['quota'])
1068
                account_usage = self._get_statistics(account_node,
1069
                                                     compute=True)[1]
1070
                if (account_quota > 0 and account_usage > account_quota):
1071
                    raise QuotaError(
1072
                        'Account quota exceeded: limit: %s, usage: %s' % (
1073
                            account_quota, account_usage))
1074

    
1075
            # Check container quota.
1076
            container_quota = long(self._get_policy(
1077
                container_node, is_account_policy=False)['quota'])
1078
            container_usage = self._get_statistics(container_node)[1]
1079
            if (container_quota > 0 and container_usage > container_quota):
1080
                # This must be executed in a transaction, so the version is
1081
                # never created if it fails.
1082
                raise QuotaError(
1083
                    'Container quota exceeded: limit: %s, usage: %s' % (
1084
                        container_quota, container_usage
1085
                    )
1086
                )
1087

    
1088
        if report_size_change:
1089
            self._report_size_change(
1090
                user, account, size_delta,
1091
                {'action': 'object update', 'path': path,
1092
                 'versions': ','.join([str(dest_version_id)])})
1093
        if permissions is not None:
1094
            self.permissions.access_set(path, permissions)
1095
            self._report_sharing_change(
1096
                user, account, path,
1097
                {'members': self.permissions.access_members(path)})
1098

    
1099
        self._report_object_change(
1100
            user, account, path,
1101
            details={'version': dest_version_id, 'action': 'object update'})
1102
        return dest_version_id
1103

    
1104
    @debug_method
1105
    def update_object_hashmap(self, user, account, container, name, size, type,
1106
                              hashmap, checksum, domain, meta=None,
1107
                              replace_meta=False, permissions=None):
1108
        """Create/update an object's hashmap and return the new version."""
1109

    
1110
        meta = meta or {}
1111
        if size == 0:  # No such thing as an empty hashmap.
1112
            hashmap = [self.put_block('')]
1113
        map = HashMap(self.block_size, self.hash_algorithm)
1114
        map.extend([self._unhexlify_hash(x) for x in hashmap])
1115
        missing = self.store.block_search(map)
1116
        if missing:
1117
            ie = IndexError()
1118
            ie.data = [binascii.hexlify(x) for x in missing]
1119
            raise ie
1120

    
1121
        hash = map.hash()
1122
        hexlified = binascii.hexlify(hash)
1123
        # _update_object_hash() locks destination path
1124
        dest_version_id = self._update_object_hash(
1125
            user, account, container, name, size, type, hexlified, checksum,
1126
            domain, meta, replace_meta, permissions)
1127
        self.store.map_put(hash, map)
1128
        return dest_version_id, hexlified
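    # Client-side sketch (read_block() is hypothetical): when blocks are
    # missing, the raised IndexError carries their hex hashes in .data;
    # callers upload them with put_block() and retry.
    #
    #   try:
    #       version, obj_hash = backend.update_object_hashmap(
    #           user, account, container, name, size,
    #           'application/octet-stream', hashmap, checksum='',
    #           domain='pithos')
    #   except IndexError as ie:
    #       for h in ie.data:
    #           backend.put_block(read_block(h))
    #       # ...then retry with the same arguments.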
1129

    
1130
    @debug_method
1131
    @backend_method
1132
    def update_object_checksum(self, user, account, container, name, version,
1133
                               checksum):
1134
        """Update an object's checksum."""
1135

    
1136
        # Update objects with greater version and same hashmap
1137
        # and size (fix metadata updates).
1138
        self._can_write(user, account, container, name)
1139
        path, node = self._lookup_object(account, container, name,
1140
                                         lock_container=True)
1141
        props = self._get_version(node, version)
1142
        versions = self.node.node_get_versions(node)
1143
        for x in versions:
1144
            if (x[self.SERIAL] >= int(version) and
1145
                x[self.HASH] == props[self.HASH] and
1146
                    x[self.SIZE] == props[self.SIZE]):
1147
                self.node.version_put_property(
1148
                    x[self.SERIAL], 'checksum', checksum)
1149

    
1150
    def _copy_object(self, user, src_account, src_container, src_name,
1151
                     dest_account, dest_container, dest_name, type,
1152
                     dest_domain=None, dest_meta=None, replace_meta=False,
1153
                     permissions=None, src_version=None, is_move=False,
1154
                     delimiter=None):
1155

    
1156
        report_size_change = not is_move
1157
        dest_meta = dest_meta or {}
1158
        dest_version_ids = []
1159
        self._can_read(user, src_account, src_container, src_name)
1160

    
1161
        src_container_path = '/'.join((src_account, src_container))
1162
        dest_container_path = '/'.join((dest_account, dest_container))
1163
        # Lock container paths in alphabetical order
1164
        if src_container_path < dest_container_path:
1165
            self._lookup_container(src_account, src_container)
1166
            self._lookup_container(dest_account, dest_container)
1167
        else:
1168
            self._lookup_container(dest_account, dest_container)
1169
            self._lookup_container(src_account, src_container)
1170

    
1171
        path, node = self._lookup_object(src_account, src_container, src_name)
1172
        # TODO: Will do another fetch of the properties in duplicate version...
1173
        props = self._get_version(
1174
            node, src_version)  # Check to see if source exists.
1175
        src_version_id = props[self.SERIAL]
1176
        hash = props[self.HASH]
1177
        size = props[self.SIZE]
1178
        is_copy = not is_move and (src_account, src_container, src_name) != (
1179
            dest_account, dest_container, dest_name)  # New uuid.
1180
        dest_version_ids.append(self._update_object_hash(
1181
            user, dest_account, dest_container, dest_name, size, type, hash,
1182
            None, dest_domain, dest_meta, replace_meta, permissions,
1183
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1184
            report_size_change=report_size_change))
1185
        if is_move and ((src_account, src_container, src_name) !=
1186
                        (dest_account, dest_container, dest_name)):
1187
            self._delete_object(user, src_account, src_container, src_name,
1188
                                report_size_change=report_size_change)
1189

    
1190
        if delimiter:
1191
            prefix = (src_name + delimiter if not
1192
                      src_name.endswith(delimiter) else src_name)
1193
            src_names = self._list_objects_no_limit(
1194
                user, src_account, src_container, prefix, delimiter=None,
1195
                virtual=False, domain=None, keys=[], shared=False, until=None,
1196
                size_range=None, all_props=True, public=False)
1197
            src_names.sort(key=lambda x: x[2])  # order by nodes
1198
            paths = [elem[0] for elem in src_names]
1199
            nodes = [elem[2] for elem in src_names]
1200
            # TODO: Will do another fetch of the properties
1201
            # in duplicate version...
1202
            props = self._get_versions(nodes)  # Check to see if source exists.
1203

    
1204
            for prop, path, node in zip(props, paths, nodes):
1205
                src_version_id = prop[self.SERIAL]
1206
                hash = prop[self.HASH]
1207
                vtype = prop[self.TYPE]
1208
                size = prop[self.SIZE]
1209
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1210
                    delimiter) else dest_name
1211
                vdest_name = path.replace(prefix, dest_prefix, 1)
1212
                # _update_object_hash() locks destination path
1213
                dest_version_ids.append(self._update_object_hash(
1214
                    user, dest_account, dest_container, vdest_name, size,
1215
                    vtype, hash, None, dest_domain, meta={},
1216
                    replace_meta=False, permissions=None, src_node=node,
1217
                    src_version_id=src_version_id, is_copy=is_copy,
1218
                    report_size_change=report_size_change))
1219
                if is_move and ((src_account, src_container, src_name) !=
1220
                                (dest_account, dest_container, dest_name)):
1221
                    self._delete_object(user, src_account, src_container, path,
1222
                                        report_size_change=report_size_change)
1223
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
1224
                dest_version_ids)
1225

    
1226
    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        meta = meta or {}
        dest_version_id = self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, src_version, False, delimiter)
        return dest_version_id
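
    # Example (illustrative only; assumes a configured backend instance `b`,
    # an existing 'photos/a.jpg' object and a 'backup' container):
    #   b.copy_object('user', 'user', 'photos', 'a.jpg',
    #                 'user', 'backup', 'a.jpg', 'image/jpeg', 'pithos')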

    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        meta = meta or {}
        if user != src_account:
            raise NotAllowedError
        dest_version_id = self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, None, delimiter=delimiter)
        return dest_version_id

    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
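            # Purge: permanently drop all versions up to the given timestamp
            # from the normal and history clusters, free their blocks and
            # report the reclaimed space.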
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except ItemNotExists:
                # No version is left: clear sharing permissions as well.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

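        # With a delimiter, every object under the pseudo-folder prefix is
        # also marked as deleted and its permissions are cleared in bulk.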
        if delimiter:
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        self._delete_object(user, account, container, name, until, delimiter)

    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
                x[self.CLUSTER] != CLUSTER_DELETED]

    @debug_method
    @backend_method
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        block = self.store.block_get(self._unhexlify_hash(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block

    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        return binascii.hexlify(self.store.block_put(data))

    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
        return binascii.hexlify(h)
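
    # Illustrative block round-trip (assumes a configured backend instance
    # `b`; not part of the public API documentation):
    #   h = b.put_block('some data')              # hex-encoded block hash
    #   data = b.get_block(h)                     # the stored bytes
    #   h2 = b.update_block(h, 'MORE', offset=5)  # hash of the patched block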

    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

    def _put_object_node(self, path, parent, name):
        path = '/'.join((path, name))
        node = self.node.node_lookup(path)
        if node is None:
            node = self.node.node_create(parent, path)
        return path, node

    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node

    def _lookup_account(self, account, create=True):
        node = self.node.node_lookup(account)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node

    def _lookup_container(self, account, container):
        for_update = True if self.lock_container_path else False
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name, lock_container=False):
        if lock_container:
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes

    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats

    def _get_version(self, node, version=None):
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version, node=node)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node."""

        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id

    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))

    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=
            update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id

    def _list_limits(self, listing, marker, limit):
        start = 0
        if marker:
            try:
                start = listing.index(marker) + 1
            except ValueError:
                pass
        if not limit or limit > 10000:
            limit = 10000
        return start, limit
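
    # For example (illustrative): _list_limits(['a', 'b', 'c'], marker='b',
    # limit=0) returns (2, 10000): start just past the marker, capped limit.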

    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
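        # Virtual 'directory' prefixes are merged in as (path, None) entries
        # and the container prefix is stripped from the returned names.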
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects

    # Reporting functions.

    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return
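
        # When an external quotaholder (Astakos) is configured, issue a
        # commission for the 'pithos.diskspace' resource and keep its serial
        # in self.serials for later resolution.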

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)

    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))

    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))

    # Policy functions.

    def _check_policy(self, policy, is_account_policy=True):
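        # Only 'quota' (a non-negative integer) and 'versioning' ('auto' or
        # 'none') are accepted; any other key or value raises ValueError.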
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError

    def _put_policy(self, node, policy, replace, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)

    def _get_policy(self, node, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        policy = default_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy

    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0

    # Access control functions.

    def _check_groups(self, groups):
        # raise ValueError('Bad characters in groups')
        pass

    def _check_permissions(self, path, permissions):
        # raise ValueError('Bad characters in permissions')
        pass

    def _get_formatted_paths(self, paths):
        formatted = []
        if len(paths) == 0:
            return formatted
        props = self.node.get_props(paths)
        if props:
            for prop in props:
                if prop[1].split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    formatted.append((prop[0].rstrip('/') + '/',
                                      self.MATCH_PREFIX))
                formatted.append((prop[0], self.MATCH_EXACT))
        return formatted

    def _get_permissions_path(self, account, container, name):
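        # Resolve the path whose sharing permissions apply: the object itself
        # or the deepest ancestor that is a directory-type object
        # (application/directory or application/folder).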
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            else:
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None

    def _get_permissions_path_bulk(self, account, container, names):
        formatted_paths = []
        for name in names:
            path = '/'.join((account, container, name))
            formatted_paths.append(path)
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        permission_paths.sort()
        permission_paths.reverse()
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                permission_paths_list.append(p)
            else:
                if p.count('/') < 2:
                    continue
                lookup_list.append(p)

        if len(lookup_list) > 0:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        if len(permission_paths_list) > 0:
            return permission_paths_list

        return None

    def _can_read(self, user, account, container, name):
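        # Read access: the owner always, anyone for public objects, and users
        # holding an explicit (possibly folder-inherited) read or write grant.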
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if (not self.permissions.access_check(path, self.READ, user) and not
                self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError

    def _can_write(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError

    def _allowed_accounts(self, user):
        allow = set()
        for path in self.permissions.access_list_paths(user):
            allow.add(path.split('/', 1)[0])
        return sorted(allow)

    def _allowed_containers(self, user, account):
        allow = set()
        for path in self.permissions.access_list_paths(user, account):
            allow.add(path.split('/', 2)[1])
        return sorted(allow)

    # Domain functions

    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]

    # util functions

    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        meta = {'bytes': props[self.SIZE],
                'type': props[self.TYPE],
                'hash': props[self.HASH],
                'version': props[self.SERIAL],
                'version_timestamp': props[self.MTIME],
                'modified_by': props[self.MUSER],
                'uuid': props[self.UUID],
                'checksum': props[self.CHECKSUM]}
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta

    def _exists(self, node):
        try:
            self._get_version(node)
        except ItemNotExists:
            return False
        else:
            return True

    def _unhexlify_hash(self, hash):
        try:
            return binascii.unhexlify(hash)
        except TypeError:
            raise InvalidHash(hash)