Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ ec6f741b

History | View | Annotate | Download (74.9 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42

    
43
try:
44
    from astakosclient import AstakosClient
45
except ImportError:
46
    AstakosClient = None
47

    
48
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
49
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
50
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
51
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
52
                  InvalidHash)
53

    
54

    
55
class DisabledAstakosClient(object):
56
    def __init__(self, *args, **kwargs):
57
        self.args = args
58
        self.kwargs = kwargs
59

    
60
    def __getattr__(self, name):
61
        m = ("AstakosClient has been disabled, "
62
             "yet an attempt to access it was made")
63
        raise AssertionError(m)
64

    
65

    
66
# Stripped-down version of the HashMap class found in tools.
67

    
68
class HashMap(list):
69

    
70
    def __init__(self, blocksize, blockhash):
71
        super(HashMap, self).__init__()
72
        self.blocksize = blocksize
73
        self.blockhash = blockhash
74

    
75
    def _hash_raw(self, v):
76
        h = hashlib.new(self.blockhash)
77
        h.update(v)
78
        return h.digest()
79

    
80
    def hash(self):
81
        if len(self) == 0:
82
            return self._hash_raw('')
83
        if len(self) == 1:
84
            return self.__getitem__(0)
85

    
86
        h = list(self)
87
        s = 2
88
        while s < len(h):
89
            s = s * 2
90
        h += [('\x00' * len(h[0]))] * (s - len(h))
91
        while len(h) > 1:
92
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
93
        return h[0]
94

    
95
# Default modules and settings.
96
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
97
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
98
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
99
DEFAULT_BLOCK_PATH = 'data/'
100
DEFAULT_BLOCK_UMASK = 0o022
101
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
102
DEFAULT_HASH_ALGORITHM = 'sha256'
103
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
104
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
105
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
106
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
107
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
108
                               'abcdefghijklmnopqrstuvwxyz'
109
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
110
DEFAULT_PUBLIC_URL_SECURITY = 16
111

    
112
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
113
QUEUE_CLIENT_ID = 'pithos'
114
QUEUE_INSTANCE_ID = '1'
115

    
116
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
117

    
118
inf = float('inf')
119

    
120
ULTIMATE_ANSWER = 42
121

    
122
DEFAULT_SOURCE = 'system'
123
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
124

    
125
logger = logging.getLogger(__name__)
126

    
127

    
128
def backend_method(func):
129
    @wraps(func)
130
    def wrapper(self, *args, **kw):
131
        # if we are inside a database transaction
132
        # just proceed with the method execution
133
        # otherwise manage a new transaction
134
        if self.in_transaction:
135
            return func(self, *args, **kw)
136

    
137
        try:
138
            self.pre_exec()
139
            result = func(self, *args, **kw)
140
            success_status = True
141
            return result
142
        except:
143
            success_status = False
144
            raise
145
        finally:
146
            self.post_exec(success_status)
147
    return wrapper
148

    
149

    
150
def debug_method(func):
151
    @wraps(func)
152
    def wrapper(self, *args, **kw):
153
        try:
154
            result = func(self, *args, **kw)
155
            return result
156
        except:
157
            result = format_exc()
158
            raise
159
        finally:
160
            all_args = map(repr, args)
161
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
162
            logger.debug(">>> %s(%s) <<< %s" % (
163
                func.__name__, ', '.join(all_args).rstrip(', '), result))
164
    return wrapper
165

    
166

    
167
class ModularBackend(BaseBackend):
168
    """A modular backend.
169

170
    Uses modules for SQL functions and storage.
171
    """
172

    
173
    def __init__(self, db_module=None, db_connection=None,
174
                 block_module=None, block_path=None, block_umask=None,
175
                 block_size=None, hash_algorithm=None,
176
                 queue_module=None, queue_hosts=None, queue_exchange=None,
177
                 astakos_auth_url=None, service_token=None,
178
                 astakosclient_poolsize=None,
179
                 free_versioning=True, block_params=None,
180
                 public_url_security=None,
181
                 public_url_alphabet=None,
182
                 account_quota_policy=None,
183
                 container_quota_policy=None,
184
                 container_versioning_policy=None):
185
        db_module = db_module or DEFAULT_DB_MODULE
186
        db_connection = db_connection or DEFAULT_DB_CONNECTION
187
        block_module = block_module or DEFAULT_BLOCK_MODULE
188
        block_path = block_path or DEFAULT_BLOCK_PATH
189
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
190
        block_params = block_params or DEFAULT_BLOCK_PARAMS
191
        block_size = block_size or DEFAULT_BLOCK_SIZE
192
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
193
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
194
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
195
        container_quota_policy = container_quota_policy \
196
            or DEFAULT_CONTAINER_QUOTA
197
        container_versioning_policy = container_versioning_policy \
198
            or DEFAULT_CONTAINER_VERSIONING
199

    
200
        self.default_account_policy = {'quota': account_quota_policy}
201
        self.default_container_policy = {
202
            'quota': container_quota_policy,
203
            'versioning': container_versioning_policy
204
        }
205
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
206
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
207

    
208
        self.public_url_security = (public_url_security or
209
                                    DEFAULT_PUBLIC_URL_SECURITY)
210
        self.public_url_alphabet = (public_url_alphabet or
211
                                    DEFAULT_PUBLIC_URL_ALPHABET)
212

    
213
        self.hash_algorithm = hash_algorithm
214
        self.block_size = block_size
215
        self.free_versioning = free_versioning
216

    
217
        def load_module(m):
218
            __import__(m)
219
            return sys.modules[m]
220

    
221
        self.db_module = load_module(db_module)
222
        self.wrapper = self.db_module.DBWrapper(db_connection)
223
        params = {'wrapper': self.wrapper}
224
        self.permissions = self.db_module.Permissions(**params)
225
        self.config = self.db_module.Config(**params)
226
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
227
        for x in ['READ', 'WRITE']:
228
            setattr(self, x, getattr(self.db_module, x))
229
        self.node = self.db_module.Node(**params)
230
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
231
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
232
                  'MATCH_PREFIX', 'MATCH_EXACT']:
233
            setattr(self, x, getattr(self.db_module, x))
234

    
235
        self.ALLOWED = ['read', 'write']
236

    
237
        self.block_module = load_module(block_module)
238
        self.block_params = block_params
239
        params = {'path': block_path,
240
                  'block_size': self.block_size,
241
                  'hash_algorithm': self.hash_algorithm,
242
                  'umask': block_umask}
243
        params.update(self.block_params)
244
        self.store = self.block_module.Store(**params)
245

    
246
        if queue_module and queue_hosts:
247
            self.queue_module = load_module(queue_module)
248
            params = {'hosts': queue_hosts,
249
                      'exchange': queue_exchange,
250
                      'client_id': QUEUE_CLIENT_ID}
251
            self.queue = self.queue_module.Queue(**params)
252
        else:
253
            class NoQueue:
254
                def send(self, *args):
255
                    pass
256

    
257
                def close(self):
258
                    pass
259

    
260
            self.queue = NoQueue()
261

    
262
        self.astakos_auth_url = astakos_auth_url
263
        self.service_token = service_token
264

    
265
        if not astakos_auth_url or not AstakosClient:
266
            self.astakosclient = DisabledAstakosClient(
267
                service_token, astakos_auth_url,
268
                use_pool=True,
269
                pool_size=astakosclient_poolsize)
270
        else:
271
            self.astakosclient = AstakosClient(
272
                service_token, astakos_auth_url,
273
                use_pool=True,
274
                pool_size=astakosclient_poolsize)
275

    
276
        self.serials = []
277
        self.messages = []
278

    
279
        self._move_object = partial(self._copy_object, is_move=True)
280

    
281
        self.lock_container_path = False
282

    
283
        self.in_transaction = False
284

    
285
    def pre_exec(self, lock_container_path=False):
286
        self.lock_container_path = lock_container_path
287
        self.wrapper.execute()
288
        self.serials = []
289
        self.in_transaction = True
290

    
291
    def post_exec(self, success_status=True):
292
        if success_status:
293
            # send messages produced
294
            for m in self.messages:
295
                self.queue.send(*m)
296

    
297
            # register serials
298
            if self.serials:
299
                self.commission_serials.insert_many(
300
                    self.serials)
301

    
302
                # commit to ensure that the serials are registered
303
                # even if resolve commission fails
304
                self.wrapper.commit()
305

    
306
                # start new transaction
307
                self.wrapper.execute()
308

    
309
                r = self.astakosclient.resolve_commissions(
310
                    accept_serials=self.serials,
311
                    reject_serials=[])
312
                self.commission_serials.delete_many(
313
                    r['accepted'])
314

    
315
            self.wrapper.commit()
316
        else:
317
            if self.serials:
318
                r = self.astakosclient.resolve_commissions(
319
                    accept_serials=[],
320
                    reject_serials=self.serials)
321
                self.commission_serials.delete_many(
322
                    r['rejected'])
323
            self.wrapper.rollback()
324
        self.in_transaction = False
325

    
326
    def close(self):
327
        self.wrapper.close()
328
        self.queue.close()
329

    
330
    @property
331
    def using_external_quotaholder(self):
332
        return not isinstance(self.astakosclient, DisabledAstakosClient)
333

    
334
    @debug_method
335
    @backend_method
336
    def list_accounts(self, user, marker=None, limit=10000):
337
        """Return a list of accounts the user can access."""
338

    
339
        allowed = self._allowed_accounts(user)
340
        start, limit = self._list_limits(allowed, marker, limit)
341
        return allowed[start:start + limit]
342

    
343
    def _get_account_quotas(self, account):
344
        """Get account usage from astakos."""
345

    
346
        quotas = self.astakosclient.service_get_quotas(account)[account]
347
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
348
                                                  {})
349

    
350
    @debug_method
351
    @backend_method
352
    def get_account_meta(self, user, account, domain, until=None,
353
                         include_user_defined=True):
354
        """Return a dictionary with the account metadata for the domain."""
355

    
356
        path, node = self._lookup_account(account, user == account)
357
        if user != account:
358
            if until or (node is None) or (account not
359
                                           in self._allowed_accounts(user)):
360
                raise NotAllowedError
361
        try:
362
            props = self._get_properties(node, until)
363
            mtime = props[self.MTIME]
364
        except NameError:
365
            props = None
366
            mtime = until
367
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
368
        tstamp = max(tstamp, mtime)
369
        if until is None:
370
            modified = tstamp
371
        else:
372
            modified = self._get_statistics(
373
                node, compute=True)[2]  # Overall last modification.
374
            modified = max(modified, mtime)
375

    
376
        if user != account:
377
            meta = {'name': account}
378
        else:
379
            meta = {}
380
            if props is not None and include_user_defined:
381
                meta.update(
382
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
383
            if until is not None:
384
                meta.update({'until_timestamp': tstamp})
385
            meta.update({'name': account, 'count': count, 'bytes': bytes})
386
            if self.using_external_quotaholder:
387
                external_quota = self._get_account_quotas(account)
388
                meta['bytes'] = external_quota.get('usage', 0)
389
        meta.update({'modified': modified})
390
        return meta
391

    
392
    @debug_method
393
    @backend_method
394
    def update_account_meta(self, user, account, domain, meta, replace=False):
395
        """Update the metadata associated with the account for the domain."""
396

    
397
        if user != account:
398
            raise NotAllowedError
399
        path, node = self._lookup_account(account, True)
400
        self._put_metadata(user, node, domain, meta, replace,
401
                           update_statistics_ancestors_depth=-1)
402

    
403
    @debug_method
404
    @backend_method
405
    def get_account_groups(self, user, account):
406
        """Return a dictionary with the user groups defined for the account."""
407

    
408
        if user != account:
409
            if account not in self._allowed_accounts(user):
410
                raise NotAllowedError
411
            return {}
412
        self._lookup_account(account, True)
413
        return self.permissions.group_dict(account)
414

    
415
    @debug_method
416
    @backend_method
417
    def update_account_groups(self, user, account, groups, replace=False):
418
        """Update the groups associated with the account."""
419

    
420
        if user != account:
421
            raise NotAllowedError
422
        self._lookup_account(account, True)
423
        self._check_groups(groups)
424
        if replace:
425
            self.permissions.group_destroy(account)
426
        for k, v in groups.iteritems():
427
            if not replace:  # If not already deleted.
428
                self.permissions.group_delete(account, k)
429
            if v:
430
                self.permissions.group_addmany(account, k, v)
431

    
432
    @debug_method
433
    @backend_method
434
    def get_account_policy(self, user, account):
435
        """Return a dictionary with the account policy."""
436

    
437
        if user != account:
438
            if account not in self._allowed_accounts(user):
439
                raise NotAllowedError
440
            return {}
441
        path, node = self._lookup_account(account, True)
442
        policy = self._get_policy(node, is_account_policy=True)
443
        if self.using_external_quotaholder:
444
            external_quota = self._get_account_quotas(account)
445
            policy['quota'] = external_quota.get('limit', 0)
446
        return policy
447

    
448
    @debug_method
449
    @backend_method
450
    def update_account_policy(self, user, account, policy, replace=False):
451
        """Update the policy associated with the account."""
452

    
453
        if user != account:
454
            raise NotAllowedError
455
        path, node = self._lookup_account(account, True)
456
        self._check_policy(policy, is_account_policy=True)
457
        self._put_policy(node, policy, replace, is_account_policy=True)
458

    
459
    @debug_method
460
    @backend_method
461
    def put_account(self, user, account, policy=None):
462
        """Create a new account with the given name."""
463

    
464
        policy = policy or {}
465
        if user != account:
466
            raise NotAllowedError
467
        node = self.node.node_lookup(account)
468
        if node is not None:
469
            raise AccountExists('Account already exists')
470
        if policy:
471
            self._check_policy(policy, is_account_policy=True)
472
        node = self._put_path(user, self.ROOTNODE, account,
473
                              update_statistics_ancestors_depth=-1)
474
        self._put_policy(node, policy, True, is_account_policy=True)
475

    
476
    @debug_method
477
    @backend_method
478
    def delete_account(self, user, account):
479
        """Delete the account with the given name."""
480

    
481
        if user != account:
482
            raise NotAllowedError
483
        node = self.node.node_lookup(account)
484
        if node is None:
485
            return
486
        if not self.node.node_remove(node,
487
                                     update_statistics_ancestors_depth=-1):
488
            raise AccountNotEmpty('Account is not empty')
489
        self.permissions.group_destroy(account)
490

    
491
    @debug_method
492
    @backend_method
493
    def list_containers(self, user, account, marker=None, limit=10000,
494
                        shared=False, until=None, public=False):
495
        """Return a list of containers existing under an account."""
496

    
497
        if user != account:
498
            if until or account not in self._allowed_accounts(user):
499
                raise NotAllowedError
500
            allowed = self._allowed_containers(user, account)
501
            start, limit = self._list_limits(allowed, marker, limit)
502
            return allowed[start:start + limit]
503
        if shared or public:
504
            allowed = set()
505
            if shared:
506
                allowed.update([x.split('/', 2)[1] for x in
507
                               self.permissions.access_list_shared(account)])
508
            if public:
509
                allowed.update([x[0].split('/', 2)[1] for x in
510
                               self.permissions.public_list(account)])
511
            allowed = sorted(allowed)
512
            start, limit = self._list_limits(allowed, marker, limit)
513
            return allowed[start:start + limit]
514
        node = self.node.node_lookup(account)
515
        containers = [x[0] for x in self._list_object_properties(
516
            node, account, '', '/', marker, limit, False, None, [], until)]
517
        start, limit = self._list_limits(
518
            [x[0] for x in containers], marker, limit)
519
        return containers[start:start + limit]
520

    
521
    @debug_method
522
    @backend_method
523
    def list_container_meta(self, user, account, container, domain,
524
                            until=None):
525
        """Return a list of the container's object meta keys for a domain."""
526

    
527
        allowed = []
528
        if user != account:
529
            if until:
530
                raise NotAllowedError
531
            allowed = self.permissions.access_list_paths(
532
                user, '/'.join((account, container)))
533
            if not allowed:
534
                raise NotAllowedError
535
        path, node = self._lookup_container(account, container)
536
        before = until if until is not None else inf
537
        allowed = self._get_formatted_paths(allowed)
538
        return self.node.latest_attribute_keys(node, domain, before,
539
                                               CLUSTER_DELETED, allowed)
540

    
541
    @debug_method
542
    @backend_method
543
    def get_container_meta(self, user, account, container, domain, until=None,
544
                           include_user_defined=True):
545
        """Return a dictionary with the container metadata for the domain."""
546

    
547
        if user != account:
548
            if until or container not in self._allowed_containers(user,
549
                                                                  account):
550
                raise NotAllowedError
551
        path, node = self._lookup_container(account, container)
552
        props = self._get_properties(node, until)
553
        mtime = props[self.MTIME]
554
        count, bytes, tstamp = self._get_statistics(node, until)
555
        tstamp = max(tstamp, mtime)
556
        if until is None:
557
            modified = tstamp
558
        else:
559
            modified = self._get_statistics(
560
                node)[2]  # Overall last modification.
561
            modified = max(modified, mtime)
562

    
563
        if user != account:
564
            meta = {'name': container}
565
        else:
566
            meta = {}
567
            if include_user_defined:
568
                meta.update(
569
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
570
            if until is not None:
571
                meta.update({'until_timestamp': tstamp})
572
            meta.update({'name': container, 'count': count, 'bytes': bytes})
573
        meta.update({'modified': modified})
574
        return meta
575

    
576
    @debug_method
577
    @backend_method
578
    def update_container_meta(self, user, account, container, domain, meta,
579
                              replace=False):
580
        """Update the metadata associated with the container for the domain."""
581

    
582
        if user != account:
583
            raise NotAllowedError
584
        path, node = self._lookup_container(account, container)
585
        src_version_id, dest_version_id = self._put_metadata(
586
            user, node, domain, meta, replace,
587
            update_statistics_ancestors_depth=0)
588
        if src_version_id is not None:
589
            versioning = self._get_policy(
590
                node, is_account_policy=False)['versioning']
591
            if versioning != 'auto':
592
                self.node.version_remove(src_version_id,
593
                                         update_statistics_ancestors_depth=0)
594

    
595
    @debug_method
596
    @backend_method
597
    def get_container_policy(self, user, account, container):
598
        """Return a dictionary with the container policy."""
599

    
600
        if user != account:
601
            if container not in self._allowed_containers(user, account):
602
                raise NotAllowedError
603
            return {}
604
        path, node = self._lookup_container(account, container)
605
        return self._get_policy(node, is_account_policy=False)
606

    
607
    @debug_method
608
    @backend_method
609
    def update_container_policy(self, user, account, container, policy,
610
                                replace=False):
611
        """Update the policy associated with the container."""
612

    
613
        if user != account:
614
            raise NotAllowedError
615
        path, node = self._lookup_container(account, container)
616
        self._check_policy(policy, is_account_policy=False)
617
        self._put_policy(node, policy, replace, is_account_policy=False)
618

    
619
    @debug_method
620
    @backend_method
621
    def put_container(self, user, account, container, policy=None):
622
        """Create a new container with the given name."""
623

    
624
        policy = policy or {}
625
        if user != account:
626
            raise NotAllowedError
627
        try:
628
            path, node = self._lookup_container(account, container)
629
        except NameError:
630
            pass
631
        else:
632
            raise ContainerExists('Container already exists')
633
        if policy:
634
            self._check_policy(policy, is_account_policy=False)
635
        path = '/'.join((account, container))
636
        node = self._put_path(
637
            user, self._lookup_account(account, True)[1], path,
638
            update_statistics_ancestors_depth=-1)
639
        self._put_policy(node, policy, True, is_account_policy=False)
640

    
641
    @debug_method
642
    @backend_method
643
    def delete_container(self, user, account, container, until=None, prefix='',
644
                         delimiter=None):
645
        """Delete/purge the container with the given name."""
646

    
647
        if user != account:
648
            raise NotAllowedError
649
        path, node = self._lookup_container(account, container)
650

    
651
        if until is not None:
652
            hashes, size, serials = self.node.node_purge_children(
653
                node, until, CLUSTER_HISTORY,
654
                update_statistics_ancestors_depth=0)
655
            for h in hashes:
656
                self.store.map_delete(h)
657
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
658
                                          update_statistics_ancestors_depth=0)
659
            if not self.free_versioning:
660
                self._report_size_change(
661
                    user, account, -size, {
662
                        'action': 'container purge',
663
                        'path': path,
664
                        'versions': ','.join(str(i) for i in serials)
665
                    }
666
                )
667
            return
668

    
669
        if not delimiter:
670
            if self._get_statistics(node)[0] > 0:
671
                raise ContainerNotEmpty('Container is not empty')
672
            hashes, size, serials = self.node.node_purge_children(
673
                node, inf, CLUSTER_HISTORY,
674
                update_statistics_ancestors_depth=0)
675
            for h in hashes:
676
                self.store.map_delete(h)
677
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
678
                                          update_statistics_ancestors_depth=0)
679
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
680
            if not self.free_versioning:
681
                self._report_size_change(
682
                    user, account, -size, {
683
                        'action': 'container purge',
684
                        'path': path,
685
                        'versions': ','.join(str(i) for i in serials)
686
                    }
687
                )
688
        else:
689
            # remove only contents
690
            src_names = self._list_objects_no_limit(
691
                user, account, container, prefix='', delimiter=None,
692
                virtual=False, domain=None, keys=[], shared=False, until=None,
693
                size_range=None, all_props=True, public=False)
694
            paths = []
695
            for t in src_names:
696
                path = '/'.join((account, container, t[0]))
697
                node = t[2]
698
                if not self._exists(node):
699
                    continue
700
                src_version_id, dest_version_id = self._put_version_duplicate(
701
                    user, node, size=0, type='', hash=None, checksum='',
702
                    cluster=CLUSTER_DELETED,
703
                    update_statistics_ancestors_depth=1)
704
                del_size = self._apply_versioning(
705
                    account, container, src_version_id,
706
                    update_statistics_ancestors_depth=1)
707
                self._report_size_change(
708
                    user, account, -del_size, {
709
                        'action': 'object delete',
710
                        'path': path,
711
                        'versions': ','.join([str(dest_version_id)])})
712
                self._report_object_change(
713
                    user, account, path, details={'action': 'object delete'})
714
                paths.append(path)
715
            self.permissions.access_clear_bulk(paths)
716

    
717
    def _list_objects(self, user, account, container, prefix, delimiter,
718
                      marker, limit, virtual, domain, keys, shared, until,
719
                      size_range, all_props, public):
720
        if user != account and until:
721
            raise NotAllowedError
722
        if shared and public:
723
            # get shared first
724
            shared_paths = self._list_object_permissions(
725
                user, account, container, prefix, shared=True, public=False)
726
            objects = set()
727
            if shared_paths:
728
                path, node = self._lookup_container(account, container)
729
                shared_paths = self._get_formatted_paths(shared_paths)
730
                objects |= set(self._list_object_properties(
731
                    node, path, prefix, delimiter, marker, limit, virtual,
732
                    domain, keys, until, size_range, shared_paths, all_props))
733

    
734
            # get public
735
            objects |= set(self._list_public_object_properties(
736
                user, account, container, prefix, all_props))
737
            objects = list(objects)
738

    
739
            objects.sort(key=lambda x: x[0])
740
            start, limit = self._list_limits(
741
                [x[0] for x in objects], marker, limit)
742
            return objects[start:start + limit]
743
        elif public:
744
            objects = self._list_public_object_properties(
745
                user, account, container, prefix, all_props)
746
            start, limit = self._list_limits(
747
                [x[0] for x in objects], marker, limit)
748
            return objects[start:start + limit]
749

    
750
        allowed = self._list_object_permissions(
751
            user, account, container, prefix, shared, public)
752
        if shared and not allowed:
753
            return []
754
        path, node = self._lookup_container(account, container)
755
        allowed = self._get_formatted_paths(allowed)
756
        objects = self._list_object_properties(
757
            node, path, prefix, delimiter, marker, limit, virtual, domain,
758
            keys, until, size_range, allowed, all_props)
759
        start, limit = self._list_limits(
760
            [x[0] for x in objects], marker, limit)
761
        return objects[start:start + limit]
762

    
763
    def _list_public_object_properties(self, user, account, container, prefix,
764
                                       all_props):
765
        public = self._list_object_permissions(
766
            user, account, container, prefix, shared=False, public=True)
767
        paths, nodes = self._lookup_objects(public)
768
        path = '/'.join((account, container))
769
        cont_prefix = path + '/'
770
        paths = [x[len(cont_prefix):] for x in paths]
771
        objects = [(p,) + props for p, props in
772
                   zip(paths, self.node.version_lookup_bulk(
773
                       nodes, all_props=all_props, order_by_path=True))]
774
        return objects
775

    
776
    def _list_objects_no_limit(self, user, account, container, prefix,
777
                               delimiter, virtual, domain, keys, shared, until,
778
                               size_range, all_props, public):
779
        objects = []
780
        while True:
781
            marker = objects[-1] if objects else None
782
            limit = 10000
783
            l = self._list_objects(
784
                user, account, container, prefix, delimiter, marker, limit,
785
                virtual, domain, keys, shared, until, size_range, all_props,
786
                public)
787
            objects.extend(l)
788
            if not l or len(l) < limit:
789
                break
790
        return objects
791

    
792
    def _list_object_permissions(self, user, account, container, prefix,
793
                                 shared, public):
794
        allowed = []
795
        path = '/'.join((account, container, prefix)).rstrip('/')
796
        if user != account:
797
            allowed = self.permissions.access_list_paths(user, path)
798
            if not allowed:
799
                raise NotAllowedError
800
        else:
801
            allowed = set()
802
            if shared:
803
                allowed.update(self.permissions.access_list_shared(path))
804
            if public:
805
                allowed.update(
806
                    [x[0] for x in self.permissions.public_list(path)])
807
            allowed = sorted(allowed)
808
            if not allowed:
809
                return []
810
        return allowed
811

    
812
    @debug_method
813
    @backend_method
814
    def list_objects(self, user, account, container, prefix='', delimiter=None,
815
                     marker=None, limit=10000, virtual=True, domain=None,
816
                     keys=None, shared=False, until=None, size_range=None,
817
                     public=False):
818
        """List (object name, object version_id) under a container."""
819

    
820
        keys = keys or []
821
        return self._list_objects(
822
            user, account, container, prefix, delimiter, marker, limit,
823
            virtual, domain, keys, shared, until, size_range, False, public)
824

    
825
    @debug_method
826
    @backend_method
827
    def list_object_meta(self, user, account, container, prefix='',
828
                         delimiter=None, marker=None, limit=10000,
829
                         virtual=True, domain=None, keys=None, shared=False,
830
                         until=None, size_range=None, public=False):
831
        """Return a list of metadata dicts of objects under a container."""
832

    
833
        keys = keys or []
834
        props = self._list_objects(
835
            user, account, container, prefix, delimiter, marker, limit,
836
            virtual, domain, keys, shared, until, size_range, True, public)
837
        objects = []
838
        for p in props:
839
            if len(p) == 2:
840
                objects.append({'subdir': p[0]})
841
            else:
842
                objects.append({
843
                    'name': p[0],
844
                    'bytes': p[self.SIZE + 1],
845
                    'type': p[self.TYPE + 1],
846
                    'hash': p[self.HASH + 1],
847
                    'version': p[self.SERIAL + 1],
848
                    'version_timestamp': p[self.MTIME + 1],
849
                    'modified': p[self.MTIME + 1] if until is None else None,
850
                    'modified_by': p[self.MUSER + 1],
851
                    'uuid': p[self.UUID + 1],
852
                    'checksum': p[self.CHECKSUM + 1]})
853
        return objects
854

    
855
    @debug_method
856
    @backend_method
857
    def list_object_permissions(self, user, account, container, prefix=''):
858
        """Return a list of paths enforce permissions under a container."""
859

    
860
        return self._list_object_permissions(user, account, container, prefix,
861
                                             True, False)
862

    
863
    @debug_method
864
    @backend_method
865
    def list_object_public(self, user, account, container, prefix=''):
866
        """Return a mapping of object paths to public ids under a container."""
867

    
868
        public = {}
869
        for path, p in self.permissions.public_list('/'.join((account,
870
                                                              container,
871
                                                              prefix))):
872
            public[path] = p
873
        return public
874

    
875
    @debug_method
876
    @backend_method
877
    def get_object_meta(self, user, account, container, name, domain,
878
                        version=None, include_user_defined=True):
879
        """Return a dictionary with the object metadata for the domain."""
880

    
881
        self._can_read(user, account, container, name)
882
        path, node = self._lookup_object(account, container, name)
883
        props = self._get_version(node, version)
884
        if version is None:
885
            modified = props[self.MTIME]
886
        else:
887
            try:
888
                modified = self._get_version(
889
                    node)[self.MTIME]  # Overall last modification.
890
            except NameError:  # Object may be deleted.
891
                del_props = self.node.version_lookup(
892
                    node, inf, CLUSTER_DELETED)
893
                if del_props is None:
894
                    raise ItemNotExists('Object does not exist')
895
                modified = del_props[self.MTIME]
896

    
897
        meta = {}
898
        if include_user_defined:
899
            meta.update(
900
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
901
        meta.update({'name': name,
902
                     'bytes': props[self.SIZE],
903
                     'type': props[self.TYPE],
904
                     'hash': props[self.HASH],
905
                     'version': props[self.SERIAL],
906
                     'version_timestamp': props[self.MTIME],
907
                     'modified': modified,
908
                     'modified_by': props[self.MUSER],
909
                     'uuid': props[self.UUID],
910
                     'checksum': props[self.CHECKSUM]})
911
        return meta
912

    
913
    @debug_method
914
    @backend_method
915
    def update_object_meta(self, user, account, container, name, domain, meta,
916
                           replace=False):
917
        """Update object metadata for a domain and return the new version."""
918

    
919
        self._can_write(user, account, container, name)
920

    
921
        path, node = self._lookup_object(account, container, name,
922
                                         lock_container=True)
923
        src_version_id, dest_version_id = self._put_metadata(
924
            user, node, domain, meta, replace,
925
            update_statistics_ancestors_depth=1)
926
        self._apply_versioning(account, container, src_version_id,
927
                               update_statistics_ancestors_depth=1)
928
        return dest_version_id
929

    
930
    @debug_method
931
    @backend_method
932
    def get_object_permissions_bulk(self, user, account, container, names):
933
        """Return the action allowed on the object, the path
934
        from which the object gets its permissions from,
935
        along with a dictionary containing the permissions."""
936

    
937
        permissions_path = self._get_permissions_path_bulk(account, container,
938
                                                           names)
939
        access_objects = self.permissions.access_check_bulk(permissions_path,
940
                                                            user)
941
        #group_parents = access_objects['group_parents']
942
        nobject_permissions = {}
943
        cpath = '/'.join((account, container, ''))
944
        cpath_idx = len(cpath)
945
        for path in permissions_path:
946
            allowed = 1
947
            name = path[cpath_idx:]
948
            if user != account:
949
                try:
950
                    allowed = access_objects[path]
951
                except KeyError:
952
                    raise NotAllowedError
953
            access_dict, allowed = \
954
                self.permissions.access_get_for_bulk(access_objects[path])
955
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
956
                                         access_dict)
957
        self._lookup_objects(permissions_path)
958
        return nobject_permissions
959

    
960
    @debug_method
961
    @backend_method
962
    def get_object_permissions(self, user, account, container, name):
963
        """Return the action allowed on the object, the path
964
        from which the object gets its permissions from,
965
        along with a dictionary containing the permissions."""
966

    
967
        allowed = 'write'
968
        permissions_path = self._get_permissions_path(account, container, name)
969
        if user != account:
970
            if self.permissions.access_check(permissions_path, self.WRITE,
971
                                             user):
972
                allowed = 'write'
973
            elif self.permissions.access_check(permissions_path, self.READ,
974
                                               user):
975
                allowed = 'read'
976
            else:
977
                raise NotAllowedError
978
        self._lookup_object(account, container, name)
979
        return (allowed,
980
                permissions_path,
981
                self.permissions.access_get(permissions_path))
982

    
983
    @debug_method
984
    @backend_method
985
    def update_object_permissions(self, user, account, container, name,
986
                                  permissions):
987
        """Update the permissions associated with the object."""
988

    
989
        if user != account:
990
            raise NotAllowedError
991
        path = self._lookup_object(account, container, name,
992
                                   lock_container=True)[0]
993
        self._check_permissions(path, permissions)
994
        try:
995
            self.permissions.access_set(path, permissions)
996
        except:
997
            raise ValueError
998
        else:
999
            self._report_sharing_change(user, account, path, {'members':
1000
                                        self.permissions.access_members(path)})
1001

    
1002
    @debug_method
1003
    @backend_method
1004
    def get_object_public(self, user, account, container, name):
1005
        """Return the public id of the object if applicable."""
1006

    
1007
        self._can_read(user, account, container, name)
1008
        path = self._lookup_object(account, container, name)[0]
1009
        p = self.permissions.public_get(path)
1010
        return p
1011

    
1012
    @debug_method
1013
    @backend_method
1014
    def update_object_public(self, user, account, container, name, public):
1015
        """Update the public status of the object."""
1016

    
1017
        self._can_write(user, account, container, name)
1018
        path = self._lookup_object(account, container, name,
1019
                                   lock_container=True)[0]
1020
        if not public:
1021
            self.permissions.public_unset(path)
1022
        else:
1023
            self.permissions.public_set(
1024
                path, self.public_url_security, self.public_url_alphabet)
1025

    
1026
    @debug_method
1027
    @backend_method
1028
    def get_object_hashmap(self, user, account, container, name, version=None):
1029
        """Return the object's size and a list with partial hashes."""
1030

    
1031
        self._can_read(user, account, container, name)
1032
        path, node = self._lookup_object(account, container, name)
1033
        props = self._get_version(node, version)
1034
        if props[self.HASH] is None:
1035
            return 0, ()
1036
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
1037
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
1038

    
1039
    def _update_object_hash(self, user, account, container, name, size, type,
1040
                            hash, checksum, domain, meta, replace_meta,
1041
                            permissions, src_node=None, src_version_id=None,
1042
                            is_copy=False, report_size_change=True):
1043
        if permissions is not None and user != account:
1044
            raise NotAllowedError
1045
        self._can_write(user, account, container, name)
1046
        if permissions is not None:
1047
            path = '/'.join((account, container, name))
1048
            self._check_permissions(path, permissions)
1049

    
1050
        account_path, account_node = self._lookup_account(account, True)
1051
        container_path, container_node = self._lookup_container(
1052
            account, container)
1053

    
1054
        path, node = self._put_object_node(
1055
            container_path, container_node, name)
1056
        pre_version_id, dest_version_id = self._put_version_duplicate(
1057
            user, node, src_node=src_node, size=size, type=type, hash=hash,
1058
            checksum=checksum, is_copy=is_copy,
1059
            update_statistics_ancestors_depth=1)
1060

    
1061
        # Handle meta.
1062
        if src_version_id is None:
1063
            src_version_id = pre_version_id
1064
        self._put_metadata_duplicate(
1065
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
1066

    
1067
        del_size = self._apply_versioning(account, container, pre_version_id,
1068
                                          update_statistics_ancestors_depth=1)
1069
        size_delta = size - del_size
1070
        if size_delta > 0:
1071
            # Check account quota.
1072
            if not self.using_external_quotaholder:
1073
                account_quota = long(self._get_policy(
1074
                    account_node, is_account_policy=True)['quota'])
1075
                account_usage = self._get_statistics(account_node,
1076
                                                     compute=True)[1]
1077
                if (account_quota > 0 and account_usage > account_quota):
1078
                    raise QuotaError(
1079
                        'Account quota exceeded: limit: %s, usage: %s' % (
1080
                            account_quota, account_usage))
1081

    
1082
            # Check container quota.
1083
            container_quota = long(self._get_policy(
1084
                container_node, is_account_policy=False)['quota'])
1085
            container_usage = self._get_statistics(container_node)[1]
1086
            if (container_quota > 0 and container_usage > container_quota):
1087
                # This must be executed in a transaction, so the version is
1088
                # never created if it fails.
1089
                raise QuotaError(
1090
                    'Container quota exceeded: limit: %s, usage: %s' % (
1091
                        container_quota, container_usage
1092
                    )
1093
                )
1094

    
1095
        if report_size_change:
1096
            self._report_size_change(
1097
                user, account, size_delta,
1098
                {'action': 'object update', 'path': path,
1099
                 'versions': ','.join([str(dest_version_id)])})
1100
        if permissions is not None:
1101
            self.permissions.access_set(path, permissions)
1102
            self._report_sharing_change(
1103
                user, account, path,
1104
                {'members': self.permissions.access_members(path)})
1105

    
1106
        self._report_object_change(
1107
            user, account, path,
1108
            details={'version': dest_version_id, 'action': 'object update'})
1109
        return dest_version_id
1110

    
1111
    @debug_method
1112
    def update_object_hashmap(self, user, account, container, name, size, type,
1113
                              hashmap, checksum, domain, meta=None,
1114
                              replace_meta=False, permissions=None):
1115
        """Create/update an object's hashmap and return the new version."""
1116

    
1117
        meta = meta or {}
1118
        if size == 0:  # No such thing as an empty hashmap.
1119
            hashmap = [self.put_block('')]
1120
        map = HashMap(self.block_size, self.hash_algorithm)
1121
        map.extend([self._unhexlify_hash(x) for x in hashmap])
1122
        missing = self.store.block_search(map)
1123
        if missing:
1124
            ie = IndexError()
1125
            ie.data = [binascii.hexlify(x) for x in missing]
1126
            raise ie
1127

    
1128
        hash = map.hash()
1129
        hexlified = binascii.hexlify(hash)
1130
        # _update_object_hash() locks destination path
1131
        dest_version_id = self._update_object_hash(
1132
            user, account, container, name, size, type, hexlified, checksum,
1133
            domain, meta, replace_meta, permissions)
1134
        self.store.map_put(hash, map)
1135
        return dest_version_id, hexlified
1136

    
1137
    @debug_method
1138
    @backend_method
1139
    def update_object_checksum(self, user, account, container, name, version,
1140
                               checksum):
1141
        """Update an object's checksum."""
1142

    
1143
        # Update objects with greater version and same hashmap
1144
        # and size (fix metadata updates).
1145
        self._can_write(user, account, container, name)
1146
        path, node = self._lookup_object(account, container, name,
1147
                                         lock_container=True)
1148
        props = self._get_version(node, version)
1149
        versions = self.node.node_get_versions(node)
1150
        for x in versions:
1151
            if (x[self.SERIAL] >= int(version) and
1152
                x[self.HASH] == props[self.HASH] and
1153
                    x[self.SIZE] == props[self.SIZE]):
1154
                self.node.version_put_property(
1155
                    x[self.SERIAL], 'checksum', checksum)
1156

    
1157
    def _copy_object(self, user, src_account, src_container, src_name,
1158
                     dest_account, dest_container, dest_name, type,
1159
                     dest_domain=None, dest_meta=None, replace_meta=False,
1160
                     permissions=None, src_version=None, is_move=False,
1161
                     delimiter=None):
1162

    
1163
        report_size_change = not is_move
1164
        dest_meta = dest_meta or {}
1165
        dest_version_ids = []
1166
        self._can_read(user, src_account, src_container, src_name)
1167

    
1168
        src_container_path = '/'.join((src_account, src_container))
1169
        dest_container_path = '/'.join((dest_account, dest_container))
1170
        # Lock container paths in alphabetical order
1171
        if src_container_path < dest_container_path:
1172
            self._lookup_container(src_account, src_container)
1173
            self._lookup_container(dest_account, dest_container)
1174
        else:
1175
            self._lookup_container(dest_account, dest_container)
1176
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)

    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        meta = meta or {}
        dest_version_id = self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, src_version, False, delimiter)
        return dest_version_id
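
    # Illustrative sketch (not part of the backend API): how a caller might
    # use copy_object(). The names `b` (an initialized backend instance),
    # 'alice' and the container/object names below are hypothetical.
    #
    #   # Copy a single object; returns the new destination version id.
    #   b.copy_object('alice', 'alice', 'photos', 'cat.jpg',
    #                 'alice', 'backup', 'cat.jpg',
    #                 type='image/jpeg', domain='pithos')
    #
    #   # With delimiter='/', every object under 'album/' is copied as well
    #   # and a list of destination version ids is returned.
    #   b.copy_object('alice', 'alice', 'photos', 'album',
    #                 'alice', 'backup', 'album',
    #                 type='application/directory', domain='pithos',
    #                 delimiter='/')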

    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        meta = meta or {}
        if user != src_account:
            raise NotAllowedError
        dest_version_id = self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, None, delimiter=delimiter)
        return dest_version_id

    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except ItemNotExists:
                # No version left after the purge: drop any permissions.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        self._delete_object(user, account, container, name, until, delimiter)
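
    # Illustrative sketch (assumptions: `b` is an initialized backend
    # instance; account, container and object names are hypothetical):
    #
    #   # Mark the latest version as deleted; history is kept or trimmed
    #   # according to the container versioning policy.
    #   b.delete_object('alice', 'alice', 'photos', 'cat.jpg')
    #
    #   # Purge: with `until` set, versions up to that timestamp are removed
    #   # from the NORMAL and HISTORY clusters and their maps are deleted
    #   # from the store.
    #   b.delete_object('alice', 'alice', 'photos', 'cat.jpg',
    #                   until=1358000000)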

    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
                x[self.CLUSTER] != CLUSTER_DELETED]
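
    # Illustrative sketch (assumptions: `b` is an initialized backend
    # instance; the account/container/object names are hypothetical).
    # list_versions() returns [version_id, version_timestamp] pairs for
    # every non-deleted version:
    #
    #   for version_id, tstamp in b.list_versions('alice', 'alice',
    #                                             'photos', 'cat.jpg'):
    #       print version_id, tstamp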

    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read(user, account, container, name)
        return (account, container, name)

    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        block = self.store.block_get(self._unhexlify_hash(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block

    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        return binascii.hexlify(self.store.block_put(data))

    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
        return binascii.hexlify(h)
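
    # Illustrative sketch of the block API (assumptions: `b` is an
    # initialized backend instance; the payloads are made up):
    #
    #   hash = b.put_block('x' * b.block_size)      # store a full block
    #   data = b.get_block(hash)                    # read it back
    #   # A full-size write at offset 0 is stored as a brand new block;
    #   # otherwise the existing block is patched and a new hash returned.
    #   new_hash = b.update_block(hash, 'yyyy', offset=16)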

    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

    def _put_object_node(self, path, parent, name):
        path = '/'.join((path, name))
        node = self.node.node_lookup(path)
        if node is None:
            node = self.node.node_create(parent, path)
        return path, node

    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node

    def _lookup_account(self, account, create=True):
        node = self.node.node_lookup(account)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node

    def _lookup_container(self, account, container):
        for_update = True if self.lock_container_path else False
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name, lock_container=False):
        if lock_container:
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes

    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats

    def _get_version(self, node, version=None):
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version, node=node)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node."""

        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
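
    # Note on the copy-on-write step above: if a current version exists it is
    # reclustered into CLUSTER_HISTORY before the new version row is created,
    # so callers receive (pre_version_id, dest_version_id), e.g. (42, 43)
    # when version 42 was superseded, or (None, 43) for a path with no
    # current version (the numbers are made-up examples). A fresh UUID is
    # generated only for copies or paths with no previous version; ordinary
    # updates keep the object's UUID.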

    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))

    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=
            update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id

    def _list_limits(self, listing, marker, limit):
        start = 0
        if marker:
            try:
                start = listing.index(marker) + 1
            except ValueError:
                pass
        if not limit or limit > 10000:
            limit = 10000
        return start, limit
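
    # Illustrative sketch of the marker/limit pagination above (the listing
    # is made up):
    #
    #   listing = ['a.txt', 'b.txt', 'c.txt']
    #   start, limit = self._list_limits(listing, marker='a.txt', limit=2)
    #   # start == 1 (resume right after the marker), limit == 2
    #   page = listing[start:start + limit]          # ['b.txt', 'c.txt']
    #   # A marker not present in the listing leaves start at 0; a falsy or
    #   # oversized limit is capped at 10000.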

    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects

    # Reporting functions.

    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)
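
    # Note on the reporting above: every non-zero size change is queued as a
    # message tuple (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
    # account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details); when an
    # external quotaholder is in use, a commission for
    # {'pithos.diskspace': size} is also issued for the account and the
    # returned serial is kept in self.serials, with any failure re-raised as
    # QuotaError.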

    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))

    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))

    # Policy functions.

    def _check_policy(self, policy, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError
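
    # Illustrative policies for _check_policy() (values are made up):
    # {'quota': '1073741824', 'versioning': 'auto'} passes, while
    # {'quota': '-1'}, {'versioning': 'daily'} or any unknown key raises
    # ValueError. Empty string values are first replaced by the account or
    # container defaults before validation.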

    def _put_policy(self, node, policy, replace, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)

    def _get_policy(self, node, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        policy = default_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy

    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
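
    # Summary of the policy applied above (behaviour as implemented): with
    # container versioning set to 'auto' the superseded version stays in
    # history; if free_versioning is enabled its size is still returned so
    # that kept history is not charged against the quota. With any other
    # versioning setting the version row and its map are removed and the
    # freed size is returned for quota accounting.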

    # Access control functions.

    def _check_groups(self, groups):
        # raise ValueError('Bad characters in groups')
        pass

    def _check_permissions(self, path, permissions):
        # raise ValueError('Bad characters in permissions')
        pass

    def _get_formatted_paths(self, paths):
        formatted = []
        if len(paths) == 0:
            return formatted
        props = self.node.get_props(paths)
        if props:
            for prop in props:
                if prop[1].split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    formatted.append((prop[0].rstrip('/') + '/',
                                      self.MATCH_PREFIX))
                formatted.append((prop[0], self.MATCH_EXACT))
        return formatted

    def _get_permissions_path(self, account, container, name):
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            else:
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None
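
    # Illustrative sketch of permission inheritance (the paths are made up):
    # for 'alice/photos/album/cat.jpg', access_inherit() may return the path
    # itself plus ancestors that carry permissions, e.g.
    # ['alice/photos/album/cat.jpg', 'alice/photos/album/'].
    # The exact path wins if present; otherwise the deepest ancestor whose
    # object type is 'application/directory' or 'application/folder'
    # provides the effective permissions, and account- or container-level
    # paths are skipped.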

    def _get_permissions_path_bulk(self, account, container, names):
        formatted_paths = []
        for name in names:
            path = '/'.join((account, container, name))
            formatted_paths.append(path)
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        permission_paths.sort()
        permission_paths.reverse()
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                permission_paths_list.append(p)
            else:
                if p.count('/') < 2:
                    continue
                lookup_list.append(p)

        if len(lookup_list) > 0:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        if len(permission_paths_list) > 0:
            return permission_paths_list

        return None

    def _can_read(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if (not self.permissions.access_check(path, self.READ, user) and not
                self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError

    def _can_write(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError

    def _allowed_accounts(self, user):
        allow = set()
        for path in self.permissions.access_list_paths(user):
            allow.add(path.split('/', 1)[0])
        return sorted(allow)

    def _allowed_containers(self, user, account):
        allow = set()
        for path in self.permissions.access_list_paths(user, account):
            allow.add(path.split('/', 2)[1])
        return sorted(allow)

    # Domain functions

    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]

    # util functions

    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        meta = {'bytes': props[self.SIZE],
                'type': props[self.TYPE],
                'hash': props[self.HASH],
                'version': props[self.SERIAL],
                'version_timestamp': props[self.MTIME],
                'modified_by': props[self.MUSER],
                'uuid': props[self.UUID],
                'checksum': props[self.CHECKSUM]}
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta
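
    # Illustrative shape of the dict returned by _build_metadata() (all
    # values below are made up):
    #
    #   {'bytes': 1024, 'type': 'image/jpeg', 'hash': 'e3b0c44298fc1c14...',
    #    'version': 42, 'version_timestamp': 1358000000.0,
    #    'modified_by': 'alice', 'uuid': '6fa459ea-ee8a-3ca4-894e-db77e160355e',
    #    'checksum': ''}
    #
    # Any user-defined attributes passed in `user_defined` are merged on top.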

    def _exists(self, node):
        try:
            self._get_version(node)
        except ItemNotExists:
            return False
        else:
            return True

    def _unhexlify_hash(self, hash):
        try:
            return binascii.unhexlify(hash)
        except TypeError:
            raise InvalidHash(hash)