Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 3a5e75cc

History | View | Annotate | Download (72.2 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42

    
43
try:
44
    from astakosclient import AstakosClient
45
except ImportError:
46
    AstakosClient = None
47

    
48
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
49
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
50
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
51
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
52
                  InvalidHash)
53

    
54

    
55
class DisabledAstakosClient(object):
56
    def __init__(self, *args, **kwargs):
57
        self.args = args
58
        self.kwargs = kwargs
59

    
60
    def __getattr__(self, name):
61
        m = ("AstakosClient has been disabled, "
62
             "yet an attempt to access it was made")
63
        raise AssertionError(m)
64

    
65

    
66
# Stripped-down version of the HashMap class found in tools.
67

    
68
class HashMap(list):
69

    
70
    def __init__(self, blocksize, blockhash):
71
        super(HashMap, self).__init__()
72
        self.blocksize = blocksize
73
        self.blockhash = blockhash
74

    
75
    def _hash_raw(self, v):
76
        h = hashlib.new(self.blockhash)
77
        h.update(v)
78
        return h.digest()
79

    
80
    def hash(self):
81
        if len(self) == 0:
82
            return self._hash_raw('')
83
        if len(self) == 1:
84
            return self.__getitem__(0)
85

    
86
        h = list(self)
87
        s = 2
88
        while s < len(h):
89
            s = s * 2
90
        h += [('\x00' * len(h[0]))] * (s - len(h))
91
        while len(h) > 1:
92
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
93
        return h[0]
94

    
95
# Default modules and settings.
96
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
97
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
98
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
99
DEFAULT_BLOCK_PATH = 'data/'
100
DEFAULT_BLOCK_UMASK = 0o022
101
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
102
DEFAULT_HASH_ALGORITHM = 'sha256'
103
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
104
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
105
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
106
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
107
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
108
                               'abcdefghijklmnopqrstuvwxyz'
109
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
110
DEFAULT_PUBLIC_URL_SECURITY = 16
111

    
112
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
113
QUEUE_CLIENT_ID = 'pithos'
114
QUEUE_INSTANCE_ID = '1'
115

    
116
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
117

    
118
inf = float('inf')
119

    
120
ULTIMATE_ANSWER = 42
121

    
122
DEFAULT_SOURCE = 'system'
123

    
124
logger = logging.getLogger(__name__)
125

    
126

    
127
def backend_method(func):
128
    @wraps(func)
129
    def wrapper(self, *args, **kw):
130
        # if we are inside a database transaction
131
        # just proceed with the method execution
132
        # otherwise manage a new transaction
133
        if self.in_transaction:
134
            return func(self, *args, **kw)
135

    
136
        try:
137
            self.pre_exec()
138
            result = func(self, *args, **kw)
139
            success_status = True
140
            return result
141
        except:
142
            success_status = False
143
            raise
144
        finally:
145
            self.post_exec(success_status)
146
    return wrapper
147

    
148

    
149
def debug_method(func):
150
    @wraps(func)
151
    def wrapper(self, *args, **kw):
152
        try:
153
            result = func(self, *args, **kw)
154
            return result
155
        except:
156
            result = format_exc()
157
            raise
158
        finally:
159
            all_args = map(repr, args)
160
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
161
            logger.debug(">>> %s(%s) <<< %s" % (
162
                func.__name__, ', '.join(all_args).rstrip(', '), result))
163
    return wrapper
164

    
165

    
166
class ModularBackend(BaseBackend):
167
    """A modular backend.
168

169
    Uses modules for SQL functions and storage.
170
    """
171

    
172
    def __init__(self, db_module=None, db_connection=None,
173
                 block_module=None, block_path=None, block_umask=None,
174
                 block_size=None, hash_algorithm=None,
175
                 queue_module=None, queue_hosts=None, queue_exchange=None,
176
                 astakos_auth_url=None, service_token=None,
177
                 astakosclient_poolsize=None,
178
                 free_versioning=True, block_params=None,
179
                 public_url_security=None,
180
                 public_url_alphabet=None,
181
                 account_quota_policy=None,
182
                 container_quota_policy=None,
183
                 container_versioning_policy=None):
184
        db_module = db_module or DEFAULT_DB_MODULE
185
        db_connection = db_connection or DEFAULT_DB_CONNECTION
186
        block_module = block_module or DEFAULT_BLOCK_MODULE
187
        block_path = block_path or DEFAULT_BLOCK_PATH
188
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
189
        block_params = block_params or DEFAULT_BLOCK_PARAMS
190
        block_size = block_size or DEFAULT_BLOCK_SIZE
191
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
192
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
193
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
194
        container_quota_policy = container_quota_policy \
195
            or DEFAULT_CONTAINER_QUOTA
196
        container_versioning_policy = container_versioning_policy \
197
            or DEFAULT_CONTAINER_VERSIONING
198

    
199
        self.default_account_policy = {'quota': account_quota_policy}
200
        self.default_container_policy = {
201
            'quota': container_quota_policy,
202
            'versioning': container_versioning_policy
203
        }
204
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
205
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
206

    
207
        self.public_url_security = (public_url_security or
208
                                    DEFAULT_PUBLIC_URL_SECURITY)
209
        self.public_url_alphabet = (public_url_alphabet or
210
                                    DEFAULT_PUBLIC_URL_ALPHABET)
211

    
212
        self.hash_algorithm = hash_algorithm
213
        self.block_size = block_size
214
        self.free_versioning = free_versioning
215

    
216
        def load_module(m):
217
            __import__(m)
218
            return sys.modules[m]
219

    
220
        self.db_module = load_module(db_module)
221
        self.wrapper = self.db_module.DBWrapper(db_connection)
222
        params = {'wrapper': self.wrapper}
223
        self.permissions = self.db_module.Permissions(**params)
224
        self.config = self.db_module.Config(**params)
225
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
226
        for x in ['READ', 'WRITE']:
227
            setattr(self, x, getattr(self.db_module, x))
228
        self.node = self.db_module.Node(**params)
229
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
230
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
231
                  'MATCH_PREFIX', 'MATCH_EXACT']:
232
            setattr(self, x, getattr(self.db_module, x))
233

    
234
        self.ALLOWED = ['read', 'write']
235

    
236
        self.block_module = load_module(block_module)
237
        self.block_params = block_params
238
        params = {'path': block_path,
239
                  'block_size': self.block_size,
240
                  'hash_algorithm': self.hash_algorithm,
241
                  'umask': block_umask}
242
        params.update(self.block_params)
243
        self.store = self.block_module.Store(**params)
244

    
245
        if queue_module and queue_hosts:
246
            self.queue_module = load_module(queue_module)
247
            params = {'hosts': queue_hosts,
248
                      'exchange': queue_exchange,
249
                      'client_id': QUEUE_CLIENT_ID}
250
            self.queue = self.queue_module.Queue(**params)
251
        else:
252
            class NoQueue:
253
                def send(self, *args):
254
                    pass
255

    
256
                def close(self):
257
                    pass
258

    
259
            self.queue = NoQueue()
260

    
261
        self.astakos_auth_url = astakos_auth_url
262
        self.service_token = service_token
263

    
264
        if not astakos_auth_url or not AstakosClient:
265
            self.astakosclient = DisabledAstakosClient(
266
                service_token, astakos_auth_url,
267
                use_pool=True,
268
                pool_size=astakosclient_poolsize)
269
        else:
270
            self.astakosclient = AstakosClient(
271
                service_token, astakos_auth_url,
272
                use_pool=True,
273
                pool_size=astakosclient_poolsize)
274

    
275
        self.serials = []
276
        self.messages = []
277

    
278
        self._move_object = partial(self._copy_object, is_move=True)
279

    
280
        self.lock_container_path = False
281

    
282
        self.in_transaction = False
283

    
284
    def pre_exec(self, lock_container_path=False):
285
        self.lock_container_path = lock_container_path
286
        self.wrapper.execute()
287
        self.serials = []
288
        self.in_transaction = True
289

    
290
    def post_exec(self, success_status=True):
291
        if success_status:
292
            # send messages produced
293
            for m in self.messages:
294
                self.queue.send(*m)
295

    
296
            # register serials
297
            if self.serials:
298
                self.commission_serials.insert_many(
299
                    self.serials)
300

    
301
                # commit to ensure that the serials are registered
302
                # even if resolve commission fails
303
                self.wrapper.commit()
304

    
305
                # start new transaction
306
                self.wrapper.execute()
307

    
308
                r = self.astakosclient.resolve_commissions(
309
                    accept_serials=self.serials,
310
                    reject_serials=[])
311
                self.commission_serials.delete_many(
312
                    r['accepted'])
313

    
314
            self.wrapper.commit()
315
        else:
316
            if self.serials:
317
                r = self.astakosclient.resolve_commissions(
318
                    accept_serials=[],
319
                    reject_serials=self.serials)
320
                self.commission_serials.delete_many(
321
                    r['rejected'])
322
            self.wrapper.rollback()
323
        self.in_transaction = False
324

    
325
    def close(self):
326
        self.wrapper.close()
327
        self.queue.close()
328

    
329
    @property
330
    def using_external_quotaholder(self):
331
        return not isinstance(self.astakosclient, DisabledAstakosClient)
332

    
333
    @debug_method
334
    @backend_method
335
    def list_accounts(self, user, marker=None, limit=10000):
336
        """Return a list of accounts the user can access."""
337

    
338
        allowed = self._allowed_accounts(user)
339
        start, limit = self._list_limits(allowed, marker, limit)
340
        return allowed[start:start + limit]
341

    
342
    @debug_method
343
    @backend_method
344
    def get_account_meta(
345
            self, user, account, domain, until=None, include_user_defined=True,
346
            external_quota=None):
347
        """Return a dictionary with the account metadata for the domain."""
348

    
349
        path, node = self._lookup_account(account, user == account)
350
        if user != account:
351
            if until or (node is None) or (account not
352
                                           in self._allowed_accounts(user)):
353
                raise NotAllowedError
354
        try:
355
            props = self._get_properties(node, until)
356
            mtime = props[self.MTIME]
357
        except NameError:
358
            props = None
359
            mtime = until
360
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
361
        tstamp = max(tstamp, mtime)
362
        if until is None:
363
            modified = tstamp
364
        else:
365
            modified = self._get_statistics(
366
                node, compute=True)[2]  # Overall last modification.
367
            modified = max(modified, mtime)
368

    
369
        if user != account:
370
            meta = {'name': account}
371
        else:
372
            meta = {}
373
            if props is not None and include_user_defined:
374
                meta.update(
375
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
376
            if until is not None:
377
                meta.update({'until_timestamp': tstamp})
378
            meta.update({'name': account, 'count': count, 'bytes': bytes})
379
            if self.using_external_quotaholder:
380
                external_quota = external_quota or {}
381
                meta['bytes'] = external_quota.get('usage', 0)
382
        meta.update({'modified': modified})
383
        return meta
384

    
385
    @debug_method
386
    @backend_method
387
    def update_account_meta(self, user, account, domain, meta, replace=False):
388
        """Update the metadata associated with the account for the domain."""
389

    
390
        if user != account:
391
            raise NotAllowedError
392
        path, node = self._lookup_account(account, True)
393
        self._put_metadata(user, node, domain, meta, replace,
394
                           update_statistics_ancestors_depth=-1)
395

    
396
    @debug_method
397
    @backend_method
398
    def get_account_groups(self, user, account):
399
        """Return a dictionary with the user groups defined for the account."""
400

    
401
        if user != account:
402
            if account not in self._allowed_accounts(user):
403
                raise NotAllowedError
404
            return {}
405
        self._lookup_account(account, True)
406
        return self.permissions.group_dict(account)
407

    
408
    @debug_method
409
    @backend_method
410
    def update_account_groups(self, user, account, groups, replace=False):
411
        """Update the groups associated with the account."""
412

    
413
        if user != account:
414
            raise NotAllowedError
415
        self._lookup_account(account, True)
416
        self._check_groups(groups)
417
        if replace:
418
            self.permissions.group_destroy(account)
419
        for k, v in groups.iteritems():
420
            if not replace:  # If not already deleted.
421
                self.permissions.group_delete(account, k)
422
            if v:
423
                self.permissions.group_addmany(account, k, v)
424

    
425
    @debug_method
426
    @backend_method
427
    def get_account_policy(self, user, account, external_quota=None):
428
        """Return a dictionary with the account policy."""
429

    
430
        if user != account:
431
            if account not in self._allowed_accounts(user):
432
                raise NotAllowedError
433
            return {}
434
        path, node = self._lookup_account(account, True)
435
        policy = self._get_policy(node, is_account_policy=True)
436
        if self.using_external_quotaholder:
437
            external_quota = external_quota or {}
438
            policy['quota'] = external_quota.get('limit', 0)
439
        return policy
440

    
441
    @debug_method
442
    @backend_method
443
    def update_account_policy(self, user, account, policy, replace=False):
444
        """Update the policy associated with the account."""
445

    
446
        if user != account:
447
            raise NotAllowedError
448
        path, node = self._lookup_account(account, True)
449
        self._check_policy(policy, is_account_policy=True)
450
        self._put_policy(node, policy, replace, is_account_policy=True)
451

    
452
    @debug_method
453
    @backend_method
454
    def put_account(self, user, account, policy=None):
455
        """Create a new account with the given name."""
456

    
457
        policy = policy or {}
458
        if user != account:
459
            raise NotAllowedError
460
        node = self.node.node_lookup(account)
461
        if node is not None:
462
            raise AccountExists('Account already exists')
463
        if policy:
464
            self._check_policy(policy, is_account_policy=True)
465
        node = self._put_path(user, self.ROOTNODE, account,
466
                              update_statistics_ancestors_depth=-1)
467
        self._put_policy(node, policy, True, is_account_policy=True)
468

    
469
    @debug_method
470
    @backend_method
471
    def delete_account(self, user, account):
472
        """Delete the account with the given name."""
473

    
474
        if user != account:
475
            raise NotAllowedError
476
        node = self.node.node_lookup(account)
477
        if node is None:
478
            return
479
        if not self.node.node_remove(node,
480
                                     update_statistics_ancestors_depth=-1):
481
            raise AccountNotEmpty('Account is not empty')
482
        self.permissions.group_destroy(account)
483

    
484
    @debug_method
485
    @backend_method
486
    def list_containers(self, user, account, marker=None, limit=10000,
487
                        shared=False, until=None, public=False):
488
        """Return a list of containers existing under an account."""
489

    
490
        if user != account:
491
            if until or account not in self._allowed_accounts(user):
492
                raise NotAllowedError
493
            allowed = self._allowed_containers(user, account)
494
            start, limit = self._list_limits(allowed, marker, limit)
495
            return allowed[start:start + limit]
496
        if shared or public:
497
            allowed = set()
498
            if shared:
499
                allowed.update([x.split('/', 2)[1] for x in
500
                               self.permissions.access_list_shared(account)])
501
            if public:
502
                allowed.update([x[0].split('/', 2)[1] for x in
503
                               self.permissions.public_list(account)])
504
            allowed = sorted(allowed)
505
            start, limit = self._list_limits(allowed, marker, limit)
506
            return allowed[start:start + limit]
507
        node = self.node.node_lookup(account)
508
        containers = [x[0] for x in self._list_object_properties(
509
            node, account, '', '/', marker, limit, False, None, [], until)]
510
        start, limit = self._list_limits(
511
            [x[0] for x in containers], marker, limit)
512
        return containers[start:start + limit]
513

    
514
    @debug_method
515
    @backend_method
516
    def list_container_meta(self, user, account, container, domain,
517
                            until=None):
518
        """Return a list of the container's object meta keys for a domain."""
519

    
520
        allowed = []
521
        if user != account:
522
            if until:
523
                raise NotAllowedError
524
            allowed = self.permissions.access_list_paths(
525
                user, '/'.join((account, container)))
526
            if not allowed:
527
                raise NotAllowedError
528
        path, node = self._lookup_container(account, container)
529
        before = until if until is not None else inf
530
        allowed = self._get_formatted_paths(allowed)
531
        return self.node.latest_attribute_keys(node, domain, before,
532
                                               CLUSTER_DELETED, allowed)
533

    
534
    @debug_method
535
    @backend_method
536
    def get_container_meta(self, user, account, container, domain, until=None,
537
                           include_user_defined=True):
538
        """Return a dictionary with the container metadata for the domain."""
539

    
540
        if user != account:
541
            if until or container not in self._allowed_containers(user,
542
                                                                  account):
543
                raise NotAllowedError
544
        path, node = self._lookup_container(account, container)
545
        props = self._get_properties(node, until)
546
        mtime = props[self.MTIME]
547
        count, bytes, tstamp = self._get_statistics(node, until)
548
        tstamp = max(tstamp, mtime)
549
        if until is None:
550
            modified = tstamp
551
        else:
552
            modified = self._get_statistics(
553
                node)[2]  # Overall last modification.
554
            modified = max(modified, mtime)
555

    
556
        if user != account:
557
            meta = {'name': container}
558
        else:
559
            meta = {}
560
            if include_user_defined:
561
                meta.update(
562
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
563
            if until is not None:
564
                meta.update({'until_timestamp': tstamp})
565
            meta.update({'name': container, 'count': count, 'bytes': bytes})
566
        meta.update({'modified': modified})
567
        return meta
568

    
569
    @debug_method
570
    @backend_method
571
    def update_container_meta(self, user, account, container, domain, meta,
572
                              replace=False):
573
        """Update the metadata associated with the container for the domain."""
574

    
575
        if user != account:
576
            raise NotAllowedError
577
        path, node = self._lookup_container(account, container)
578
        src_version_id, dest_version_id = self._put_metadata(
579
            user, node, domain, meta, replace,
580
            update_statistics_ancestors_depth=0)
581
        if src_version_id is not None:
582
            versioning = self._get_policy(
583
                node, is_account_policy=False)['versioning']
584
            if versioning != 'auto':
585
                self.node.version_remove(src_version_id,
586
                                         update_statistics_ancestors_depth=0)
587

    
588
    @debug_method
589
    @backend_method
590
    def get_container_policy(self, user, account, container):
591
        """Return a dictionary with the container policy."""
592

    
593
        if user != account:
594
            if container not in self._allowed_containers(user, account):
595
                raise NotAllowedError
596
            return {}
597
        path, node = self._lookup_container(account, container)
598
        return self._get_policy(node, is_account_policy=False)
599

    
600
    @debug_method
601
    @backend_method
602
    def update_container_policy(self, user, account, container, policy,
603
                                replace=False):
604
        """Update the policy associated with the container."""
605

    
606
        if user != account:
607
            raise NotAllowedError
608
        path, node = self._lookup_container(account, container)
609
        self._check_policy(policy, is_account_policy=False)
610
        self._put_policy(node, policy, replace, is_account_policy=False)
611

    
612
    @debug_method
613
    @backend_method
614
    def put_container(self, user, account, container, policy=None):
615
        """Create a new container with the given name."""
616

    
617
        policy = policy or {}
618
        if user != account:
619
            raise NotAllowedError
620
        try:
621
            path, node = self._lookup_container(account, container)
622
        except NameError:
623
            pass
624
        else:
625
            raise ContainerExists('Container already exists')
626
        if policy:
627
            self._check_policy(policy, is_account_policy=False)
628
        path = '/'.join((account, container))
629
        node = self._put_path(
630
            user, self._lookup_account(account, True)[1], path,
631
            update_statistics_ancestors_depth=-1)
632
        self._put_policy(node, policy, True, is_account_policy=False)
633

    
634
    @debug_method
635
    @backend_method
636
    def delete_container(self, user, account, container, until=None, prefix='',
637
                         delimiter=None):
638
        """Delete/purge the container with the given name."""
639

    
640
        if user != account:
641
            raise NotAllowedError
642
        path, node = self._lookup_container(account, container)
643

    
644
        if until is not None:
645
            hashes, size, serials = self.node.node_purge_children(
646
                node, until, CLUSTER_HISTORY,
647
                update_statistics_ancestors_depth=0)
648
            for h in hashes:
649
                self.store.map_delete(h)
650
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
651
                                          update_statistics_ancestors_depth=0)
652
            if not self.free_versioning:
653
                self._report_size_change(
654
                    user, account, -size, {
655
                        'action': 'container purge',
656
                        'path': path,
657
                        'versions': ','.join(str(i) for i in serials)
658
                    }
659
                )
660
            return
661

    
662
        if not delimiter:
663
            if self._get_statistics(node)[0] > 0:
664
                raise ContainerNotEmpty('Container is not empty')
665
            hashes, size, serials = self.node.node_purge_children(
666
                node, inf, CLUSTER_HISTORY,
667
                update_statistics_ancestors_depth=0)
668
            for h in hashes:
669
                self.store.map_delete(h)
670
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
671
                                          update_statistics_ancestors_depth=0)
672
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
673
            if not self.free_versioning:
674
                self._report_size_change(
675
                    user, account, -size, {
676
                        'action': 'container purge',
677
                        'path': path,
678
                        'versions': ','.join(str(i) for i in serials)
679
                    }
680
                )
681
        else:
682
            # remove only contents
683
            src_objects = self._list_objects_no_limit(
684
                user, account, container, prefix='', delimiter=None,
685
                virtual=False, domain=None, keys=[], shared=False, until=None,
686
                size_range=None, all_props=False, public=False)
687
            for name, _ in src_objects:
688
                self._delete_object(user, account, container, name, until=None,
689
                                    delimiter=None, report_size_change=True)
690

    
691
    def _list_objects(self, user, account, container, prefix, delimiter,
692
                      marker, limit, virtual, domain, keys, shared, until,
693
                      size_range, all_props, public):
694
        if user != account and until:
695
            raise NotAllowedError
696
        if shared and public:
697
            # get shared first
698
            shared_paths = self._list_object_permissions(
699
                user, account, container, prefix, shared=True, public=False)
700
            objects = set()
701
            if shared_paths:
702
                path, node = self._lookup_container(account, container)
703
                shared_paths = self._get_formatted_paths(shared_paths)
704
                objects |= set(self._list_object_properties(
705
                    node, path, prefix, delimiter, marker, limit, virtual,
706
                    domain, keys, until, size_range, shared_paths, all_props))
707

    
708
            # get public
709
            objects |= set(self._list_public_object_properties(
710
                user, account, container, prefix, all_props))
711
            objects = list(objects)
712

    
713
            objects.sort(key=lambda x: x[0])
714
            start, limit = self._list_limits(
715
                [x[0] for x in objects], marker, limit)
716
            return objects[start:start + limit]
717
        elif public:
718
            objects = self._list_public_object_properties(
719
                user, account, container, prefix, all_props)
720
            start, limit = self._list_limits(
721
                [x[0] for x in objects], marker, limit)
722
            return objects[start:start + limit]
723

    
724
        allowed = self._list_object_permissions(
725
            user, account, container, prefix, shared, public)
726
        if shared and not allowed:
727
            return []
728
        path, node = self._lookup_container(account, container)
729
        allowed = self._get_formatted_paths(allowed)
730
        objects = self._list_object_properties(
731
            node, path, prefix, delimiter, marker, limit, virtual, domain,
732
            keys, until, size_range, allowed, all_props)
733
        start, limit = self._list_limits(
734
            [x[0] for x in objects], marker, limit)
735
        return objects[start:start + limit]
736

    
737
    def _list_public_object_properties(self, user, account, container, prefix,
738
                                       all_props):
739
        public = self._list_object_permissions(
740
            user, account, container, prefix, shared=False, public=True)
741
        paths, nodes = self._lookup_objects(public)
742
        path = '/'.join((account, container))
743
        cont_prefix = path + '/'
744
        paths = [x[len(cont_prefix):] for x in paths]
745
        objects = [(p,) + props for p, props in
746
                   zip(paths, self.node.version_lookup_bulk(
747
                       nodes, all_props=all_props, order_by_path=True))]
748
        return objects
749

    
750
    def _list_objects_no_limit(self, user, account, container, prefix,
751
                               delimiter, virtual, domain, keys, shared, until,
752
                               size_range, all_props, public):
753
        objects = []
754
        while True:
755
            marker = objects[-1] if objects else None
756
            limit = 10000
757
            l = self._list_objects(
758
                user, account, container, prefix, delimiter, marker, limit,
759
                virtual, domain, keys, shared, until, size_range, all_props,
760
                public)
761
            objects.extend(l)
762
            if not l or len(l) < limit:
763
                break
764
        return objects
765

    
766
    def _list_object_permissions(self, user, account, container, prefix,
767
                                 shared, public):
768
        allowed = []
769
        path = '/'.join((account, container, prefix)).rstrip('/')
770
        if user != account:
771
            allowed = self.permissions.access_list_paths(user, path)
772
            if not allowed:
773
                raise NotAllowedError
774
        else:
775
            allowed = set()
776
            if shared:
777
                allowed.update(self.permissions.access_list_shared(path))
778
            if public:
779
                allowed.update(
780
                    [x[0] for x in self.permissions.public_list(path)])
781
            allowed = sorted(allowed)
782
            if not allowed:
783
                return []
784
        return allowed
785

    
786
    @debug_method
787
    @backend_method
788
    def list_objects(self, user, account, container, prefix='', delimiter=None,
789
                     marker=None, limit=10000, virtual=True, domain=None,
790
                     keys=None, shared=False, until=None, size_range=None,
791
                     public=False):
792
        """List (object name, object version_id) under a container."""
793

    
794
        keys = keys or []
795
        return self._list_objects(
796
            user, account, container, prefix, delimiter, marker, limit,
797
            virtual, domain, keys, shared, until, size_range, False, public)
798

    
799
    @debug_method
800
    @backend_method
801
    def list_object_meta(self, user, account, container, prefix='',
802
                         delimiter=None, marker=None, limit=10000,
803
                         virtual=True, domain=None, keys=None, shared=False,
804
                         until=None, size_range=None, public=False):
805
        """Return a list of metadata dicts of objects under a container."""
806

    
807
        keys = keys or []
808
        props = self._list_objects(
809
            user, account, container, prefix, delimiter, marker, limit,
810
            virtual, domain, keys, shared, until, size_range, True, public)
811
        objects = []
812
        for p in props:
813
            if len(p) == 2:
814
                objects.append({'subdir': p[0]})
815
            else:
816
                objects.append({
817
                    'name': p[0],
818
                    'bytes': p[self.SIZE + 1],
819
                    'type': p[self.TYPE + 1],
820
                    'hash': p[self.HASH + 1],
821
                    'version': p[self.SERIAL + 1],
822
                    'version_timestamp': p[self.MTIME + 1],
823
                    'modified': p[self.MTIME + 1] if until is None else None,
824
                    'modified_by': p[self.MUSER + 1],
825
                    'uuid': p[self.UUID + 1],
826
                    'checksum': p[self.CHECKSUM + 1]})
827
        return objects
828

    
829
    @debug_method
830
    @backend_method
831
    def list_object_permissions(self, user, account, container, prefix=''):
832
        """Return a list of paths enforce permissions under a container."""
833

    
834
        return self._list_object_permissions(user, account, container, prefix,
835
                                             True, False)
836

    
837
    @debug_method
838
    @backend_method
839
    def list_object_public(self, user, account, container, prefix=''):
840
        """Return a mapping of object paths to public ids under a container."""
841

    
842
        public = {}
843
        for path, p in self.permissions.public_list('/'.join((account,
844
                                                              container,
845
                                                              prefix))):
846
            public[path] = p
847
        return public
848

    
849
    @debug_method
850
    @backend_method
851
    def get_object_meta(self, user, account, container, name, domain,
852
                        version=None, include_user_defined=True):
853
        """Return a dictionary with the object metadata for the domain."""
854

    
855
        self._can_read(user, account, container, name)
856
        path, node = self._lookup_object(account, container, name)
857
        props = self._get_version(node, version)
858
        if version is None:
859
            modified = props[self.MTIME]
860
        else:
861
            try:
862
                modified = self._get_version(
863
                    node)[self.MTIME]  # Overall last modification.
864
            except NameError:  # Object may be deleted.
865
                del_props = self.node.version_lookup(
866
                    node, inf, CLUSTER_DELETED)
867
                if del_props is None:
868
                    raise ItemNotExists('Object does not exist')
869
                modified = del_props[self.MTIME]
870

    
871
        meta = {}
872
        if include_user_defined:
873
            meta.update(
874
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
875
        meta.update({'name': name,
876
                     'bytes': props[self.SIZE],
877
                     'type': props[self.TYPE],
878
                     'hash': props[self.HASH],
879
                     'version': props[self.SERIAL],
880
                     'version_timestamp': props[self.MTIME],
881
                     'modified': modified,
882
                     'modified_by': props[self.MUSER],
883
                     'uuid': props[self.UUID],
884
                     'checksum': props[self.CHECKSUM]})
885
        return meta
886

    
887
    @debug_method
888
    @backend_method
889
    def update_object_meta(self, user, account, container, name, domain, meta,
890
                           replace=False):
891
        """Update object metadata for a domain and return the new version."""
892

    
893
        self._can_write(user, account, container, name)
894

    
895
        path, node = self._lookup_object(account, container, name,
896
                                         lock_container=True)
897
        src_version_id, dest_version_id = self._put_metadata(
898
            user, node, domain, meta, replace,
899
            update_statistics_ancestors_depth=1)
900
        self._apply_versioning(account, container, src_version_id,
901
                               update_statistics_ancestors_depth=1)
902
        return dest_version_id
903

    
904
    @debug_method
905
    @backend_method
906
    def get_object_permissions_bulk(self, user, account, container, names):
907
        """Return the action allowed on the object, the path
908
        from which the object gets its permissions from,
909
        along with a dictionary containing the permissions."""
910

    
911
        permissions_path = self._get_permissions_path_bulk(account, container,
912
                                                           names)
913
        access_objects = self.permissions.access_check_bulk(permissions_path,
914
                                                            user)
915
        #group_parents = access_objects['group_parents']
916
        nobject_permissions = {}
917
        cpath = '/'.join((account, container, ''))
918
        cpath_idx = len(cpath)
919
        for path in permissions_path:
920
            allowed = 1
921
            name = path[cpath_idx:]
922
            if user != account:
923
                try:
924
                    allowed = access_objects[path]
925
                except KeyError:
926
                    raise NotAllowedError
927
            access_dict, allowed = \
928
                self.permissions.access_get_for_bulk(access_objects[path])
929
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
930
                                         access_dict)
931
        self._lookup_objects(permissions_path)
932
        return nobject_permissions
933

    
934
    @debug_method
935
    @backend_method
936
    def get_object_permissions(self, user, account, container, name):
937
        """Return the action allowed on the object, the path
938
        from which the object gets its permissions from,
939
        along with a dictionary containing the permissions."""
940

    
941
        allowed = 'write'
942
        permissions_path = self._get_permissions_path(account, container, name)
943
        if user != account:
944
            if self.permissions.access_check(permissions_path, self.WRITE,
945
                                             user):
946
                allowed = 'write'
947
            elif self.permissions.access_check(permissions_path, self.READ,
948
                                               user):
949
                allowed = 'read'
950
            else:
951
                raise NotAllowedError
952
        self._lookup_object(account, container, name)
953
        return (allowed,
954
                permissions_path,
955
                self.permissions.access_get(permissions_path))
956

    
957
    @debug_method
958
    @backend_method
959
    def update_object_permissions(self, user, account, container, name,
960
                                  permissions):
961
        """Update the permissions associated with the object."""
962

    
963
        if user != account:
964
            raise NotAllowedError
965
        path = self._lookup_object(account, container, name,
966
                                   lock_container=True)[0]
967
        self._check_permissions(path, permissions)
968
        try:
969
            self.permissions.access_set(path, permissions)
970
        except:
971
            raise ValueError
972
        else:
973
            self._report_sharing_change(user, account, path, {'members':
974
                                        self.permissions.access_members(path)})
975

    
976
    @debug_method
977
    @backend_method
978
    def get_object_public(self, user, account, container, name):
979
        """Return the public id of the object if applicable."""
980

    
981
        self._can_read(user, account, container, name)
982
        path = self._lookup_object(account, container, name)[0]
983
        p = self.permissions.public_get(path)
984
        return p
985

    
986
    @debug_method
987
    @backend_method
988
    def update_object_public(self, user, account, container, name, public):
989
        """Update the public status of the object."""
990

    
991
        self._can_write(user, account, container, name)
992
        path = self._lookup_object(account, container, name,
993
                                   lock_container=True)[0]
994
        if not public:
995
            self.permissions.public_unset(path)
996
        else:
997
            self.permissions.public_set(
998
                path, self.public_url_security, self.public_url_alphabet)
999

    
1000
    @debug_method
1001
    @backend_method
1002
    def get_object_hashmap(self, user, account, container, name, version=None):
1003
        """Return the object's size and a list with partial hashes."""
1004

    
1005
        self._can_read(user, account, container, name)
1006
        path, node = self._lookup_object(account, container, name)
1007
        props = self._get_version(node, version)
1008
        if props[self.HASH] is None:
1009
            return 0, ()
1010
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
1011
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
1012

    
1013
    def _update_object_hash(self, user, account, container, name, size, type,
1014
                            hash, checksum, domain, meta, replace_meta,
1015
                            permissions, src_node=None, src_version_id=None,
1016
                            is_copy=False, report_size_change=True):
1017
        if permissions is not None and user != account:
1018
            raise NotAllowedError
1019
        self._can_write(user, account, container, name)
1020
        if permissions is not None:
1021
            path = '/'.join((account, container, name))
1022
            self._check_permissions(path, permissions)
1023

    
1024
        account_path, account_node = self._lookup_account(account, True)
1025
        container_path, container_node = self._lookup_container(
1026
            account, container)
1027

    
1028
        path, node = self._put_object_node(
1029
            container_path, container_node, name)
1030
        pre_version_id, dest_version_id = self._put_version_duplicate(
1031
            user, node, src_node=src_node, size=size, type=type, hash=hash,
1032
            checksum=checksum, is_copy=is_copy,
1033
            update_statistics_ancestors_depth=1)
1034

    
1035
        # Handle meta.
1036
        if src_version_id is None:
1037
            src_version_id = pre_version_id
1038
        self._put_metadata_duplicate(
1039
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
1040

    
1041
        del_size = self._apply_versioning(account, container, pre_version_id,
1042
                                          update_statistics_ancestors_depth=1)
1043
        size_delta = size - del_size
1044
        if size_delta > 0:
1045
            # Check account quota.
1046
            if not self.using_external_quotaholder:
1047
                account_quota = long(self._get_policy(
1048
                    account_node, is_account_policy=True)['quota'])
1049
                account_usage = self._get_statistics(account_node,
1050
                                                     compute=True)[1]
1051
                if (account_quota > 0 and account_usage > account_quota):
1052
                    raise QuotaError(
1053
                        'Account quota exceeded: limit: %s, usage: %s' % (
1054
                            account_quota, account_usage))
1055

    
1056
            # Check container quota.
1057
            container_quota = long(self._get_policy(
1058
                container_node, is_account_policy=False)['quota'])
1059
            container_usage = self._get_statistics(container_node)[1]
1060
            if (container_quota > 0 and container_usage > container_quota):
1061
                # This must be executed in a transaction, so the version is
1062
                # never created if it fails.
1063
                raise QuotaError(
1064
                    'Container quota exceeded: limit: %s, usage: %s' % (
1065
                        container_quota, container_usage
1066
                    )
1067
                )
1068

    
1069
        if report_size_change:
1070
            self._report_size_change(
1071
                user, account, size_delta,
1072
                {'action': 'object update', 'path': path,
1073
                 'versions': ','.join([str(dest_version_id)])})
1074
        if permissions is not None:
1075
            self.permissions.access_set(path, permissions)
1076
            self._report_sharing_change(
1077
                user, account, path,
1078
                {'members': self.permissions.access_members(path)})
1079

    
1080
        self._report_object_change(
1081
            user, account, path,
1082
            details={'version': dest_version_id, 'action': 'object update'})
1083
        return dest_version_id
1084

    
1085
    @debug_method
1086
    def update_object_hashmap(self, user, account, container, name, size, type,
1087
                              hashmap, checksum, domain, meta=None,
1088
                              replace_meta=False, permissions=None):
1089
        """Create/update an object's hashmap and return the new version."""
1090

    
1091
        meta = meta or {}
1092
        if size == 0:  # No such thing as an empty hashmap.
1093
            hashmap = [self.put_block('')]
1094
        map = HashMap(self.block_size, self.hash_algorithm)
1095
        map.extend([self._unhexlify_hash(x) for x in hashmap])
1096
        missing = self.store.block_search(map)
1097
        if missing:
1098
            ie = IndexError()
1099
            ie.data = [binascii.hexlify(x) for x in missing]
1100
            raise ie
1101

    
1102
        hash = map.hash()
1103
        hexlified = binascii.hexlify(hash)
1104
        # _update_object_hash() locks destination path
1105
        dest_version_id = self._update_object_hash(
1106
            user, account, container, name, size, type, hexlified, checksum,
1107
            domain, meta, replace_meta, permissions)
1108
        self.store.map_put(hash, map)
1109
        return dest_version_id, hexlified
1110

    
1111
    @debug_method
1112
    @backend_method
1113
    def update_object_checksum(self, user, account, container, name, version,
1114
                               checksum):
1115
        """Update an object's checksum."""
1116

    
1117
        # Update objects with greater version and same hashmap
1118
        # and size (fix metadata updates).
1119
        self._can_write(user, account, container, name)
1120
        path, node = self._lookup_object(account, container, name,
1121
                                         lock_container=True)
1122
        props = self._get_version(node, version)
1123
        versions = self.node.node_get_versions(node)
1124
        for x in versions:
1125
            if (x[self.SERIAL] >= int(version) and
1126
                x[self.HASH] == props[self.HASH] and
1127
                    x[self.SIZE] == props[self.SIZE]):
1128
                self.node.version_put_property(
1129
                    x[self.SERIAL], 'checksum', checksum)
1130

    
1131
    def _copy_object(self, user, src_account, src_container, src_name,
1132
                     dest_account, dest_container, dest_name, type,
1133
                     dest_domain=None, dest_meta=None, replace_meta=False,
1134
                     permissions=None, src_version=None, is_move=False,
1135
                     delimiter=None):
1136

    
1137
        report_size_change = not is_move
1138
        dest_meta = dest_meta or {}
1139
        dest_version_ids = []
1140
        self._can_read(user, src_account, src_container, src_name)
1141

    
1142
        src_container_path = '/'.join((src_account, src_container))
1143
        dest_container_path = '/'.join((dest_account, dest_container))
1144
        # Lock container paths in alphabetical order
1145
        if src_container_path < dest_container_path:
1146
            self._lookup_container(src_account, src_container)
1147
            self._lookup_container(dest_account, dest_container)
1148
        else:
1149
            self._lookup_container(dest_account, dest_container)
1150
            self._lookup_container(src_account, src_container)
1151

    
1152
        path, node = self._lookup_object(src_account, src_container, src_name)
1153
        # TODO: Will do another fetch of the properties in duplicate version...
1154
        props = self._get_version(
1155
            node, src_version)  # Check to see if source exists.
1156
        src_version_id = props[self.SERIAL]
1157
        hash = props[self.HASH]
1158
        size = props[self.SIZE]
1159
        is_copy = not is_move and (src_account, src_container, src_name) != (
1160
            dest_account, dest_container, dest_name)  # New uuid.
1161
        dest_version_ids.append(self._update_object_hash(
1162
            user, dest_account, dest_container, dest_name, size, type, hash,
1163
            None, dest_domain, dest_meta, replace_meta, permissions,
1164
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1165
            report_size_change=report_size_change))
1166
        if is_move and ((src_account, src_container, src_name) !=
1167
                        (dest_account, dest_container, dest_name)):
1168
            self._delete_object(user, src_account, src_container, src_name,
1169
                                report_size_change=report_size_change)
1170

    
1171
        if delimiter:
1172
            prefix = (src_name + delimiter if not
1173
                      src_name.endswith(delimiter) else src_name)
1174
            src_objects = self._list_objects_no_limit(
1175
                user, src_account, src_container, prefix, delimiter=None,
1176
                virtual=False, domain=None, keys=[], shared=False, until=None,
1177
                size_range=None, all_props=True, public=False)
1178

    
1179
            for props in src_objects:
1180
                path = props[0]
1181
                type = props[self.TYPE]
1182
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1183
                    delimiter) else dest_name
1184
                vdest_name = path.replace(prefix, dest_prefix, 1)
1185
                self._copy_object(user, src_account, src_container,
1186
                                  src_name=path, dest_account=dest_account,
1187
                                  dest_container=dest_container,
1188
                                  dest_name=vdest_name, type=type,
1189
                                  dest_domain=dest_domain, dest_meta=dest_meta,
1190
                                  replace_meta=replace_meta, permissions=None,
1191
                                  src_version=None, is_move=is_move,
1192
                                  delimiter=None)
1193
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
1194
                dest_version_ids)
1195

    
1196
    @debug_method
1197
    @backend_method
1198
    def copy_object(self, user, src_account, src_container, src_name,
1199
                    dest_account, dest_container, dest_name, type, domain,
1200
                    meta=None, replace_meta=False, permissions=None,
1201
                    src_version=None, delimiter=None):
1202
        """Copy an object's data and metadata."""
1203

    
1204
        meta = meta or {}
1205
        dest_version_id = self._copy_object(
1206
            user, src_account, src_container, src_name, dest_account,
1207
            dest_container, dest_name, type, domain, meta, replace_meta,
1208
            permissions, src_version, False, delimiter)
1209
        return dest_version_id
1210

    
1211
    @debug_method
1212
    @backend_method
1213
    def move_object(self, user, src_account, src_container, src_name,
1214
                    dest_account, dest_container, dest_name, type, domain,
1215
                    meta=None, replace_meta=False, permissions=None,
1216
                    delimiter=None):
1217
        """Move an object's data and metadata."""
1218

    
1219
        meta = meta or {}
1220
        if user != src_account:
1221
            raise NotAllowedError
1222
        dest_version_id = self._move_object(
1223
            user, src_account, src_container, src_name, dest_account,
1224
            dest_container, dest_name, type, domain, meta, replace_meta,
1225
            permissions, None, delimiter=delimiter)
1226
        return dest_version_id
1227

    
1228
    def _delete_object(self, user, account, container, name, until=None,
1229
                       delimiter=None, report_size_change=True):
1230
        if user != account:
1231
            raise NotAllowedError
1232

    
1233
        # lookup object and lock container path also
1234
        path, node = self._lookup_object(account, container, name,
1235
                                         lock_container=True)
1236

    
1237
        if until is not None:
1238
            if node is None:
1239
                return
1240
            hashes = []
1241
            size = 0
1242
            serials = []
1243
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1244
                                           update_statistics_ancestors_depth=1)
1245
            hashes += h
1246
            size += s
1247
            serials += v
1248
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1249
                                           update_statistics_ancestors_depth=1)
1250
            hashes += h
1251
            if not self.free_versioning:
1252
                size += s
1253
            serials += v
1254
            for h in hashes:
1255
                self.store.map_delete(h)
1256
            self.node.node_purge(node, until, CLUSTER_DELETED,
1257
                                 update_statistics_ancestors_depth=1)
1258
            try:
1259
                self._get_version(node)
1260
            except NameError:
1261
                self.permissions.access_clear(path)
1262
            self._report_size_change(
1263
                user, account, -size, {
1264
                    'action': 'object purge',
1265
                    'path': path,
1266
                    'versions': ','.join(str(i) for i in serials)
1267
                }
1268
            )
1269
            return
1270

    
1271
        if not self._exists(node):
1272
            raise ItemNotExists('Object is deleted.')
1273

    
1274
        src_version_id, dest_version_id = self._put_version_duplicate(
1275
            user, node, size=0, type='', hash=None, checksum='',
1276
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1277
        del_size = self._apply_versioning(account, container, src_version_id,
1278
                                          update_statistics_ancestors_depth=1)
1279
        if report_size_change:
1280
            self._report_size_change(
1281
                user, account, -del_size,
1282
                {'action': 'object delete',
1283
                 'path': path,
1284
                 'versions': ','.join([str(dest_version_id)])})
1285
        self._report_object_change(
1286
            user, account, path, details={'action': 'object delete'})
1287
        self.permissions.access_clear(path)
1288

    
1289
        if delimiter:
1290
            prefix = name + delimiter if not name.endswith(delimiter) else name
1291
            src_objects = self._list_objects_no_limit(
1292
                user, account, container, prefix, delimiter=None,
1293
                virtual=False, domain=None, keys=[], shared=False, until=None,
1294
                size_range=None, all_props=False, public=False)
1295
            for name, _ in src_objects:
1296
                self._delete_object(user, account, container, name, until=None,
1297
                                    delimiter=None,
1298
                                    report_size_change=report_size_change)
1299

    
1300
    @debug_method
1301
    @backend_method
1302
    def delete_object(self, user, account, container, name, until=None,
1303
                      prefix='', delimiter=None):
1304
        """Delete/purge an object."""
1305

    
1306
        self._delete_object(user, account, container, name, until, delimiter)
1307

    
1308
    @debug_method
1309
    @backend_method
1310
    def list_versions(self, user, account, container, name):
1311
        """Return a list of all object (version, version_timestamp) tuples."""
1312

    
1313
        self._can_read(user, account, container, name)
1314
        path, node = self._lookup_object(account, container, name)
1315
        versions = self.node.node_get_versions(node)
1316
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
1317
                x[self.CLUSTER] != CLUSTER_DELETED]
1318

    
1319
    @debug_method
1320
    @backend_method
1321
    def get_uuid(self, user, uuid, check_permissions=True):
1322
        """Return the (account, container, name) for the UUID given."""
1323

    
1324
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1325
        if info is None:
1326
            raise NameError
1327
        path, serial = info
1328
        account, container, name = path.split('/', 2)
1329
        if check_permissions:
1330
            self._can_read(user, account, container, name)
1331
        return (account, container, name)
1332

    
1333
    @debug_method
1334
    @backend_method
1335
    def get_public(self, user, public):
1336
        """Return the (account, container, name) for the public id given."""
1337

    
1338
        path = self.permissions.public_path(public)
1339
        if path is None:
1340
            raise NameError
1341
        account, container, name = path.split('/', 2)
1342
        self._can_read(user, account, container, name)
1343
        return (account, container, name)
1344

    
1345
    def get_block(self, hash):
1346
        """Return a block's data."""
1347

    
1348
        logger.debug("get_block: %s", hash)
1349
        block = self.store.block_get(self._unhexlify_hash(hash))
1350
        if not block:
1351
            raise ItemNotExists('Block does not exist')
1352
        return block
1353

    
1354
    def put_block(self, data):
1355
        """Store a block and return the hash."""
1356

    
1357
        logger.debug("put_block: %s", len(data))
1358
        return binascii.hexlify(self.store.block_put(data))
1359

    
1360
    def update_block(self, hash, data, offset=0):
1361
        """Update a known block and return the hash."""
1362

    
1363
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1364
        if offset == 0 and len(data) == self.block_size:
1365
            return self.put_block(data)
1366
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
1367
        return binascii.hexlify(h)
1368

    
1369
    # Path functions.
1370

    
1371
    def _generate_uuid(self):
1372
        return str(uuidlib.uuid4())
1373

    
1374
    def _put_object_node(self, path, parent, name):
1375
        path = '/'.join((path, name))
1376
        node = self.node.node_lookup(path)
1377
        if node is None:
1378
            node = self.node.node_create(parent, path)
1379
        return path, node
1380

    
1381
    def _put_path(self, user, parent, path,
1382
                  update_statistics_ancestors_depth=None):
1383
        node = self.node.node_create(parent, path)
1384
        self.node.version_create(node, None, 0, '', None, user,
1385
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
1386
                                 update_statistics_ancestors_depth)
1387
        return node
1388

    
1389
    def _lookup_account(self, account, create=True):
1390
        node = self.node.node_lookup(account)
1391
        if node is None and create:
1392
            node = self._put_path(
1393
                account, self.ROOTNODE, account,
1394
                update_statistics_ancestors_depth=-1)  # User is account.
1395
        return account, node
1396

    
1397
    def _lookup_container(self, account, container):
1398
        for_update = True if self.lock_container_path else False
1399
        path = '/'.join((account, container))
1400
        node = self.node.node_lookup(path, for_update)
1401
        if node is None:
1402
            raise ItemNotExists('Container does not exist')
1403
        return path, node
1404

    
1405
    def _lookup_object(self, account, container, name, lock_container=False):
1406
        if lock_container:
1407
            self._lookup_container(account, container)
1408

    
1409
        path = '/'.join((account, container, name))
1410
        node = self.node.node_lookup(path)
1411
        if node is None:
1412
            raise ItemNotExists('Object does not exist')
1413
        return path, node
1414

    
1415
    def _lookup_objects(self, paths):
1416
        nodes = self.node.node_lookup_bulk(paths)
1417
        return paths, nodes
1418

    
1419
    def _get_properties(self, node, until=None):
1420
        """Return properties until the timestamp given."""
1421

    
1422
        before = until if until is not None else inf
1423
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1424
        if props is None and until is not None:
1425
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1426
        if props is None:
1427
            raise ItemNotExists('Path does not exist')
1428
        return props
1429

    
1430
    def _get_statistics(self, node, until=None, compute=False):
1431
        """Return (count, sum of size, timestamp) of everything under node."""
1432

    
1433
        if until is not None:
1434
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1435
        elif compute:
1436
            stats = self.node.statistics_latest(node,
1437
                                                except_cluster=CLUSTER_DELETED)
1438
        else:
1439
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1440
        if stats is None:
1441
            stats = (0, 0, 0)
1442
        return stats
1443

    
1444
    def _get_version(self, node, version=None):
1445
        if version is None:
1446
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1447
            if props is None:
1448
                raise ItemNotExists('Object does not exist')
1449
        else:
1450
            try:
1451
                version = int(version)
1452
            except ValueError:
1453
                raise VersionNotExists('Version does not exist')
1454
            props = self.node.version_get_properties(version, node=node)
1455
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1456
                raise VersionNotExists('Version does not exist')
1457
        return props
1458

    
1459
    def _get_versions(self, nodes):
1460
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1461

    
1462
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
1463
                               type=None, hash=None, checksum=None,
1464
                               cluster=CLUSTER_NORMAL, is_copy=False,
1465
                               update_statistics_ancestors_depth=None):
1466
        """Create a new version of the node."""
1467

    
1468
        props = self.node.version_lookup(
1469
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1470
        if props is not None:
1471
            src_version_id = props[self.SERIAL]
1472
            src_hash = props[self.HASH]
1473
            src_size = props[self.SIZE]
1474
            src_type = props[self.TYPE]
1475
            src_checksum = props[self.CHECKSUM]
1476
        else:
1477
            src_version_id = None
1478
            src_hash = None
1479
            src_size = 0
1480
            src_type = ''
1481
            src_checksum = ''
1482
        if size is None:  # Set metadata.
1483
            hash = src_hash  # This way hash can be set to None
1484
                             # (account or container).
1485
            size = src_size
1486
        if type is None:
1487
            type = src_type
1488
        if checksum is None:
1489
            checksum = src_checksum
1490
        uuid = self._generate_uuid(
1491
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1492

    
1493
        if src_node is None:
1494
            pre_version_id = src_version_id
1495
        else:
1496
            pre_version_id = None
1497
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1498
            if props is not None:
1499
                pre_version_id = props[self.SERIAL]
1500
        if pre_version_id is not None:
1501
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
1502
                                        update_statistics_ancestors_depth)
1503

    
1504
        dest_version_id, mtime = self.node.version_create(
1505
            node, hash, size, type, src_version_id, user, uuid, checksum,
1506
            cluster, update_statistics_ancestors_depth)
1507

    
1508
        self.node.attribute_unset_is_latest(node, dest_version_id)
1509

    
1510
        return pre_version_id, dest_version_id
1511

    
1512
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
1513
                                node, meta, replace=False):
1514
        if src_version_id is not None:
1515
            self.node.attribute_copy(src_version_id, dest_version_id)
1516
        if not replace:
1517
            self.node.attribute_del(dest_version_id, domain, (
1518
                k for k, v in meta.iteritems() if v == ''))
1519
            self.node.attribute_set(dest_version_id, domain, node, (
1520
                (k, v) for k, v in meta.iteritems() if v != ''))
1521
        else:
1522
            self.node.attribute_del(dest_version_id, domain)
1523
            self.node.attribute_set(dest_version_id, domain, node, ((
1524
                k, v) for k, v in meta.iteritems()))
1525

    
1526
    def _put_metadata(self, user, node, domain, meta, replace=False,
1527
                      update_statistics_ancestors_depth=None):
1528
        """Create a new version and store metadata."""
1529

    
1530
        src_version_id, dest_version_id = self._put_version_duplicate(
1531
            user, node,
1532
            update_statistics_ancestors_depth=
1533
            update_statistics_ancestors_depth)
1534
        self._put_metadata_duplicate(
1535
            src_version_id, dest_version_id, domain, node, meta, replace)
1536
        return src_version_id, dest_version_id
1537

    
1538
    def _list_limits(self, listing, marker, limit):
1539
        start = 0
1540
        if marker:
1541
            try:
1542
                start = listing.index(marker) + 1
1543
            except ValueError:
1544
                pass
1545
        if not limit or limit > 10000:
1546
            limit = 10000
1547
        return start, limit
1548

    
1549
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
1550
                                marker=None, limit=10000, virtual=True,
1551
                                domain=None, keys=None, until=None,
1552
                                size_range=None, allowed=None,
1553
                                all_props=False):
1554
        keys = keys or []
1555
        allowed = allowed or []
1556
        cont_prefix = path + '/'
1557
        prefix = cont_prefix + prefix
1558
        start = cont_prefix + marker if marker else None
1559
        before = until if until is not None else inf
1560
        filterq = keys if domain else []
1561
        sizeq = size_range
1562

    
1563
        objects, prefixes = self.node.latest_version_list(
1564
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
1565
            allowed, domain, filterq, sizeq, all_props)
1566
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1567
        objects.sort(key=lambda x: x[0])
1568
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1569
        return objects
1570

    
1571
    # Reporting functions.
1572

    
1573
    @debug_method
1574
    @backend_method
1575
    def _report_size_change(self, user, account, size, details=None):
1576
        details = details or {}
1577

    
1578
        if size == 0:
1579
            return
1580

    
1581
        account_node = self._lookup_account(account, True)[1]
1582
        total = self._get_statistics(account_node, compute=True)[1]
1583
        details.update({'user': user, 'total': total})
1584
        self.messages.append(
1585
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1586
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1587

    
1588
        if not self.using_external_quotaholder:
1589
            return
1590

    
1591
        try:
1592
            name = details['path'] if 'path' in details else ''
1593
            serial = self.astakosclient.issue_one_commission(
1594
                holder=account,
1595
                source=DEFAULT_SOURCE,
1596
                provisions={'pithos.diskspace': size},
1597
                name=name)
1598
        except BaseException, e:
1599
            raise QuotaError(e)
1600
        else:
1601
            self.serials.append(serial)
1602

    
1603
    @debug_method
1604
    @backend_method
1605
    def _report_object_change(self, user, account, path, details=None):
1606
        details = details or {}
1607
        details.update({'user': user})
1608
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1609
                              account, QUEUE_INSTANCE_ID, 'object', path,
1610
                              details))
1611

    
1612
    @debug_method
1613
    @backend_method
1614
    def _report_sharing_change(self, user, account, path, details=None):
1615
        details = details or {}
1616
        details.update({'user': user})
1617
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1618
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
1619
                              details))
1620

    
1621
    # Policy functions.
1622

    
1623
    def _check_policy(self, policy, is_account_policy=True):
1624
        default_policy = self.default_account_policy \
1625
            if is_account_policy else self.default_container_policy
1626
        for k in policy.keys():
1627
            if policy[k] == '':
1628
                policy[k] = default_policy.get(k)
1629
        for k, v in policy.iteritems():
1630
            if k == 'quota':
1631
                q = int(v)  # May raise ValueError.
1632
                if q < 0:
1633
                    raise ValueError
1634
            elif k == 'versioning':
1635
                if v not in ['auto', 'none']:
1636
                    raise ValueError
1637
            else:
1638
                raise ValueError
1639

    
1640
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1641
        default_policy = self.default_account_policy \
1642
            if is_account_policy else self.default_container_policy
1643
        if replace:
1644
            for k, v in default_policy.iteritems():
1645
                if k not in policy:
1646
                    policy[k] = v
1647
        self.node.policy_set(node, policy)
1648

    
1649
    def _get_policy(self, node, is_account_policy=True):
1650
        default_policy = self.default_account_policy \
1651
            if is_account_policy else self.default_container_policy
1652
        policy = default_policy.copy()
1653
        policy.update(self.node.policy_get(node))
1654
        return policy
1655

    
1656
    def _apply_versioning(self, account, container, version_id,
1657
                          update_statistics_ancestors_depth=None):
1658
        """Delete the provided version if such is the policy.
1659
           Return size of object removed.
1660
        """
1661

    
1662
        if version_id is None:
1663
            return 0
1664
        path, node = self._lookup_container(account, container)
1665
        versioning = self._get_policy(
1666
            node, is_account_policy=False)['versioning']
1667
        if versioning != 'auto':
1668
            hash, size = self.node.version_remove(
1669
                version_id, update_statistics_ancestors_depth)
1670
            self.store.map_delete(hash)
1671
            return size
1672
        elif self.free_versioning:
1673
            return self.node.version_get_properties(
1674
                version_id, keys=('size',))[0]
1675
        return 0
1676

    
1677
    # Access control functions.
1678

    
1679
    def _check_groups(self, groups):
1680
        # raise ValueError('Bad characters in groups')
1681
        pass
1682

    
1683
    def _check_permissions(self, path, permissions):
1684
        # raise ValueError('Bad characters in permissions')
1685
        pass
1686

    
1687
    def _get_formatted_paths(self, paths):
1688
        formatted = []
1689
        if len(paths) == 0:
1690
            return formatted
1691
        props = self.node.get_props(paths)
1692
        if props:
1693
            for prop in props:
1694
                if prop[1].split(';', 1)[0].strip() in (
1695
                        'application/directory', 'application/folder'):
1696
                    formatted.append((prop[0].rstrip('/') + '/',
1697
                                      self.MATCH_PREFIX))
1698
                formatted.append((prop[0], self.MATCH_EXACT))
1699
        return formatted
1700

    
1701
    def _get_permissions_path(self, account, container, name):
1702
        path = '/'.join((account, container, name))
1703
        permission_paths = self.permissions.access_inherit(path)
1704
        permission_paths.sort()
1705
        permission_paths.reverse()
1706
        for p in permission_paths:
1707
            if p == path:
1708
                return p
1709
            else:
1710
                if p.count('/') < 2:
1711
                    continue
1712
                node = self.node.node_lookup(p)
1713
                props = None
1714
                if node is not None:
1715
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1716
                if props is not None:
1717
                    if props[self.TYPE].split(';', 1)[0].strip() in (
1718
                            'application/directory', 'application/folder'):
1719
                        return p
1720
        return None
1721

    
1722
    def _get_permissions_path_bulk(self, account, container, names):
1723
        formatted_paths = []
1724
        for name in names:
1725
            path = '/'.join((account, container, name))
1726
            formatted_paths.append(path)
1727
        permission_paths = self.permissions.access_inherit_bulk(
1728
            formatted_paths)
1729
        permission_paths.sort()
1730
        permission_paths.reverse()
1731
        permission_paths_list = []
1732
        lookup_list = []
1733
        for p in permission_paths:
1734
            if p in formatted_paths:
1735
                permission_paths_list.append(p)
1736
            else:
1737
                if p.count('/') < 2:
1738
                    continue
1739
                lookup_list.append(p)
1740

    
1741
        if len(lookup_list) > 0:
1742
            props = self.node.get_props(lookup_list)
1743
            if props:
1744
                for prop in props:
1745
                    if prop[1].split(';', 1)[0].strip() in (
1746
                            'application/directory', 'application/folder'):
1747
                        permission_paths_list.append(prop[0])
1748

    
1749
        if len(permission_paths_list) > 0:
1750
            return permission_paths_list
1751

    
1752
        return None
1753

    
1754
    def _can_read(self, user, account, container, name):
1755
        if user == account:
1756
            return True
1757
        path = '/'.join((account, container, name))
1758
        if self.permissions.public_get(path) is not None:
1759
            return True
1760
        path = self._get_permissions_path(account, container, name)
1761
        if not path:
1762
            raise NotAllowedError
1763
        if (not self.permissions.access_check(path, self.READ, user) and not
1764
                self.permissions.access_check(path, self.WRITE, user)):
1765
            raise NotAllowedError
1766

    
1767
    def _can_write(self, user, account, container, name):
1768
        if user == account:
1769
            return True
1770
        path = '/'.join((account, container, name))
1771
        path = self._get_permissions_path(account, container, name)
1772
        if not path:
1773
            raise NotAllowedError
1774
        if not self.permissions.access_check(path, self.WRITE, user):
1775
            raise NotAllowedError
1776

    
1777
    def _allowed_accounts(self, user):
1778
        allow = set()
1779
        for path in self.permissions.access_list_paths(user):
1780
            allow.add(path.split('/', 1)[0])
1781
        return sorted(allow)
1782

    
1783
    def _allowed_containers(self, user, account):
1784
        allow = set()
1785
        for path in self.permissions.access_list_paths(user, account):
1786
            allow.add(path.split('/', 2)[1])
1787
        return sorted(allow)
1788

    
1789
    # Domain functions
1790

    
1791
    @debug_method
1792
    @backend_method
1793
    def get_domain_objects(self, domain, user=None):
1794
        allowed_paths = self.permissions.access_list_paths(
1795
            user, include_owned=user is not None, include_containers=False)
1796
        if not allowed_paths:
1797
            return []
1798
        obj_list = self.node.domain_object_list(
1799
            domain, allowed_paths, CLUSTER_NORMAL)
1800
        return [(path,
1801
                 self._build_metadata(props, user_defined_meta),
1802
                 self.permissions.access_get(path)) for
1803
                path, props, user_defined_meta in obj_list]
1804

    
1805
    # util functions
1806

    
1807
    def _build_metadata(self, props, user_defined=None,
1808
                        include_user_defined=True):
1809
        meta = {'bytes': props[self.SIZE],
1810
                'type': props[self.TYPE],
1811
                'hash': props[self.HASH],
1812
                'version': props[self.SERIAL],
1813
                'version_timestamp': props[self.MTIME],
1814
                'modified_by': props[self.MUSER],
1815
                'uuid': props[self.UUID],
1816
                'checksum': props[self.CHECKSUM]}
1817
        if include_user_defined and user_defined is not None:
1818
            meta.update(user_defined)
1819
        return meta
1820

    
1821
    def _exists(self, node):
1822
        try:
1823
            self._get_version(node)
1824
        except ItemNotExists:
1825
            return False
1826
        else:
1827
            return True
1828

    
1829
    def _unhexlify_hash(self, hash):
1830
        try:
1831
            return binascii.unhexlify(hash)
1832
        except TypeError:
1833
            raise InvalidHash(hash)