Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ e161c24f

History | View | Annotate | Download (73 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42

    
43
try:
44
    from astakosclient import AstakosClient
45
except ImportError:
46
    AstakosClient = None
47

    
48
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
49
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
50
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
51
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
52
                  InvalidHash)
53

    
54

    
55
class DisabledAstakosClient(object):
    """Stand-in for AstakosClient used when quota support is disabled.

    The constructor arguments are kept for introspection, but any other
    attribute access raises AssertionError to flag the misuse.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails, i.e. for
        # anything other than args/kwargs.
        raise AssertionError("AstakosClient has been disabled, "
                             "yet an attempt to access it was made")
64

    
65

    
66
# Stripped-down version of the HashMap class found in tools.
67

    
68
class HashMap(list):
    """Merkle-style combination of a list of block hashes.

    The list holds the digests of consecutive blocks; hash() pads them
    with zero-byte pseudo-hashes up to the next power of two and folds
    adjacent pairs into a single top-level digest.
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize  # block size in bytes
        self.blockhash = blockhash  # hashlib algorithm name, e.g. 'sha256'

    def _hash_raw(self, v):
        """Return the raw digest of the byte string v."""
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        """Return the combined digest of all block hashes in the list."""
        if len(self) == 0:
            # b'' == '' under Python 2; using bytes literals keeps this
            # working on Python 3, where hashlib rejects str input.
            return self._hash_raw(b'')
        if len(self) == 1:
            # A single entry is its own top-level hash.
            return self[0]

        h = list(self)
        # Round the leaf count up to the next power of two and pad with
        # all-zero pseudo-hashes of the same length.
        s = 2
        while s < len(h):
            s = s * 2
        h += [(b'\x00' * len(h[0]))] * (s - len(h))
        # Repeatedly hash adjacent pairs until one digest remains.
        while len(h) > 1:
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
        return h[0]
94

    
95
# Default modules and settings.
96
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
97
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
98
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
99
DEFAULT_BLOCK_PATH = 'data/'
100
DEFAULT_BLOCK_UMASK = 0o022
101
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
102
DEFAULT_HASH_ALGORITHM = 'sha256'
103
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
104
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
105
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
106
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
107
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
108
                               'abcdefghijklmnopqrstuvwxyz'
109
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
110
DEFAULT_PUBLIC_URL_SECURITY = 16
111

    
112
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
113
QUEUE_CLIENT_ID = 'pithos'
114
QUEUE_INSTANCE_ID = '1'
115

    
116
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
117

    
118
inf = float('inf')
119

    
120
ULTIMATE_ANSWER = 42
121

    
122
DEFAULT_SOURCE = 'system'
123

    
124
logger = logging.getLogger(__name__)
125

    
126

    
127
def debug_method(func):
    """Decorator logging each call as ``name(args) <<< result`` at DEBUG.

    If the wrapped call raises, the formatted traceback is logged in
    place of the result and the exception is re-raised unchanged.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            # Bare except on purpose: even non-Exception errors are
            # logged before being re-raised.
            result = format_exc()
            raise
        finally:
            all_args = [repr(x) for x in args]
            # extend() instead of map(list.append, ...): relying on map
            # for side effects is a no-op on Python 3, where map is lazy.
            all_args.extend('%s=%s' % (k, v) for k, v in kw.items())
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper
142

    
143

    
144
class ModularBackend(BaseBackend):
145
    """A modular backend.
146

147
    Uses modules for SQL functions and storage.
148
    """
149

    
150
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        """Wire up the backend's pluggable parts.

        Loads the DB and block-store modules by dotted name, creates the
        DB wrapper and its helper objects, configures an optional message
        queue, and selects a real or disabled Astakos client for quota
        handling.  Falsy arguments fall back to the module-level
        DEFAULT_* constants.
        """
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        # NOTE(review): a block_umask of 0 is falsy and is silently
        # replaced by the default umask — confirm this is intended.
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        def load_module(m):
            # Import by dotted name and return the module object.
            __import__(m)
            return sys.modules[m]

        # --- Database layer ---
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        # Re-export DB-module constants (permission kinds, node/version
        # column indices) as attributes for convenient access.
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        # --- Block storage layer ---
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        # --- Message queue (optional) ---
        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            # No queue configured: messages are silently discarded.
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        # --- Quota (Astakos) client ---
        self.astakos_url = astakos_url
        self.service_token = service_token

        if not astakos_url or not AstakosClient:
            # Quota support disabled (or astakosclient not installed):
            # any use of the client raises AssertionError.
            self.astakosclient = DisabledAstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        # Commission serials and queue messages accumulated during a
        # request; flushed or rolled back by post_exec().
        self.serials = []
        self.messages = []

        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False
259

    
260
    def pre_exec(self, lock_container_path=False):
        """Start a backend transaction, remembering whether container
        paths should be locked for its duration."""
        self.lock_container_path = lock_container_path
        self.wrapper.execute()
263

    
264
    def post_exec(self, success_status=True):
        """Finish a transaction started by pre_exec().

        On success: flush queued messages, persist pending commission
        serials, accept them against Astakos and commit.  On failure:
        reject the pending serials and roll back.
        """
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    token=self.service_token,
                    accept_serials=self.serials,
                    reject_serials=[])
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            if self.serials:
                # BUG FIX: the resolve_commissions() result was previously
                # discarded while the undefined name 'r' was referenced
                # below, raising NameError on the rollback path.
                r = self.astakosclient.resolve_commissions(
                    token=self.service_token,
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(
                    r['rejected'])
            self.wrapper.rollback()
299

    
300
    def close(self):
        """Release backend resources: DB wrapper and message queue."""
        self.wrapper.close()
        self.queue.close()
303

    
304
    @property
    def using_external_quotaholder(self):
        """True when a real AstakosClient handles quota (i.e. the client
        is not the disabled stand-in)."""
        return not isinstance(self.astakosclient, DisabledAstakosClient)
307

    
308
    @debug_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        accounts = self._allowed_accounts(user)
        # Page the sorted result relative to the marker.
        first, count = self._list_limits(accounts, marker, limit)
        return accounts[first:first + count]
315

    
316
    @debug_method
    def get_account_meta(
            self, user, account, domain, until=None, include_user_defined=True,
            external_quota=None):
        """Return a dictionary with the account metadata for the domain.

        Non-owners only get the account name and modification time, and
        only if the account is visible to them; 'until' restricts the
        view to a past point in time.
        """

        path, node = self._lookup_account(account, user == account)
        if user != account:
            # Foreign users may not look at past versions or at accounts
            # they were not granted access to.
            if until or (node is None) or (account not
                                           in self._allowed_accounts(user)):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # No properties for this (version of the) account; fall back
            # to the requested timestamp.
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                # The external quotaholder is authoritative for usage.
                external_quota = external_quota or {}
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta
357

    
358
    @debug_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        # Only the account owner may touch account-level metadata.
        if account != user:
            raise NotAllowedError
        node = self._lookup_account(account, True)[1]
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)
367

    
368
    @debug_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        # Non-owners get an empty mapping, provided the account is
        # visible to them at all.
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
        return {}
378

    
379
    @debug_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        if account != user:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            # Drop every existing group before installing the new set.
            self.permissions.group_destroy(account)
        for name, members in groups.iteritems():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, name)
            if members:
                self.permissions.group_addmany(account, name, members)
394

    
395
    @debug_method
    def get_account_policy(self, user, account, external_quota=None):
        """Return a dictionary with the account policy."""

        if user != account:
            # Non-owners get an empty policy if the account is visible.
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            return {}
        node = self._lookup_account(account, True)[1]
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            # The external quotaholder is authoritative for the limit.
            quota_info = external_quota or {}
            policy['quota'] = quota_info.get('limit', 0)
        return policy
409

    
410
    @debug_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        if account != user:
            raise NotAllowedError
        node = self._lookup_account(account, True)[1]
        # Validate before persisting.
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)
419

    
420
    @debug_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        policy = policy or {}
        if account != user:
            raise NotAllowedError
        # Fail if the account node already exists.
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        new_node = self._put_path(user, self.ROOTNODE, account,
                                  update_statistics_ancestors_depth=-1)
        self._put_policy(new_node, policy, True, is_account_policy=True)
435

    
436
    @debug_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        if account != user:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is None:
            # Deleting a missing account is a no-op.
            return
        removed = self.node.node_remove(
            node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)
449

    
450
    @debug_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account.

        Non-owners only see the containers they were granted access to;
        `shared`/`public` restrict the listing to containers holding
        shared or public objects respectively.
        """

        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            allowed = self._allowed_containers(user, account)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        if shared or public:
            allowed = set()
            if shared:
                # The container is the second segment of each shared path.
                allowed.update([x.split('/', 2)[1] for x in
                               self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in
                               self.permissions.public_list(account)])
            allowed = sorted(allowed)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        node = self.node.node_lookup(account)
        containers = [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
        # BUG FIX: 'containers' already holds plain names; the previous
        # '[x[0] for x in containers]' passed each name's first character
        # to _list_limits, breaking marker-based pagination.
        start, limit = self._list_limits(containers, marker, limit)
        return containers[start:start + limit]
478

    
479
    @debug_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""

        accessible = []
        if user != account:
            # Non-owners may not look at past versions and must hold at
            # least one grant under the container.
            if until:
                raise NotAllowedError
            accessible = self.permissions.access_list_paths(
                user, '/'.join((account, container)))
            if not accessible:
                raise NotAllowedError
        node = self._lookup_container(account, container)[1]
        before = inf if until is None else until
        accessible = self._get_formatted_paths(accessible)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, accessible)
497

    
498
    @debug_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        Non-owners only get the container name and modification time, and
        only for containers they can access; 'until' restricts the view
        to a past point in time.
        """

        if user != account:
            if until or container not in self._allowed_containers(user,
                                                                  account):
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta
531

    
532
    @debug_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        if account != user:
            raise NotAllowedError
        node = self._lookup_container(account, container)[1]
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is None:
            return
        # Unless versioning is 'auto', the superseded version is removed.
        policy = self._get_policy(node, is_account_policy=False)
        if policy['versioning'] != 'auto':
            self.node.version_remove(src_version_id,
                                     update_statistics_ancestors_depth=0)
549

    
550
    @debug_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        if user == account:
            node = self._lookup_container(account, container)[1]
            return self._get_policy(node, is_account_policy=False)
        # Non-owners get an empty policy if the container is visible.
        if container not in self._allowed_containers(user, account):
            raise NotAllowedError
        return {}
560

    
561
    @debug_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""

        if account != user:
            raise NotAllowedError
        node = self._lookup_container(account, container)[1]
        # Validate before persisting.
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)
571

    
572
    @debug_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        policy = policy or {}
        if account != user:
            raise NotAllowedError
        # _lookup_container raises NameError when the container is
        # missing, which is exactly the precondition for creating it.
        try:
            self._lookup_container(account, container)
        except NameError:
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        account_node = self._lookup_account(account, True)[1]
        node = self._put_path(
            user, account_node, path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)
592

    
593
    @debug_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name.

        With 'until': purge history and deleted versions up to that time
        and keep the container.  Without a delimiter: remove the (empty)
        container entirely.  With a delimiter: delete only the contents,
        object by object.
        """

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        if until is not None:
            # Purge historic versions up to 'until'; their blocks are
            # unlinked from the store.
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                # Purged history counts against quota only when versioning
                # is billed.
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            # Purge the complete history, then remove the container node.
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            # NOTE: 'path' and 'node' are rebound per object below.
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                # Mark the object deleted by writing a new version in the
                # DELETED cluster.
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            # Drop any permissions granted on the deleted objects.
            self.permissions.access_clear_bulk(paths)
667

    
668
    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        """List object properties in a container, honoring permissions.

        Dispatches on the shared/public flags: shared-and-public merges
        both listings, public-only lists just public objects, otherwise
        the listing is filtered by the paths the user is allowed to see.
        """
        # Foreign users may never browse past versions.
        if user != account and until:
            raise NotAllowedError
        if shared and public:
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            objects = set()
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects |= set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            # The union of the two sets must be re-sorted by name and
            # re-paged against the marker.
            objects.sort(key=lambda x: x[0])
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]

        allowed = self._list_object_permissions(
            user, account, container, prefix, shared, public)
        if shared and not allowed:
            return []
        path, node = self._lookup_container(account, container)
        allowed = self._get_formatted_paths(allowed)
        objects = self._list_object_properties(
            node, path, prefix, delimiter, marker, limit, virtual, domain,
            keys, until, size_range, allowed, all_props)
        start, limit = self._list_limits(
            [x[0] for x in objects], marker, limit)
        return objects[start:start + limit]
713

    
714
    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        """Return (name, props...) tuples for the container's public objects."""
        public_paths = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        paths, nodes = self._lookup_objects(public_paths)
        # Strip the 'account/container/' prefix to get bare object names.
        cont_prefix = '/'.join((account, container)) + '/'
        names = [p[len(cont_prefix):] for p in paths]
        props_list = self.node.version_lookup_bulk(nodes, all_props=all_props)
        return [(name,) + props for name, props in zip(names, props_list)]
726

    
727
    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        """Page through _list_objects exhaustively and return all results."""
        objects = []
        page_size = 10000
        while True:
            # NOTE(review): the marker is the whole last entry, not just its
            # name; _list_objects is expected to cope with that -- confirm.
            marker = objects[-1] if objects else None
            page = self._list_objects(
                user, account, container, prefix, delimiter, marker, page_size,
                virtual, domain, keys, shared, until, size_range, all_props,
                public)
            objects.extend(page)
            # A short (or empty) page means we have drained the listing.
            if not page or len(page) < page_size:
                break
        return objects
742

    
743
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        """Return the paths under the prefix the user may list.

        For a foreign user, raises NotAllowedError when nothing is
        accessible; for the owner, collects shared and/or public paths.
        """
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            allowed = self.permissions.access_list_paths(user, path)
            if not allowed:
                raise NotAllowedError
            return allowed
        collected = set()
        if shared:
            collected.update(self.permissions.access_list_shared(path))
        if public:
            collected.update(
                [x[0] for x in self.permissions.public_list(path)])
        return sorted(collected) if collected else []
762

    
763
    @debug_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        # all_props=False: callers only need (name, serial) pairs here.
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, False,
            public)
774

    
775
    @debug_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, True,
            public)
        objects = []
        for entry in props:
            # A 2-tuple marks a virtual directory entry, not a real object.
            if len(entry) == 2:
                objects.append({'subdir': entry[0]})
                continue
            objects.append({
                'name': entry[0],
                'bytes': entry[self.SIZE + 1],
                'type': entry[self.TYPE + 1],
                'hash': entry[self.HASH + 1],
                'version': entry[self.SERIAL + 1],
                'version_timestamp': entry[self.MTIME + 1],
                # Historic (until) listings carry no current mtime.
                'modified': entry[self.MTIME + 1] if until is None else None,
                'modified_by': entry[self.MUSER + 1],
                'uuid': entry[self.UUID + 1],
                'checksum': entry[self.CHECKSUM + 1]})
        return objects
803

    
804
    @debug_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths enforce permissions under a container."""

        return self._list_object_permissions(user, account, container, prefix,
                                             shared=True, public=False)
810

    
811
    @debug_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        lookup = '/'.join((account, container, prefix))
        # public_list yields (path, public_id) pairs.
        return dict(self.permissions.public_list(lookup))
821

    
822
    @debug_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        When a specific version is requested, 'modified' reflects the
        overall (latest) modification time, falling back to the deletion
        timestamp if the object has since been deleted.
        """

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            # User-defined attributes for the domain come first so the
            # system keys below always win on collision.
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta
858

    
859
    @debug_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write(user, account, container, name)

        # Lock the container path to serialize with concurrent updates.
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        # The superseded version may be reclaimed per the versioning policy.
        self._apply_versioning(account, container, src_version_id,
                               update_statistics_ancestors_depth=1)
        return dest_version_id
874

    
875
    @debug_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        #group_parents = access_objects['group_parents']
        nobject_permissions = {}
        # Prefix length used to strip "account/container/" off each path.
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            # NOTE(review): this value is unconditionally overwritten by the
            # access_get_for_bulk() unpack below -- appears to be dead.
            allowed = 1
            name = path[cpath_idx:]
            if user != account:
                try:
                    allowed = access_objects[path]
                except KeyError:
                    raise NotAllowedError
            # NOTE(review): for user == account this indexes access_objects
            # unguarded -- presumably every requested path is present in the
            # bulk result then; confirm against access_check_bulk().
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        # Raises if any of the objects does not actually exist.
        self._lookup_objects(permissions_path)
        return nobject_permissions
903

    
904
    @debug_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        permissions_path = self._get_permissions_path(account, container, name)
        if user == account:
            # The owner always has full access.
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.WRITE,
                                           user):
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.READ,
                                           user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # Raises if the object does not exist.
        self._lookup_object(account, container, name)
        return (allowed,
                permissions_path,
                self.permissions.access_get(permissions_path))
925

    
926
    @debug_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object."""

        # Only the owning account may change sharing.
        if user != account:
            raise NotAllowedError
        path, _node = self._lookup_object(account, container, name,
                                          lock_container=True)
        self._check_permissions(path, permissions)
        self.permissions.access_set(path, permissions)
        members = self.permissions.access_members(path)
        self._report_sharing_change(user, account, path,
                                    {'members': members})
939

    
940
    @debug_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        return self.permissions.public_get(path)
948

    
949
    @debug_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write(user, account, container, name)
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        if public:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet)
        else:
            self.permissions.public_unset(path)
961

    
962
    @debug_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        object_hash = props[self.HASH]
        # A null hash means no content was ever written for this version.
        if object_hash is None:
            return 0, ()
        hashmap = self.store.map_get(self._unhexlify_hash(object_hash))
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
973

    
974
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        """Create a new object version for the given hash and metadata.

        Enforces write access and (optionally local) quota, applies the
        versioning policy, emits size/sharing/object change reports and
        returns the new version id.
        """
        # Only the account owner may set permissions.
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        # Reclaim the superseded version per policy; its size offsets ours.
        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id
1045

    
1046
    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version.

        Raises IndexError (with the missing hex hashes attached on its
        'data' attribute) when some blocks are not yet in the store.
        """

        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(map)
        if missing:
            # Signal the caller which blocks must be uploaded first.
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = map.hash()
        hexlified = binascii.hexlify(hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        # Persist the map only after the version is safely recorded.
        self.store.map_put(hash, map)
        return dest_version_id, hexlified
1071

    
1072
    @debug_method
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum."""

        # Update objects with greater version and same hashmap
        # and size (fix metadata updates).
        self._can_write(user, account, container, name)
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        props = self._get_version(node, version)
        versions = self.node.node_get_versions(node)
        for x in versions:
            # Metadata-only updates duplicate a version with the same hash
            # and size; stamp the checksum on all such successors too.
            if (x[self.SERIAL] >= int(version) and
                x[self.HASH] == props[self.HASH] and
                    x[self.SIZE] == props[self.SIZE]):
                self.node.version_put_property(
                    x[self.SERIAL], 'checksum', checksum)
1090

    
1091
    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):
        """Copy (or move) an object, optionally recursing into a pseudo-dir.

        Returns the new destination version id, or a list of ids when a
        delimiter triggers a recursive copy. Moves within the same account
        suppress size-change reports (net usage is unchanged).
        """

        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        # A cross-object copy gets a new uuid; a move/self-copy keeps it.
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            # Recursive case: replicate everything under name+delimiter.
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)
1166

    
1167
    @debug_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        # Delegate to the shared copy/move implementation (is_move=False).
        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, src_version, False, delimiter)
1180

    
1181
    @debug_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        # Only the owner of the source account may move objects out of it.
        if user != src_account:
            raise NotAllowedError
        return self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {}, replace_meta,
            permissions, None, delimiter=delimiter)
1196

    
1197
    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        """Delete an object, or purge its history when `until` is given.

        With a delimiter, also deletes every object under name+delimiter.
        Only the account owner may delete.
        """
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            # Purge mode: physically remove versions up to the timestamp
            # from the normal, history and deleted clusters.
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            # With free versioning, history versions do not count as usage.
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except NameError:
                # No live version remains: drop the permissions too.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        # Normal delete: record a zero-size tombstone in CLUSTER_DELETED.
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            # Recursively delete the pseudo-folder contents.
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)
1287

    
1288
    @debug_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        # NOTE(review): the `prefix` argument is accepted but never used.
        self._delete_object(user, account, container, name, until, delimiter)
1294

    
1295
    @debug_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        all_versions = self.node.node_get_versions(node)
        # Tombstone versions in the deleted cluster are not reported.
        return [[v[self.SERIAL], v[self.MTIME]]
                for v in all_versions
                if v[self.CLUSTER] != CLUSTER_DELETED]
1304

    
1305
    @debug_method
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        # Paths are "account/container/object-name".
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return account, container, name
1316

    
1317
    @debug_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        # Paths are "account/container/object-name".
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return account, container, name
1327

    
1328
    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        block = self.store.block_get(self._unhexlify_hash(hash))
        if block:
            return block
        raise ItemNotExists('Block does not exist')
1336

    
1337
    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        stored_hash = self.store.block_put(data)
        return binascii.hexlify(stored_hash)
1342

    
1343
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        # A full-size write at offset 0 is simply a new block.
        full_overwrite = (offset == 0 and len(data) == self.block_size)
        if full_overwrite:
            return self.put_block(data)
        new_hash = self.store.block_update(self._unhexlify_hash(hash),
                                           offset, data)
        return binascii.hexlify(new_hash)
1351

    
1352
    # Path functions.
1353

    
1354
    def _generate_uuid(self):
1355
        return str(uuidlib.uuid4())
1356

    
1357
    def _put_object_node(self, path, parent, name):
        """Return (path, node) for name under parent, creating if missing."""
        full_path = '/'.join((path, name))
        node = self.node.node_lookup(full_path)
        if node is None:
            node = self.node.node_create(parent, full_path)
        return full_path, node
1363

    
1364
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        """Create a node at path under parent with an initial empty version."""
        node = self.node.node_create(parent, path)
        # Seed the node with a zero-size version owned by the user.
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node
1371

    
1372
    def _lookup_account(self, account, create=True):
        """Return (account, node), optionally creating a missing account."""

        node = self.node.node_lookup(account)
        if node is None and create:
            # User is account.
            node = self._put_path(account, self.ROOTNODE, account,
                                  update_statistics_ancestors_depth=-1)
        return account, node
1379

    
1380
    def _lookup_container(self, account, container):
        """Return (path, node) for the container.

        Takes a row lock on the container when lock_container_path is set.
        Raises ItemNotExists if the container is missing.
        """

        # bool() instead of the redundant `True if ... else False`.
        for_update = bool(self.lock_container_path)
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node
1387

    
1388
    def _lookup_object(self, account, container, name, lock_container=False):
        """Return (path, node) for an object; raise ItemNotExists if absent."""

        if lock_container:
            # Container lookup may lock it (see _lookup_container).
            self._lookup_container(account, container)

        object_path = '/'.join((account, container, name))
        found = self.node.node_lookup(object_path)
        if found is None:
            raise ItemNotExists('Object does not exist')
        return object_path, found
1397

    
1398
    def _lookup_objects(self, paths):
        """Return (paths, nodes) for the given object paths in bulk."""

        return paths, self.node.node_lookup_bulk(paths)
1401

    
1402
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = inf if until is None else until
        # Try the live version first; fall back to history only for
        # point-in-time queries.
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props
1412

    
1413
    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            # Point-in-time statistics, excluding deleted entries.
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            # Recompute instead of reading the cached counters.
            stats = self.node.statistics_latest(
                node, except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        return stats if stats is not None else (0, 0, 0)
1426

    
1427
    def _get_version(self, node, version=None):
        """Return version properties; the latest live version by default."""

        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
            return props
        try:
            serial = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(serial, node=node)
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props
1441

    
1442
    def _get_versions(self, nodes):
        """Return latest live version properties for each of the nodes."""

        before = inf
        return self.node.version_lookup_bulk(nodes, before, CLUSTER_NORMAL)
1444

    
1445
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node.

        Properties are copied from the latest live version of src_node
        (or of node itself when src_node is None); any of size, type,
        hash, checksum given explicitly overrides the copied value.
        Returns (pre_version_id, dest_version_id), where pre_version_id
        is the serial of the superseded version moved to CLUSTER_HISTORY
        (None if there was none).
        """

        # Latest live version to copy properties from.
        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        # Copies and brand-new objects get a fresh UUID; a plain new
        # version keeps the UUID of the object it updates.
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        # Find the version of *node* (not src_node) that is superseded.
        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            # Demote the superseded version to the history cluster.
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        # Ensure only the new version is flagged as latest.
        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
1494

    
1495
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        """Copy attributes to the new version and apply the meta changes.

        Without replace, an empty value in meta deletes the attribute and
        a non-empty one sets it; with replace, all attributes in the
        domain are dropped first and meta is set verbatim.
        """
        if src_version_id is not None:
            # Carry over the attributes of the superseded version.
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))
1508

    
1509
    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        depth = update_statistics_ancestors_depth
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, update_statistics_ancestors_depth=depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id
1520

    
1521
    def _list_limits(self, listing, marker, limit):
1522
        start = 0
1523
        if marker:
1524
            try:
1525
                start = listing.index(marker) + 1
1526
            except ValueError:
1527
                pass
1528
        if not limit or limit > 10000:
1529
            limit = 10000
1530
        return start, limit
1531

    
1532
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """List object properties under a container node.

        Returns tuples whose first element is the object name relative to
        the container path; with virtual listing, delimiter-collapsed
        prefixes are included with None in place of properties.
        """
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        # All filters below operate on full paths inside the container.
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        # Attribute filtering only applies within a metadata domain.
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        # Strip the container prefix, keeping any remaining fields.
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects
1553

    
1554
    # Reporting functions.
1555

    
1556
    @debug_method
    def _report_size_change(self, user, account, size, details=None):
        """Queue a diskspace message and commit the size change to the
        external quotaholder (when one is configured).

        Raises QuotaError if issuing the commission fails.
        """
        details = details or {}

        # Nothing to account for.
        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        # Recompute the account total so the message carries a fresh value.
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                token=self.service_token,
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            # Record the serial of the issued commission.
            self.serials.append(serial)
1585

    
1586
    @debug_method
    def _report_object_change(self, user, account, path, details=None):
        """Queue an object-change notification message."""
        details = details or {}
        details['user'] = user
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                   account, QUEUE_INSTANCE_ID, 'object', path, details)
        self.messages.append(message)
1593

    
1594
    @debug_method
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a sharing-change notification message."""
        details = details or {}
        details['user'] = user
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                   account, QUEUE_INSTANCE_ID, 'sharing', path, details)
        self.messages.append(message)
1601

    
1602
    # Policy functions.
1603

    
1604
    def _check_policy(self, policy, is_account_policy=True):
        """Validate policy in place, filling blank values from the defaults.

        Raises ValueError on unknown keys or invalid values.
        """
        if is_account_policy:
            default_policy = self.default_account_policy
        else:
            default_policy = self.default_container_policy
        # A blank value means "use the default".
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError
1620

    
1621
    def _put_policy(self, node, policy, replace, is_account_policy=True):
        """Store policy on node; with replace, missing keys get defaults."""

        if is_account_policy:
            defaults = self.default_account_policy
        else:
            defaults = self.default_container_policy
        if replace:
            for key, value in defaults.iteritems():
                policy.setdefault(key, value)
        self.node.policy_set(node, policy)
1629

    
1630
    def _get_policy(self, node, is_account_policy=True):
        """Return the node's policy merged over the defaults."""

        if is_account_policy:
            policy = self.default_account_policy.copy()
        else:
            policy = self.default_container_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy
1636

    
1637
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            # No history is kept: drop the version and its data map.
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            # Version is kept; its size still counts as removed
            # (free versioning).
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
1657

    
1658
    # Access control functions.
1659

    
1660
    def _check_groups(self, groups):
1661
        # raise ValueError('Bad characters in groups')
1662
        pass
1663

    
1664
    def _check_permissions(self, path, permissions):
1665
        # raise ValueError('Bad characters in permissions')
1666
        pass
1667

    
1668
    def _get_formatted_paths(self, paths):
        """Return (path, match-type) pairs for the given paths."""

        if not paths:
            return []
        formatted = []
        props = self.node.get_props(paths)
        if props:
            folder_types = ('application/directory', 'application/folder')
            for prop in props:
                p, mimetype = prop[0], prop[1]
                if mimetype.split(';', 1)[0].strip() in folder_types:
                    # Folder-type objects also match by prefix.
                    formatted.append((p.rstrip('/') + '/',
                                      self.MATCH_PREFIX))
                formatted.append((p, self.MATCH_EXACT))
        return formatted
1681

    
1682
    def _get_permissions_path(self, account, container, name):
        """Return the path whose permissions apply to the object.

        This is the object's own path if it appears among the inherited
        permission paths, otherwise the deepest directory-type parent
        object carrying permissions; None if no applicable path exists.
        """
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        # Reverse-sorted so deeper (longer) paths are considered first.
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            else:
                # Skip account- and container-level entries.
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    # Only directory-type objects pass permissions down.
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None
1702

    
1703
    def _get_permissions_path_bulk(self, account, container, names):
        """Bulk variant of _get_permissions_path for names in one container.

        Returns the list of paths that carry applicable permissions (the
        objects themselves plus directory-type parents), or None if none.
        """
        formatted_paths = []
        for name in names:
            path = '/'.join((account, container, name))
            formatted_paths.append(path)
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        # Reverse-sorted so deeper (longer) paths come first.
        permission_paths.sort()
        permission_paths.reverse()
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                # The object itself carries permissions.
                permission_paths_list.append(p)
            else:
                # Skip account- and container-level entries.
                if p.count('/') < 2:
                    continue
                lookup_list.append(p)

        if len(lookup_list) > 0:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    # Only directory-type objects pass permissions down.
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        if len(permission_paths_list) > 0:
            return permission_paths_list

        return None
1734

    
1735
    def _can_read(self, user, account, container, name):
        """Raise NotAllowedError unless user may read the given object."""

        # Owners always have access.
        if user == account:
            return True
        full_path = '/'.join((account, container, name))
        # Published objects are readable by everyone.
        if self.permissions.public_get(full_path) is not None:
            return True
        granted = self._get_permissions_path(account, container, name)
        if not granted:
            raise NotAllowedError
        # Either read or write permission suffices for reading.
        if not (self.permissions.access_check(granted, self.READ, user) or
                self.permissions.access_check(granted, self.WRITE, user)):
            raise NotAllowedError
1747

    
1748
    def _can_write(self, user, account, container, name):
        """Raise NotAllowedError unless user may write the given object.

        Returns True for the owner without any permission lookup.
        """

        if user == account:
            return True
        # NOTE: a previous '/'.join((account, container, name)) assignment
        # here was dead code -- its value was immediately overwritten.
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError
1757

    
1758
    def _allowed_accounts(self, user):
        """Return sorted account names the user has been granted access to."""

        paths = self.permissions.access_list_paths(user)
        return sorted(set(p.split('/', 1)[0] for p in paths))
1763

    
1764
    def _allowed_containers(self, user, account):
        """Return sorted container names in account accessible to the user."""

        paths = self.permissions.access_list_paths(user, account)
        return sorted(set(p.split('/', 2)[1] for p in paths))
1769

    
1770
    # Domain functions.
1771

    
1772
    @debug_method
    def get_domain_objects(self, domain, user=None):
        """Return (path, metadata, permissions) for objects in the domain."""

        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        result = []
        for path, props, user_defined_meta in obj_list:
            meta = self._build_metadata(props, user_defined_meta)
            result.append((path, meta, self.permissions.access_get(path)))
        return result
1784

    
1785
    # Utility functions.
1786

    
1787
    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        """Build a metadata dict from version properties."""

        meta = dict(bytes=props[self.SIZE],
                    type=props[self.TYPE],
                    hash=props[self.HASH],
                    version=props[self.SERIAL],
                    version_timestamp=props[self.MTIME],
                    modified_by=props[self.MUSER],
                    uuid=props[self.UUID],
                    checksum=props[self.CHECKSUM])
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta
1800

    
1801
    def _exists(self, node):
        """Return whether the node has a live version."""

        try:
            self._get_version(node)
            return True
        except ItemNotExists:
            return False
1808

    
1809
    def _unhexlify_hash(self, hash):
1810
        try:
1811
            return binascii.unhexlify(hash)
1812
        except TypeError:
1813
            raise InvalidHash(hash)