root / snf-pithos-backend / pithos / backends / modular.py @ 91fc9266


1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A. OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42

    
43
try:
44
    from astakosclient import AstakosClient
45
except ImportError:
46
    AstakosClient = None
47

    
48
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
49
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
50
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
51
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
52
                  InvalidHash)
53

    
54

    
55
class DisabledAstakosClient(object):
56
    def __init__(self, *args, **kwargs):
57
        self.args = args
58
        self.kwargs = kwargs
59

    
60
    def __getattr__(self, name):
61
        m = ("AstakosClient has been disabled, "
62
             "yet an attempt to access it was made")
63
        raise AssertionError(m)
64

    
65

    
66
# Stripped-down version of the HashMap class found in tools.
67

    
68
class HashMap(list):
69

    
70
    def __init__(self, blocksize, blockhash):
71
        super(HashMap, self).__init__()
72
        self.blocksize = blocksize
73
        self.blockhash = blockhash
74

    
75
    def _hash_raw(self, v):
76
        h = hashlib.new(self.blockhash)
77
        h.update(v)
78
        return h.digest()
79

    
80
    def hash(self):
81
        if len(self) == 0:
82
            return self._hash_raw('')
83
        if len(self) == 1:
84
            return self.__getitem__(0)
85

    
86
        h = list(self)
87
        s = 2
88
        while s < len(h):
89
            s = s * 2
90
        h += [('\x00' * len(h[0]))] * (s - len(h))
91
        while len(h) > 1:
92
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
93
        return h[0]
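# --- Illustrative example (not part of the original file) ---
# A minimal sketch of how HashMap is meant to be used: append the digest of
# every block of an object, then call hash() to obtain the Merkle-style top
# hash.  hash() pads the leaf list to a power of two with zero blocks and
# hashes pairwise, so the result depends only on the block contents.  The
# sample data below is made up for illustration.
#
#     hm = HashMap(blocksize=4 * 1024 * 1024, blockhash='sha256')
#     for block in ('first block of data', 'second block of data'):
#         hm.append(hm._hash_raw(block))
#     top_hash = hm.hash()  # binary digest; hexlify it before storing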
94

    
95
# Default modules and settings.
96
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
97
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
98
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
99
DEFAULT_BLOCK_PATH = 'data/'
100
DEFAULT_BLOCK_UMASK = 0o022
101
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
102
DEFAULT_HASH_ALGORITHM = 'sha256'
103
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
104
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
105
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
106
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
107
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
108
                               'abcdefghijklmnopqrstuvwxyz'
109
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
110
DEFAULT_PUBLIC_URL_SECURITY = 16
111

    
112
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
113
QUEUE_CLIENT_ID = 'pithos'
114
QUEUE_INSTANCE_ID = '1'
115

    
116
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
117

    
118
inf = float('inf')
119

    
120
ULTIMATE_ANSWER = 42
121

    
122
DEFAULT_SOURCE = 'system'
123

    
124
logger = logging.getLogger(__name__)
125

    
126

    
127
def backend_method(func):
128
    @wraps(func)
129
    def wrapper(self, *args, **kw):
130
        # if we are inside a database transaction
131
        # just proceed with the method execution
132
        # otherwise manage a new transaction
133
        if self.in_transaction:
134
            return func(self, *args, **kw)
135

    
136
        try:
137
            self.pre_exec()
138
            result = func(self, *args, **kw)
139
            success_status = True
140
            return result
141
        except:
142
            success_status = False
143
            raise
144
        finally:
145
            self.post_exec(success_status)
146
    return wrapper
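# --- Illustrative note (not part of the original file) ---
# backend_method assumes the decorated instance provides `in_transaction`,
# `pre_exec()` and `post_exec(success)`, as ModularBackend below does.  A
# rough sketch of the resulting call flow, with made-up argument values:
#
#     backend = ModularBackend(db_connection='sqlite:///backend.db')
#     backend.list_accounts('user-uuid')
#     # wrapper: pre_exec() -> list_accounts() -> post_exec(True) commits;
#     # if the method raises, post_exec(False) rolls the transaction back.
#     # Nested backend calls made while in_transaction is True reuse the
#     # already-open transaction instead of starting a new one.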
147

    
148

    
149
def debug_method(func):
150
    @wraps(func)
151
    def wrapper(self, *args, **kw):
152
        try:
153
            result = func(self, *args, **kw)
154
            return result
155
        except:
156
            result = format_exc()
157
            raise
158
        finally:
159
            all_args = map(repr, args)
160
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
161
            logger.debug(">>> %s(%s) <<< %s" % (
162
                func.__name__, ', '.join(all_args).rstrip(', '), result))
163
    return wrapper
164

    
165

    
166
class ModularBackend(BaseBackend):
167
    """A modular backend.
168

169
    Uses modules for SQL functions and storage.
170
    """
171

    
172
    def __init__(self, db_module=None, db_connection=None,
173
                 block_module=None, block_path=None, block_umask=None,
174
                 block_size=None, hash_algorithm=None,
175
                 queue_module=None, queue_hosts=None, queue_exchange=None,
176
                 astakos_auth_url=None, service_token=None,
177
                 astakosclient_poolsize=None,
178
                 free_versioning=True, block_params=None,
179
                 public_url_security=None,
180
                 public_url_alphabet=None,
181
                 account_quota_policy=None,
182
                 container_quota_policy=None,
183
                 container_versioning_policy=None):
184
        db_module = db_module or DEFAULT_DB_MODULE
185
        db_connection = db_connection or DEFAULT_DB_CONNECTION
186
        block_module = block_module or DEFAULT_BLOCK_MODULE
187
        block_path = block_path or DEFAULT_BLOCK_PATH
188
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
189
        block_params = block_params or DEFAULT_BLOCK_PARAMS
190
        block_size = block_size or DEFAULT_BLOCK_SIZE
191
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
192
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
193
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
194
        container_quota_policy = container_quota_policy \
195
            or DEFAULT_CONTAINER_QUOTA
196
        container_versioning_policy = container_versioning_policy \
197
            or DEFAULT_CONTAINER_VERSIONING
198

    
199
        self.default_account_policy = {'quota': account_quota_policy}
200
        self.default_container_policy = {
201
            'quota': container_quota_policy,
202
            'versioning': container_versioning_policy
203
        }
204
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
205
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
206

    
207
        self.public_url_security = (public_url_security or
208
                                    DEFAULT_PUBLIC_URL_SECURITY)
209
        self.public_url_alphabet = (public_url_alphabet or
210
                                    DEFAULT_PUBLIC_URL_ALPHABET)
211

    
212
        self.hash_algorithm = hash_algorithm
213
        self.block_size = block_size
214
        self.free_versioning = free_versioning
215

    
216
        def load_module(m):
217
            __import__(m)
218
            return sys.modules[m]
219

    
220
        self.db_module = load_module(db_module)
221
        self.wrapper = self.db_module.DBWrapper(db_connection)
222
        params = {'wrapper': self.wrapper}
223
        self.permissions = self.db_module.Permissions(**params)
224
        self.config = self.db_module.Config(**params)
225
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
226
        for x in ['READ', 'WRITE']:
227
            setattr(self, x, getattr(self.db_module, x))
228
        self.node = self.db_module.Node(**params)
229
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
230
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
231
                  'MATCH_PREFIX', 'MATCH_EXACT']:
232
            setattr(self, x, getattr(self.db_module, x))
233

    
234
        self.ALLOWED = ['read', 'write']
235

    
236
        self.block_module = load_module(block_module)
237
        self.block_params = block_params
238
        params = {'path': block_path,
239
                  'block_size': self.block_size,
240
                  'hash_algorithm': self.hash_algorithm,
241
                  'umask': block_umask}
242
        params.update(self.block_params)
243
        self.store = self.block_module.Store(**params)
244

    
245
        if queue_module and queue_hosts:
246
            self.queue_module = load_module(queue_module)
247
            params = {'hosts': queue_hosts,
248
                      'exchange': queue_exchange,
249
                      'client_id': QUEUE_CLIENT_ID}
250
            self.queue = self.queue_module.Queue(**params)
251
        else:
252
            class NoQueue:
253
                def send(self, *args):
254
                    pass
255

    
256
                def close(self):
257
                    pass
258

    
259
            self.queue = NoQueue()
260

    
261
        self.astakos_auth_url = astakos_auth_url
262
        self.service_token = service_token
263

    
264
        if not astakos_auth_url or not AstakosClient:
265
            self.astakosclient = DisabledAstakosClient(
266
                service_token, astakos_auth_url,
267
                use_pool=True,
268
                pool_size=astakosclient_poolsize)
269
        else:
270
            self.astakosclient = AstakosClient(
271
                service_token, astakos_auth_url,
272
                use_pool=True,
273
                pool_size=astakosclient_poolsize)
274

    
275
        self.serials = []
276
        self.messages = []
277

    
278
        self._move_object = partial(self._copy_object, is_move=True)
279

    
280
        self.lock_container_path = False
281

    
282
        self.in_transaction = False
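    # --- Illustrative example (not part of the original file) ---
    # A minimal sketch of constructing the backend; any argument left out
    # falls back to the DEFAULT_* values defined above.  The connection
    # string and block path here are made up for illustration.  Without
    # astakos_auth_url the quota holder is the DisabledAstakosClient stub,
    # and without queue_module/queue_hosts messages go to NoQueue.
    #
    #     backend = ModularBackend(
    #         db_connection='sqlite:////tmp/pithos-backend.db',
    #         block_path='/tmp/pithos-data',
    #         block_size=4 * 1024 * 1024,
    #         hash_algorithm='sha256')
    #     backend.close()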
283

    
284
    def pre_exec(self, lock_container_path=False):
285
        self.lock_container_path = lock_container_path
286
        self.wrapper.execute()
287
        self.serials = []
288
        self.in_transaction = True
289

    
290
    def post_exec(self, success_status=True):
291
        if success_status:
292
            # send messages produced
293
            for m in self.messages:
294
                self.queue.send(*m)
295

    
296
            # register serials
297
            if self.serials:
298
                self.commission_serials.insert_many(
299
                    self.serials)
300

    
301
                # commit to ensure that the serials are registered
302
                # even if resolve commission fails
303
                self.wrapper.commit()
304

    
305
                # start new transaction
306
                self.wrapper.execute()
307

    
308
                r = self.astakosclient.resolve_commissions(
309
                    accept_serials=self.serials,
310
                    reject_serials=[])
311
                self.commission_serials.delete_many(
312
                    r['accepted'])
313

    
314
            self.wrapper.commit()
315
        else:
316
            if self.serials:
317
                r = self.astakosclient.resolve_commissions(
318
                    accept_serials=[],
319
                    reject_serials=self.serials)
320
                self.commission_serials.delete_many(
321
                    r['rejected'])
322
            self.wrapper.rollback()
323
        self.in_transaction = False
324

    
325
    def close(self):
326
        self.wrapper.close()
327
        self.queue.close()
328

    
329
    @property
330
    def using_external_quotaholder(self):
331
        return not isinstance(self.astakosclient, DisabledAstakosClient)
332

    
333
    @debug_method
334
    @backend_method
335
    def list_accounts(self, user, marker=None, limit=10000):
336
        """Return a list of accounts the user can access."""
337

    
338
        allowed = self._allowed_accounts(user)
339
        start, limit = self._list_limits(allowed, marker, limit)
340
        return allowed[start:start + limit]
341

    
342
    @debug_method
343
    @backend_method
344
    def get_account_meta(
345
            self, user, account, domain, until=None, include_user_defined=True,
346
            external_quota=None):
347
        """Return a dictionary with the account metadata for the domain."""
348

    
349
        path, node = self._lookup_account(account, user == account)
350
        if user != account:
351
            if until or (node is None) or (account not
352
                                           in self._allowed_accounts(user)):
353
                raise NotAllowedError
354
        try:
355
            props = self._get_properties(node, until)
356
            mtime = props[self.MTIME]
357
        except NameError:
358
            props = None
359
            mtime = until
360
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
361
        tstamp = max(tstamp, mtime)
362
        if until is None:
363
            modified = tstamp
364
        else:
365
            modified = self._get_statistics(
366
                node, compute=True)[2]  # Overall last modification.
367
            modified = max(modified, mtime)
368

    
369
        if user != account:
370
            meta = {'name': account}
371
        else:
372
            meta = {}
373
            if props is not None and include_user_defined:
374
                meta.update(
375
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
376
            if until is not None:
377
                meta.update({'until_timestamp': tstamp})
378
            meta.update({'name': account, 'count': count, 'bytes': bytes})
379
            if self.using_external_quotaholder:
380
                external_quota = external_quota or {}
381
                meta['bytes'] = external_quota.get('usage', 0)
382
        meta.update({'modified': modified})
383
        return meta
384

    
385
    @debug_method
386
    @backend_method
387
    def update_account_meta(self, user, account, domain, meta, replace=False):
388
        """Update the metadata associated with the account for the domain."""
389

    
390
        if user != account:
391
            raise NotAllowedError
392
        path, node = self._lookup_account(account, True)
393
        self._put_metadata(user, node, domain, meta, replace,
394
                           update_statistics_ancestors_depth=-1)
395

    
396
    @debug_method
397
    @backend_method
398
    def get_account_groups(self, user, account):
399
        """Return a dictionary with the user groups defined for the account."""
400

    
401
        if user != account:
402
            if account not in self._allowed_accounts(user):
403
                raise NotAllowedError
404
            return {}
405
        self._lookup_account(account, True)
406
        return self.permissions.group_dict(account)
407

    
408
    @debug_method
409
    @backend_method
410
    def update_account_groups(self, user, account, groups, replace=False):
411
        """Update the groups associated with the account."""
412

    
413
        if user != account:
414
            raise NotAllowedError
415
        self._lookup_account(account, True)
416
        self._check_groups(groups)
417
        if replace:
418
            self.permissions.group_destroy(account)
419
        for k, v in groups.iteritems():
420
            if not replace:  # If not already deleted.
421
                self.permissions.group_delete(account, k)
422
            if v:
423
                self.permissions.group_addmany(account, k, v)
424

    
425
    @debug_method
426
    @backend_method
427
    def get_account_policy(self, user, account, external_quota=None):
428
        """Return a dictionary with the account policy."""
429

    
430
        if user != account:
431
            if account not in self._allowed_accounts(user):
432
                raise NotAllowedError
433
            return {}
434
        path, node = self._lookup_account(account, True)
435
        policy = self._get_policy(node, is_account_policy=True)
436
        if self.using_external_quotaholder:
437
            external_quota = external_quota or {}
438
            policy['quota'] = external_quota.get('limit', 0)
439
        return policy
440

    
441
    @debug_method
442
    @backend_method
443
    def update_account_policy(self, user, account, policy, replace=False):
444
        """Update the policy associated with the account."""
445

    
446
        if user != account:
447
            raise NotAllowedError
448
        path, node = self._lookup_account(account, True)
449
        self._check_policy(policy, is_account_policy=True)
450
        self._put_policy(node, policy, replace, is_account_policy=True)
451

    
452
    @debug_method
453
    @backend_method
454
    def put_account(self, user, account, policy=None):
455
        """Create a new account with the given name."""
456

    
457
        policy = policy or {}
458
        if user != account:
459
            raise NotAllowedError
460
        node = self.node.node_lookup(account)
461
        if node is not None:
462
            raise AccountExists('Account already exists')
463
        if policy:
464
            self._check_policy(policy, is_account_policy=True)
465
        node = self._put_path(user, self.ROOTNODE, account,
466
                              update_statistics_ancestors_depth=-1)
467
        self._put_policy(node, policy, True, is_account_policy=True)
468

    
469
    @debug_method
470
    @backend_method
471
    def delete_account(self, user, account):
472
        """Delete the account with the given name."""
473

    
474
        if user != account:
475
            raise NotAllowedError
476
        node = self.node.node_lookup(account)
477
        if node is None:
478
            return
479
        if not self.node.node_remove(node,
480
                                     update_statistics_ancestors_depth=-1):
481
            raise AccountNotEmpty('Account is not empty')
482
        self.permissions.group_destroy(account)
483

    
484
    @debug_method
485
    @backend_method
486
    def list_containers(self, user, account, marker=None, limit=10000,
487
                        shared=False, until=None, public=False):
488
        """Return a list of containers existing under an account."""
489

    
490
        if user != account:
491
            if until or account not in self._allowed_accounts(user):
492
                raise NotAllowedError
493
            allowed = self._allowed_containers(user, account)
494
            start, limit = self._list_limits(allowed, marker, limit)
495
            return allowed[start:start + limit]
496
        if shared or public:
497
            allowed = set()
498
            if shared:
499
                allowed.update([x.split('/', 2)[1] for x in
500
                               self.permissions.access_list_shared(account)])
501
            if public:
502
                allowed.update([x[0].split('/', 2)[1] for x in
503
                               self.permissions.public_list(account)])
504
            allowed = sorted(allowed)
505
            start, limit = self._list_limits(allowed, marker, limit)
506
            return allowed[start:start + limit]
507
        node = self.node.node_lookup(account)
508
        containers = [x[0] for x in self._list_object_properties(
509
            node, account, '', '/', marker, limit, False, None, [], until)]
510
        start, limit = self._list_limits(
511
            [x[0] for x in containers], marker, limit)
512
        return containers[start:start + limit]
513

    
514
    @debug_method
515
    @backend_method
516
    def list_container_meta(self, user, account, container, domain,
517
                            until=None):
518
        """Return a list of the container's object meta keys for a domain."""
519

    
520
        allowed = []
521
        if user != account:
522
            if until:
523
                raise NotAllowedError
524
            allowed = self.permissions.access_list_paths(
525
                user, '/'.join((account, container)))
526
            if not allowed:
527
                raise NotAllowedError
528
        path, node = self._lookup_container(account, container)
529
        before = until if until is not None else inf
530
        allowed = self._get_formatted_paths(allowed)
531
        return self.node.latest_attribute_keys(node, domain, before,
532
                                               CLUSTER_DELETED, allowed)
533

    
534
    @debug_method
535
    @backend_method
536
    def get_container_meta(self, user, account, container, domain, until=None,
537
                           include_user_defined=True):
538
        """Return a dictionary with the container metadata for the domain."""
539

    
540
        if user != account:
541
            if until or container not in self._allowed_containers(user,
542
                                                                  account):
543
                raise NotAllowedError
544
        path, node = self._lookup_container(account, container)
545
        props = self._get_properties(node, until)
546
        mtime = props[self.MTIME]
547
        count, bytes, tstamp = self._get_statistics(node, until)
548
        tstamp = max(tstamp, mtime)
549
        if until is None:
550
            modified = tstamp
551
        else:
552
            modified = self._get_statistics(
553
                node)[2]  # Overall last modification.
554
            modified = max(modified, mtime)
555

    
556
        if user != account:
557
            meta = {'name': container}
558
        else:
559
            meta = {}
560
            if include_user_defined:
561
                meta.update(
562
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
563
            if until is not None:
564
                meta.update({'until_timestamp': tstamp})
565
            meta.update({'name': container, 'count': count, 'bytes': bytes})
566
        meta.update({'modified': modified})
567
        return meta
568

    
569
    @debug_method
570
    @backend_method
571
    def update_container_meta(self, user, account, container, domain, meta,
572
                              replace=False):
573
        """Update the metadata associated with the container for the domain."""
574

    
575
        if user != account:
576
            raise NotAllowedError
577
        path, node = self._lookup_container(account, container)
578
        src_version_id, dest_version_id = self._put_metadata(
579
            user, node, domain, meta, replace,
580
            update_statistics_ancestors_depth=0)
581
        if src_version_id is not None:
582
            versioning = self._get_policy(
583
                node, is_account_policy=False)['versioning']
584
            if versioning != 'auto':
585
                self.node.version_remove(src_version_id,
586
                                         update_statistics_ancestors_depth=0)
587

    
588
    @debug_method
589
    @backend_method
590
    def get_container_policy(self, user, account, container):
591
        """Return a dictionary with the container policy."""
592

    
593
        if user != account:
594
            if container not in self._allowed_containers(user, account):
595
                raise NotAllowedError
596
            return {}
597
        path, node = self._lookup_container(account, container)
598
        return self._get_policy(node, is_account_policy=False)
599

    
600
    @debug_method
601
    @backend_method
602
    def update_container_policy(self, user, account, container, policy,
603
                                replace=False):
604
        """Update the policy associated with the container."""
605

    
606
        if user != account:
607
            raise NotAllowedError
608
        path, node = self._lookup_container(account, container)
609
        self._check_policy(policy, is_account_policy=False)
610
        self._put_policy(node, policy, replace, is_account_policy=False)
611

    
612
    @debug_method
613
    @backend_method
614
    def put_container(self, user, account, container, policy=None):
615
        """Create a new container with the given name."""
616

    
617
        policy = policy or {}
618
        if user != account:
619
            raise NotAllowedError
620
        try:
621
            path, node = self._lookup_container(account, container)
622
        except NameError:
623
            pass
624
        else:
625
            raise ContainerExists('Container already exists')
626
        if policy:
627
            self._check_policy(policy, is_account_policy=False)
628
        path = '/'.join((account, container))
629
        node = self._put_path(
630
            user, self._lookup_account(account, True)[1], path,
631
            update_statistics_ancestors_depth=-1)
632
        self._put_policy(node, policy, True, is_account_policy=False)
633

    
634
    @debug_method
635
    @backend_method
636
    def delete_container(self, user, account, container, until=None, prefix='',
637
                         delimiter=None):
638
        """Delete/purge the container with the given name."""
639

    
640
        if user != account:
641
            raise NotAllowedError
642
        path, node = self._lookup_container(account, container)
643

    
644
        if until is not None:
645
            hashes, size, serials = self.node.node_purge_children(
646
                node, until, CLUSTER_HISTORY,
647
                update_statistics_ancestors_depth=0)
648
            for h in hashes:
649
                self.store.map_delete(h)
650
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
651
                                          update_statistics_ancestors_depth=0)
652
            if not self.free_versioning:
653
                self._report_size_change(
654
                    user, account, -size, {
655
                        'action': 'container purge',
656
                        'path': path,
657
                        'versions': ','.join(str(i) for i in serials)
658
                    }
659
                )
660
            return
661

    
662
        if not delimiter:
663
            if self._get_statistics(node)[0] > 0:
664
                raise ContainerNotEmpty('Container is not empty')
665
            hashes, size, serials = self.node.node_purge_children(
666
                node, inf, CLUSTER_HISTORY,
667
                update_statistics_ancestors_depth=0)
668
            for h in hashes:
669
                self.store.map_delete(h)
670
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
671
                                          update_statistics_ancestors_depth=0)
672
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
673
            if not self.free_versioning:
674
                self._report_size_change(
675
                    user, account, -size, {
676
                        'action': 'container purge',
677
                        'path': path,
678
                        'versions': ','.join(str(i) for i in serials)
679
                    }
680
                )
681
        else:
682
            # remove only contents
683
            src_names = self._list_objects_no_limit(
684
                user, account, container, prefix='', delimiter=None,
685
                virtual=False, domain=None, keys=[], shared=False, until=None,
686
                size_range=None, all_props=True, public=False)
687
            paths = []
688
            for t in src_names:
689
                path = '/'.join((account, container, t[0]))
690
                node = t[2]
691
                if not self._exists(node):
692
                    continue
693
                src_version_id, dest_version_id = self._put_version_duplicate(
694
                    user, node, size=0, type='', hash=None, checksum='',
695
                    cluster=CLUSTER_DELETED,
696
                    update_statistics_ancestors_depth=1)
697
                del_size = self._apply_versioning(
698
                    account, container, src_version_id,
699
                    update_statistics_ancestors_depth=1)
700
                self._report_size_change(
701
                    user, account, -del_size, {
702
                        'action': 'object delete',
703
                        'path': path,
704
                        'versions': ','.join([str(dest_version_id)])})
705
                self._report_object_change(
706
                    user, account, path, details={'action': 'object delete'})
707
                paths.append(path)
708
            self.permissions.access_clear_bulk(paths)
709

    
710
    def _list_objects(self, user, account, container, prefix, delimiter,
711
                      marker, limit, virtual, domain, keys, shared, until,
712
                      size_range, all_props, public):
713
        if user != account and until:
714
            raise NotAllowedError
715
        if shared and public:
716
            # get shared first
717
            shared_paths = self._list_object_permissions(
718
                user, account, container, prefix, shared=True, public=False)
719
            objects = set()
720
            if shared_paths:
721
                path, node = self._lookup_container(account, container)
722
                shared_paths = self._get_formatted_paths(shared_paths)
723
                objects |= set(self._list_object_properties(
724
                    node, path, prefix, delimiter, marker, limit, virtual,
725
                    domain, keys, until, size_range, shared_paths, all_props))
726

    
727
            # get public
728
            objects |= set(self._list_public_object_properties(
729
                user, account, container, prefix, all_props))
730
            objects = list(objects)
731

    
732
            objects.sort(key=lambda x: x[0])
733
            start, limit = self._list_limits(
734
                [x[0] for x in objects], marker, limit)
735
            return objects[start:start + limit]
736
        elif public:
737
            objects = self._list_public_object_properties(
738
                user, account, container, prefix, all_props)
739
            start, limit = self._list_limits(
740
                [x[0] for x in objects], marker, limit)
741
            return objects[start:start + limit]
742

    
743
        allowed = self._list_object_permissions(
744
            user, account, container, prefix, shared, public)
745
        if shared and not allowed:
746
            return []
747
        path, node = self._lookup_container(account, container)
748
        allowed = self._get_formatted_paths(allowed)
749
        objects = self._list_object_properties(
750
            node, path, prefix, delimiter, marker, limit, virtual, domain,
751
            keys, until, size_range, allowed, all_props)
752
        start, limit = self._list_limits(
753
            [x[0] for x in objects], marker, limit)
754
        return objects[start:start + limit]
755

    
756
    def _list_public_object_properties(self, user, account, container, prefix,
757
                                       all_props):
758
        public = self._list_object_permissions(
759
            user, account, container, prefix, shared=False, public=True)
760
        paths, nodes = self._lookup_objects(public)
761
        path = '/'.join((account, container))
762
        cont_prefix = path + '/'
763
        paths = [x[len(cont_prefix):] for x in paths]
764
        objects = [(p,) + props for p, props in
765
                   zip(paths, self.node.version_lookup_bulk(
766
                       nodes, all_props=all_props))]
767
        return objects
768

    
769
    def _list_objects_no_limit(self, user, account, container, prefix,
770
                               delimiter, virtual, domain, keys, shared, until,
771
                               size_range, all_props, public):
772
        objects = []
773
        while True:
774
            marker = objects[-1] if objects else None
775
            limit = 10000
776
            l = self._list_objects(
777
                user, account, container, prefix, delimiter, marker, limit,
778
                virtual, domain, keys, shared, until, size_range, all_props,
779
                public)
780
            objects.extend(l)
781
            if not l or len(l) < limit:
782
                break
783
        return objects
784

    
785
    def _list_object_permissions(self, user, account, container, prefix,
786
                                 shared, public):
787
        allowed = []
788
        path = '/'.join((account, container, prefix)).rstrip('/')
789
        if user != account:
790
            allowed = self.permissions.access_list_paths(user, path)
791
            if not allowed:
792
                raise NotAllowedError
793
        else:
794
            allowed = set()
795
            if shared:
796
                allowed.update(self.permissions.access_list_shared(path))
797
            if public:
798
                allowed.update(
799
                    [x[0] for x in self.permissions.public_list(path)])
800
            allowed = sorted(allowed)
801
            if not allowed:
802
                return []
803
        return allowed
804

    
805
    @debug_method
806
    @backend_method
807
    def list_objects(self, user, account, container, prefix='', delimiter=None,
808
                     marker=None, limit=10000, virtual=True, domain=None,
809
                     keys=None, shared=False, until=None, size_range=None,
810
                     public=False):
811
        """List (object name, object version_id) under a container."""
812

    
813
        keys = keys or []
814
        return self._list_objects(
815
            user, account, container, prefix, delimiter, marker, limit,
816
            virtual, domain, keys, shared, until, size_range, False, public)
817

    
818
    @debug_method
819
    @backend_method
820
    def list_object_meta(self, user, account, container, prefix='',
821
                         delimiter=None, marker=None, limit=10000,
822
                         virtual=True, domain=None, keys=None, shared=False,
823
                         until=None, size_range=None, public=False):
824
        """Return a list of metadata dicts of objects under a container."""
825

    
826
        keys = keys or []
827
        props = self._list_objects(
828
            user, account, container, prefix, delimiter, marker, limit,
829
            virtual, domain, keys, shared, until, size_range, True, public)
830
        objects = []
831
        for p in props:
832
            if len(p) == 2:
833
                objects.append({'subdir': p[0]})
834
            else:
835
                objects.append({
836
                    'name': p[0],
837
                    'bytes': p[self.SIZE + 1],
838
                    'type': p[self.TYPE + 1],
839
                    'hash': p[self.HASH + 1],
840
                    'version': p[self.SERIAL + 1],
841
                    'version_timestamp': p[self.MTIME + 1],
842
                    'modified': p[self.MTIME + 1] if until is None else None,
843
                    'modified_by': p[self.MUSER + 1],
844
                    'uuid': p[self.UUID + 1],
845
                    'checksum': p[self.CHECKSUM + 1]})
846
        return objects
847

    
848
    @debug_method
849
    @backend_method
850
    def list_object_permissions(self, user, account, container, prefix=''):
851
        """Return a list of paths enforce permissions under a container."""
852

    
853
        return self._list_object_permissions(user, account, container, prefix,
854
                                             True, False)
855

    
856
    @debug_method
857
    @backend_method
858
    def list_object_public(self, user, account, container, prefix=''):
859
        """Return a mapping of object paths to public ids under a container."""
860

    
861
        public = {}
862
        for path, p in self.permissions.public_list('/'.join((account,
863
                                                              container,
864
                                                              prefix))):
865
            public[path] = p
866
        return public
867

    
868
    @debug_method
869
    @backend_method
870
    def get_object_meta(self, user, account, container, name, domain,
871
                        version=None, include_user_defined=True):
872
        """Return a dictionary with the object metadata for the domain."""
873

    
874
        self._can_read(user, account, container, name)
875
        path, node = self._lookup_object(account, container, name)
876
        props = self._get_version(node, version)
877
        if version is None:
878
            modified = props[self.MTIME]
879
        else:
880
            try:
881
                modified = self._get_version(
882
                    node)[self.MTIME]  # Overall last modification.
883
            except NameError:  # Object may be deleted.
884
                del_props = self.node.version_lookup(
885
                    node, inf, CLUSTER_DELETED)
886
                if del_props is None:
887
                    raise ItemNotExists('Object does not exist')
888
                modified = del_props[self.MTIME]
889

    
890
        meta = {}
891
        if include_user_defined:
892
            meta.update(
893
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
894
        meta.update({'name': name,
895
                     'bytes': props[self.SIZE],
896
                     'type': props[self.TYPE],
897
                     'hash': props[self.HASH],
898
                     'version': props[self.SERIAL],
899
                     'version_timestamp': props[self.MTIME],
900
                     'modified': modified,
901
                     'modified_by': props[self.MUSER],
902
                     'uuid': props[self.UUID],
903
                     'checksum': props[self.CHECKSUM]})
904
        return meta
905

    
906
    @debug_method
907
    @backend_method
908
    def update_object_meta(self, user, account, container, name, domain, meta,
909
                           replace=False):
910
        """Update object metadata for a domain and return the new version."""
911

    
912
        self._can_write(user, account, container, name)
913

    
914
        path, node = self._lookup_object(account, container, name,
915
                                         lock_container=True)
916
        src_version_id, dest_version_id = self._put_metadata(
917
            user, node, domain, meta, replace,
918
            update_statistics_ancestors_depth=1)
919
        self._apply_versioning(account, container, src_version_id,
920
                               update_statistics_ancestors_depth=1)
921
        return dest_version_id
922

    
923
    @debug_method
924
    @backend_method
925
    def get_object_permissions_bulk(self, user, account, container, names):
926
        """Return the action allowed on the object, the path
927
        from which the object gets its permissions,
928
        along with a dictionary containing the permissions."""
929

    
930
        permissions_path = self._get_permissions_path_bulk(account, container,
931
                                                           names)
932
        access_objects = self.permissions.access_check_bulk(permissions_path,
933
                                                            user)
934
        #group_parents = access_objects['group_parents']
935
        nobject_permissions = {}
936
        cpath = '/'.join((account, container, ''))
937
        cpath_idx = len(cpath)
938
        for path in permissions_path:
939
            allowed = 1
940
            name = path[cpath_idx:]
941
            if user != account:
942
                try:
943
                    allowed = access_objects[path]
944
                except KeyError:
945
                    raise NotAllowedError
946
            access_dict, allowed = \
947
                self.permissions.access_get_for_bulk(access_objects[path])
948
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
949
                                         access_dict)
950
        self._lookup_objects(permissions_path)
951
        return nobject_permissions
952

    
953
    @debug_method
954
    @backend_method
955
    def get_object_permissions(self, user, account, container, name):
956
        """Return the action allowed on the object, the path
957
        from which the object gets its permissions,
958
        along with a dictionary containing the permissions."""
959

    
960
        allowed = 'write'
961
        permissions_path = self._get_permissions_path(account, container, name)
962
        if user != account:
963
            if self.permissions.access_check(permissions_path, self.WRITE,
964
                                             user):
965
                allowed = 'write'
966
            elif self.permissions.access_check(permissions_path, self.READ,
967
                                               user):
968
                allowed = 'read'
969
            else:
970
                raise NotAllowedError
971
        self._lookup_object(account, container, name)
972
        return (allowed,
973
                permissions_path,
974
                self.permissions.access_get(permissions_path))
975

    
976
    @debug_method
977
    @backend_method
978
    def update_object_permissions(self, user, account, container, name,
979
                                  permissions):
980
        """Update the permissions associated with the object."""
981

    
982
        if user != account:
983
            raise NotAllowedError
984
        path = self._lookup_object(account, container, name,
985
                                   lock_container=True)[0]
986
        self._check_permissions(path, permissions)
987
        try:
988
            self.permissions.access_set(path, permissions)
989
        except:
990
            raise ValueError
991
        else:
992
            self._report_sharing_change(user, account, path, {'members':
993
                                        self.permissions.access_members(path)})
994

    
995
    @debug_method
996
    @backend_method
997
    def get_object_public(self, user, account, container, name):
998
        """Return the public id of the object if applicable."""
999

    
1000
        self._can_read(user, account, container, name)
1001
        path = self._lookup_object(account, container, name)[0]
1002
        p = self.permissions.public_get(path)
1003
        return p
1004

    
1005
    @debug_method
1006
    @backend_method
1007
    def update_object_public(self, user, account, container, name, public):
1008
        """Update the public status of the object."""
1009

    
1010
        self._can_write(user, account, container, name)
1011
        path = self._lookup_object(account, container, name,
1012
                                   lock_container=True)[0]
1013
        if not public:
1014
            self.permissions.public_unset(path)
1015
        else:
1016
            self.permissions.public_set(
1017
                path, self.public_url_security, self.public_url_alphabet)
1018

    
1019
    @debug_method
1020
    @backend_method
1021
    def get_object_hashmap(self, user, account, container, name, version=None):
1022
        """Return the object's size and a list with partial hashes."""
1023

    
1024
        self._can_read(user, account, container, name)
1025
        path, node = self._lookup_object(account, container, name)
1026
        props = self._get_version(node, version)
1027
        if props[self.HASH] is None:
1028
            return 0, ()
1029
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
1030
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
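    # --- Illustrative example (not part of the original file) ---
    # A rough sketch of reading an object back through its hashmap.  It
    # assumes a get_block(hex_hash) helper analogous to the put_block() used
    # further down; the account/container/object names are made up.
    #
    #     size, hashes = backend.get_object_hashmap(
    #         'user-uuid', 'user-uuid', 'pithos', 'example.txt')
    #     data = ''.join(backend.get_block(h) for h in hashes)[:size]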
1031

    
1032
    def _update_object_hash(self, user, account, container, name, size, type,
1033
                            hash, checksum, domain, meta, replace_meta,
1034
                            permissions, src_node=None, src_version_id=None,
1035
                            is_copy=False, report_size_change=True):
1036
        if permissions is not None and user != account:
1037
            raise NotAllowedError
1038
        self._can_write(user, account, container, name)
1039
        if permissions is not None:
1040
            path = '/'.join((account, container, name))
1041
            self._check_permissions(path, permissions)
1042

    
1043
        account_path, account_node = self._lookup_account(account, True)
1044
        container_path, container_node = self._lookup_container(
1045
            account, container)
1046

    
1047
        path, node = self._put_object_node(
1048
            container_path, container_node, name)
1049
        pre_version_id, dest_version_id = self._put_version_duplicate(
1050
            user, node, src_node=src_node, size=size, type=type, hash=hash,
1051
            checksum=checksum, is_copy=is_copy,
1052
            update_statistics_ancestors_depth=1)
1053

    
1054
        # Handle meta.
1055
        if src_version_id is None:
1056
            src_version_id = pre_version_id
1057
        self._put_metadata_duplicate(
1058
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
1059

    
1060
        del_size = self._apply_versioning(account, container, pre_version_id,
1061
                                          update_statistics_ancestors_depth=1)
1062
        size_delta = size - del_size
1063
        if size_delta > 0:
1064
            # Check account quota.
1065
            if not self.using_external_quotaholder:
1066
                account_quota = long(self._get_policy(
1067
                    account_node, is_account_policy=True)['quota'])
1068
                account_usage = self._get_statistics(account_node,
1069
                                                     compute=True)[1]
1070
                if (account_quota > 0 and account_usage > account_quota):
1071
                    raise QuotaError(
1072
                        'Account quota exceeded: limit: %s, usage: %s' % (
1073
                            account_quota, account_usage))
1074

    
1075
            # Check container quota.
1076
            container_quota = long(self._get_policy(
1077
                container_node, is_account_policy=False)['quota'])
1078
            container_usage = self._get_statistics(container_node)[1]
1079
            if (container_quota > 0 and container_usage > container_quota):
1080
                # This must be executed in a transaction, so the version is
1081
                # never created if it fails.
1082
                raise QuotaError(
1083
                    'Container quota exceeded: limit: %s, usage: %s' % (
1084
                        container_quota, container_usage
1085
                    )
1086
                )
1087

    
1088
        if report_size_change:
1089
            self._report_size_change(
1090
                user, account, size_delta,
1091
                {'action': 'object update', 'path': path,
1092
                 'versions': ','.join([str(dest_version_id)])})
1093
        if permissions is not None:
1094
            self.permissions.access_set(path, permissions)
1095
            self._report_sharing_change(
1096
                user, account, path,
1097
                {'members': self.permissions.access_members(path)})
1098

    
1099
        self._report_object_change(
1100
            user, account, path,
1101
            details={'version': dest_version_id, 'action': 'object update'})
1102
        return dest_version_id
1103

    
1104
    @debug_method
1105
    def update_object_hashmap(self, user, account, container, name, size, type,
1106
                              hashmap, checksum, domain, meta=None,
1107
                              replace_meta=False, permissions=None):
1108
        """Create/update an object's hashmap and return the new version."""
1109

    
1110
        meta = meta or {}
1111
        if size == 0:  # No such thing as an empty hashmap.
1112
            hashmap = [self.put_block('')]
1113
        map = HashMap(self.block_size, self.hash_algorithm)
1114
        map.extend([self._unhexlify_hash(x) for x in hashmap])
1115
        missing = self.store.block_search(map)
1116
        if missing:
1117
            ie = IndexError()
1118
            ie.data = [binascii.hexlify(x) for x in missing]
1119
            raise ie
1120

    
1121
        hash = map.hash()
1122
        hexlified = binascii.hexlify(hash)
1123
        # _update_object_hash() locks destination path
1124
        dest_version_id = self._update_object_hash(
1125
            user, account, container, name, size, type, hexlified, checksum,
1126
            domain, meta, replace_meta, permissions)
1127
        self.store.map_put(hash, map)
1128
        return dest_version_id, hexlified
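    # --- Illustrative example (not part of the original file) ---
    # A rough sketch of the missing-block protocol above: if any hash in the
    # submitted hashmap is unknown to the store, an IndexError is raised with
    # the missing hex hashes in its `data` attribute; the caller uploads
    # those blocks and retries.  blocks_by_hash is a made-up client-side
    # lookup from hex hash to block data.
    #
    #     try:
    #         version_id, obj_hash = backend.update_object_hashmap(
    #             'user-uuid', 'user-uuid', 'pithos', 'example.txt',
    #             size, 'text/plain', hex_hashes, checksum='',
    #             domain='pithos')
    #     except IndexError as ie:
    #         for hex_hash in ie.data:
    #             backend.put_block(blocks_by_hash[hex_hash])
    #         # ...then repeat the update_object_hashmap() call.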
1129

    
1130
    @debug_method
1131
    @backend_method
1132
    def update_object_checksum(self, user, account, container, name, version,
1133
                               checksum):
1134
        """Update an object's checksum."""
1135

    
1136
        # Update objects with greater version and same hashmap
1137
        # and size (fix metadata updates).
1138
        self._can_write(user, account, container, name)
1139
        path, node = self._lookup_object(account, container, name,
1140
                                         lock_container=True)
1141
        props = self._get_version(node, version)
1142
        versions = self.node.node_get_versions(node)
1143
        for x in versions:
1144
            if (x[self.SERIAL] >= int(version) and
1145
                x[self.HASH] == props[self.HASH] and
1146
                    x[self.SIZE] == props[self.SIZE]):
1147
                self.node.version_put_property(
1148
                    x[self.SERIAL], 'checksum', checksum)
1149

    
1150
    def _copy_object(self, user, src_account, src_container, src_name,
1151
                     dest_account, dest_container, dest_name, type,
1152
                     dest_domain=None, dest_meta=None, replace_meta=False,
1153
                     permissions=None, src_version=None, is_move=False,
1154
                     delimiter=None):
1155

    
1156
        report_size_change = not is_move
1157
        dest_meta = dest_meta or {}
1158
        dest_version_ids = []
1159
        self._can_read(user, src_account, src_container, src_name)
1160

    
1161
        src_container_path = '/'.join((src_account, src_container))
1162
        dest_container_path = '/'.join((dest_account, dest_container))
1163
        # Lock container paths in alphabetical order
1164
        if src_container_path < dest_container_path:
1165
            self._lookup_container(src_account, src_container)
1166
            self._lookup_container(dest_account, dest_container)
1167
        else:
1168
            self._lookup_container(dest_account, dest_container)
1169
            self._lookup_container(src_account, src_container)
1170

    
1171
        path, node = self._lookup_object(src_account, src_container, src_name)
1172
        # TODO: Will do another fetch of the properties in duplicate version...
1173
        props = self._get_version(
1174
            node, src_version)  # Check to see if source exists.
1175
        src_version_id = props[self.SERIAL]
1176
        hash = props[self.HASH]
1177
        size = props[self.SIZE]
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)

    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        meta = meta or {}
        dest_version_id = self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, src_version, False, delimiter)
        return dest_version_id
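
    # Usage sketch (illustrative only; `b` is an assumed, configured backend
    # instance and the names are hypothetical):
    #
    #   b.copy_object('alice', 'alice', 'photos', 'summer.jpg',
    #                 'alice', 'backup', 'summer.jpg',
    #                 'image/jpeg', 'pithos')
    #
    # Passing delimiter='/' also copies every object whose name starts with
    # the source name followed by the delimiter (a pseudo-folder copy).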

    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        meta = meta or {}
        if user != src_account:
            raise NotAllowedError
        dest_version_id = self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, None, delimiter=delimiter)
        return dest_version_id
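
    # As the shared copy/move helper above shows, a move deletes the source
    # only when (account, container, name) actually changes; "moving" an
    # object onto itself just records a new version at the same path.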

    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except NameError:
1291
                self.permissions.access_clear(path)
1292
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        self._delete_object(user, account, container, name, until, delimiter)
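
    # With until=None this records a CLUSTER_DELETED version, so earlier
    # versions remain in history; with an `until` timestamp it purges every
    # version up to that point, removes the freed maps from the store and
    # reports the reclaimed space. Illustrative call (assumed backend `b`):
    #
    #   b.delete_object('alice', 'alice', 'photos', 'summer.jpg')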

    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
                x[self.CLUSTER] != CLUSTER_DELETED]

    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read(user, account, container, name)
        return (account, container, name)

    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        block = self.store.block_get(self._unhexlify_hash(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block

    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        return binascii.hexlify(self.store.block_put(data))

    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
        return binascii.hexlify(h)
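
    # Blocks are addressed by their content hash: put_block() returns the hex
    # digest under which the data was stored, and update_block() falls back to
    # put_block() whenever the write covers a whole block. Illustrative round
    # trip (assumed backend `b`):
    #
    #   h = b.put_block('x' * 4096)
    #   assert b.get_block(h) == 'x' * 4096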

    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

    def _put_object_node(self, path, parent, name):
        path = '/'.join((path, name))
        node = self.node.node_lookup(path)
        if node is None:
            node = self.node.node_create(parent, path)
        return path, node

    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node

    def _lookup_account(self, account, create=True):
        node = self.node.node_lookup(account)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node

    def _lookup_container(self, account, container):
        for_update = True if self.lock_container_path else False
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name, lock_container=False):
        if lock_container:
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes

    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats

    def _get_version(self, node, version=None):
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version, node=node)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node."""

        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
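
    # Every update is modelled as a new version: the previous latest version,
    # if any, is reclustered into CLUSTER_HISTORY, and the new row keeps the
    # source's UUID for a plain update or move but gets a fresh UUID for a
    # copy or for the first version of a path.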

    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))
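
    # Merge semantics: without replace, the source version's attributes are
    # copied first, keys mapped to '' are removed and the rest are set; with
    # replace, the whole domain is wiped and rewritten. For example,
    # meta={'colour': 'red', 'tag': ''} updates 'colour' and drops 'tag' when
    # replace is False.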
1574

    
1575
    def _put_metadata(self, user, node, domain, meta, replace=False,
1576
                      update_statistics_ancestors_depth=None):
1577
        """Create a new version and store metadata."""
1578

    
1579
        src_version_id, dest_version_id = self._put_version_duplicate(
1580
            user, node,
1581
            update_statistics_ancestors_depth=
1582
            update_statistics_ancestors_depth)
1583
        self._put_metadata_duplicate(
1584
            src_version_id, dest_version_id, domain, node, meta, replace)
1585
        return src_version_id, dest_version_id
1586

    
1587
    def _list_limits(self, listing, marker, limit):
1588
        start = 0
1589
        if marker:
1590
            try:
1591
                start = listing.index(marker) + 1
1592
            except ValueError:
1593
                pass
1594
        if not limit or limit > 10000:
1595
            limit = 10000
1596
        return start, limit
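
    # Pagination helper: the marker is the last name already returned, so the
    # listing resumes right after it, and the page size is capped at 10000.
    # E.g. for listing == ['a', 'b', 'c'], marker == 'b' and limit == None,
    # _list_limits() returns (2, 10000).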
1597

    
1598
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
1599
                                marker=None, limit=10000, virtual=True,
1600
                                domain=None, keys=None, until=None,
1601
                                size_range=None, allowed=None,
1602
                                all_props=False):
1603
        keys = keys or []
1604
        allowed = allowed or []
1605
        cont_prefix = path + '/'
1606
        prefix = cont_prefix + prefix
1607
        start = cont_prefix + marker if marker else None
1608
        before = until if until is not None else inf
1609
        filterq = keys if domain else []
1610
        sizeq = size_range
1611

    
1612
        objects, prefixes = self.node.latest_version_list(
1613
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
1614
            allowed, domain, filterq, sizeq, all_props)
1615
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1616
        objects.sort(key=lambda x: x[0])
1617
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1618
        return objects
1619

    
1620
    # Reporting functions.
1621

    
1622
    @debug_method
1623
    @backend_method
1624
    def _report_size_change(self, user, account, size, details=None):
1625
        details = details or {}
1626

    
1627
        if size == 0:
1628
            return
1629

    
1630
        account_node = self._lookup_account(account, True)[1]
1631
        total = self._get_statistics(account_node, compute=True)[1]
1632
        details.update({'user': user, 'total': total})
1633
        self.messages.append(
1634
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1635
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1636

    
1637
        if not self.using_external_quotaholder:
1638
            return
1639

    
1640
        try:
1641
            name = details['path'] if 'path' in details else ''
1642
            serial = self.astakosclient.issue_one_commission(
1643
                holder=account,
1644
                source=DEFAULT_SOURCE,
1645
                provisions={'pithos.diskspace': size},
1646
                name=name)
1647
        except BaseException, e:
1648
            raise QuotaError(e)
1649
        else:
1650
            self.serials.append(serial)
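
    # Size accounting is two-fold: a 'resource.diskspace' message is queued
    # for reporting, and, if an external quotaholder is configured, a
    # commission for 'pithos.diskspace' is issued via astakosclient; the
    # returned serial is kept in self.serials for later resolution.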
1651

    
1652
    @debug_method
1653
    @backend_method
1654
    def _report_object_change(self, user, account, path, details=None):
1655
        details = details or {}
1656
        details.update({'user': user})
1657
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1658
                              account, QUEUE_INSTANCE_ID, 'object', path,
1659
                              details))
1660

    
1661
    @debug_method
1662
    @backend_method
1663
    def _report_sharing_change(self, user, account, path, details=None):
1664
        details = details or {}
1665
        details.update({'user': user})
1666
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1667
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
1668
                              details))
1669

    
1670
    # Policy functions.
1671

    
1672
    def _check_policy(self, policy, is_account_policy=True):
1673
        default_policy = self.default_account_policy \
1674
            if is_account_policy else self.default_container_policy
1675
        for k in policy.keys():
1676
            if policy[k] == '':
1677
                policy[k] = default_policy.get(k)
1678
        for k, v in policy.iteritems():
1679
            if k == 'quota':
1680
                q = int(v)  # May raise ValueError.
1681
                if q < 0:
1682
                    raise ValueError
1683
            elif k == 'versioning':
1684
                if v not in ['auto', 'none']:
1685
                    raise ValueError
1686
            else:
1687
                raise ValueError
1688

    
1689
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1690
        default_policy = self.default_account_policy \
1691
            if is_account_policy else self.default_container_policy
1692
        if replace:
1693
            for k, v in default_policy.iteritems():
1694
                if k not in policy:
1695
                    policy[k] = v
1696
        self.node.policy_set(node, policy)
1697

    
1698
    def _get_policy(self, node, is_account_policy=True):
1699
        default_policy = self.default_account_policy \
1700
            if is_account_policy else self.default_container_policy
1701
        policy = default_policy.copy()
1702
        policy.update(self.node.policy_get(node))
1703
        return policy
1704

    
1705
    def _apply_versioning(self, account, container, version_id,
1706
                          update_statistics_ancestors_depth=None):
1707
        """Delete the provided version if such is the policy.
1708
           Return size of object removed.
1709
        """
1710

    
1711
        if version_id is None:
1712
            return 0
1713
        path, node = self._lookup_container(account, container)
1714
        versioning = self._get_policy(
1715
            node, is_account_policy=False)['versioning']
1716
        if versioning != 'auto':
1717
            hash, size = self.node.version_remove(
1718
                version_id, update_statistics_ancestors_depth)
1719
            self.store.map_delete(hash)
1720
            return size
1721
        elif self.free_versioning:
1722
            return self.node.version_get_properties(
1723
                version_id, keys=('size',))[0]
1724
        return 0
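
    # Versioning policy in short: under 'auto' the superseded version stays in
    # history (its size keeps counting against the account unless
    # free_versioning is set), while any other setting removes the version and
    # its map immediately and returns the freed size.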
1725

    
1726
    # Access control functions.
1727

    
1728
    def _check_groups(self, groups):
1729
        # raise ValueError('Bad characters in groups')
1730
        pass
1731

    
1732
    def _check_permissions(self, path, permissions):
1733
        # raise ValueError('Bad characters in permissions')
1734
        pass
1735

    
1736
    def _get_formatted_paths(self, paths):
1737
        formatted = []
1738
        if len(paths) == 0:
1739
            return formatted
1740
        props = self.node.get_props(paths)
1741
        if props:
1742
            for prop in props:
1743
                if prop[1].split(';', 1)[0].strip() in (
1744
                        'application/directory', 'application/folder'):
1745
                    formatted.append((prop[0].rstrip('/') + '/',
1746
                                      self.MATCH_PREFIX))
1747
                formatted.append((prop[0], self.MATCH_EXACT))
1748
        return formatted
1749

    
1750
    def _get_permissions_path(self, account, container, name):
1751
        path = '/'.join((account, container, name))
1752
        permission_paths = self.permissions.access_inherit(path)
1753
        permission_paths.sort()
1754
        permission_paths.reverse()
1755
        for p in permission_paths:
1756
            if p == path:
1757
                return p
1758
            else:
1759
                if p.count('/') < 2:
1760
                    continue
1761
                node = self.node.node_lookup(p)
1762
                props = None
1763
                if node is not None:
1764
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1765
                if props is not None:
1766
                    if props[self.TYPE].split(';', 1)[0].strip() in (
1767
                            'application/directory', 'application/folder'):
1768
                        return p
1769
        return None
1770

    
1771
    def _get_permissions_path_bulk(self, account, container, names):
1772
        formatted_paths = []
1773
        for name in names:
1774
            path = '/'.join((account, container, name))
1775
            formatted_paths.append(path)
1776
        permission_paths = self.permissions.access_inherit_bulk(
1777
            formatted_paths)
1778
        permission_paths.sort()
1779
        permission_paths.reverse()
1780
        permission_paths_list = []
1781
        lookup_list = []
1782
        for p in permission_paths:
1783
            if p in formatted_paths:
1784
                permission_paths_list.append(p)
1785
            else:
1786
                if p.count('/') < 2:
1787
                    continue
1788
                lookup_list.append(p)
1789

    
1790
        if len(lookup_list) > 0:
1791
            props = self.node.get_props(lookup_list)
1792
            if props:
1793
                for prop in props:
1794
                    if prop[1].split(';', 1)[0].strip() in (
1795
                            'application/directory', 'application/folder'):
1796
                        permission_paths_list.append(prop[0])
1797

    
1798
        if len(permission_paths_list) > 0:
1799
            return permission_paths_list
1800

    
1801
        return None
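
    # Permission resolution walks the inheritance chain deepest-first: an
    # exact grant on the object path wins, otherwise the nearest ancestor
    # stored as an 'application/directory'/'application/folder' object
    # supplies the grant, e.g. a grant on 'alice/docs/work' (a folder object)
    # also covers 'alice/docs/work/report.txt'.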
1802

    
1803
    def _can_read(self, user, account, container, name):
1804
        if user == account:
1805
            return True
1806
        path = '/'.join((account, container, name))
1807
        if self.permissions.public_get(path) is not None:
1808
            return True
1809
        path = self._get_permissions_path(account, container, name)
1810
        if not path:
1811
            raise NotAllowedError
1812
        if (not self.permissions.access_check(path, self.READ, user) and not
1813
                self.permissions.access_check(path, self.WRITE, user)):
1814
            raise NotAllowedError
1815

    
1816
    def _can_write(self, user, account, container, name):
1817
        if user == account:
1818
            return True
1819
        path = '/'.join((account, container, name))
1820
        path = self._get_permissions_path(account, container, name)
1821
        if not path:
1822
            raise NotAllowedError
1823
        if not self.permissions.access_check(path, self.WRITE, user):
1824
            raise NotAllowedError
1825

    
1826
    def _allowed_accounts(self, user):
1827
        allow = set()
1828
        for path in self.permissions.access_list_paths(user):
1829
            allow.add(path.split('/', 1)[0])
1830
        return sorted(allow)
1831

    
1832
    def _allowed_containers(self, user, account):
1833
        allow = set()
1834
        for path in self.permissions.access_list_paths(user, account):
1835
            allow.add(path.split('/', 2)[1])
1836
        return sorted(allow)
1837

    
1838
    # Domain functions
1839

    
1840
    @debug_method
1841
    @backend_method
1842
    def get_domain_objects(self, domain, user=None):
1843
        allowed_paths = self.permissions.access_list_paths(
1844
            user, include_owned=user is not None, include_containers=False)
1845
        if not allowed_paths:
1846
            return []
1847
        obj_list = self.node.domain_object_list(
1848
            domain, allowed_paths, CLUSTER_NORMAL)
1849
        return [(path,
1850
                 self._build_metadata(props, user_defined_meta),
1851
                 self.permissions.access_get(path)) for
1852
                path, props, user_defined_meta in obj_list]
1853

    
1854
    # util functions
1855

    
1856
    def _build_metadata(self, props, user_defined=None,
1857
                        include_user_defined=True):
1858
        meta = {'bytes': props[self.SIZE],
1859
                'type': props[self.TYPE],
1860
                'hash': props[self.HASH],
1861
                'version': props[self.SERIAL],
1862
                'version_timestamp': props[self.MTIME],
1863
                'modified_by': props[self.MUSER],
1864
                'uuid': props[self.UUID],
1865
                'checksum': props[self.CHECKSUM]}
1866
        if include_user_defined and user_defined is not None:
1867
            meta.update(user_defined)
1868
        return meta
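
    # An illustrative result (all values made up):
    #
    #   {'bytes': 1024, 'type': 'image/jpeg', 'hash': '3c2a...', 'version': 42,
    #    'version_timestamp': 1363000000.0, 'modified_by': 'alice',
    #    'uuid': '3f1e...', 'checksum': ''}
    #
    # plus any user-defined attributes merged in when include_user_defined
    # is set.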
1869

    
1870
    def _exists(self, node):
1871
        try:
1872
            self._get_version(node)
1873
        except ItemNotExists:
1874
            return False
1875
        else:
1876
            return True
1877

    
1878
    def _unhexlify_hash(self, hash):
1879
        try:
1880
            return binascii.unhexlify(hash)
1881
        except TypeError:
1882
            raise InvalidHash(hash)