# pithos/backends/modular.py @ a06c276e
# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import sys
import uuid as uuidlib
import logging
import hashlib
import binascii

from functools import wraps, partial
from traceback import format_exc

try:
    from astakosclient import AstakosClient
except ImportError:
    AstakosClient = None

from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
                  InvalidHash)


class DisabledAstakosClient(object):
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        m = ("AstakosClient has been disabled, "
             "yet an attempt to access it was made")
        raise AssertionError(m)
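
# When no Astakos endpoint is configured (or astakosclient is not importable),
# the backend falls back to DisabledAstakosClient above: any attribute access
# fails loudly with an AssertionError instead of silently skipping quota
# handling (see the using_external_quotaholder property further down).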


# Stripped-down version of the HashMap class found in tools.

class HashMap(list):

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()
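
    # hash() below reduces the list of block hashes to a single map hash:
    # the list is padded with all-zero pseudo-hashes up to the next power of
    # two and then folded pairwise with _hash_raw until one value remains,
    # i.e. a Merkle-tree style root over the block hashes.
    # Illustrative walk-through (hypothetical values h1, h2, h3, with H
    # standing for _hash_raw and Z for a zero block of equal length):
    #   level 0: [h1, h2, h3, Z]
    #   level 1: [H(h1 + h2), H(h3 + Z)]
    #   level 2: [H(level1[0] + level1[1])]  -> returned as the map hash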

    def hash(self):
        if len(self) == 0:
            return self._hash_raw('')
        if len(self) == 1:
            return self.__getitem__(0)

        h = list(self)
        s = 2
        while s < len(h):
            s = s * 2
        h += [('\x00' * len(h[0]))] * (s - len(h))
        while len(h) > 1:
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
        return h[0]

# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16

QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
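# Reading of the three clusters, based on how they are used further down
# (e.g. in _delete_object and delete_container): CLUSTER_NORMAL holds the
# current (live) versions, CLUSTER_HISTORY the superseded versions kept for
# versioning, and CLUSTER_DELETED the versions of removed objects.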

inf = float('inf')

ULTIMATE_ANSWER = 42

DEFAULT_SOURCE = 'system'

logger = logging.getLogger(__name__)
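
# debug_method wraps the backend entry points below so that every call is
# logged at DEBUG level with its arguments and its result (or the formatted
# traceback, if the call raised).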


def debug_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            result = format_exc()
            raise
        finally:
            all_args = map(repr, args)
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper


class ModularBackend(BaseBackend):
    """A modular backend.

    Uses modules for SQL functions and storage.
    """

    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        def load_module(m):
            __import__(m)
            return sys.modules[m]

        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_url = astakos_url
        self.service_token = service_token

        if not astakos_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        self.serials = []
        self.messages = []

        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

    def pre_exec(self, lock_container_path=False):
        self.lock_container_path = lock_container_path
        self.wrapper.execute()
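
    # pre_exec() above opens a new database transaction (wrapper.execute)
    # before an API call; post_exec() then either commits, sends the queued
    # messages and resolves any pending quota commission serials through
    # astakosclient, or, on failure, rejects the serials and rolls back.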

    def post_exec(self, success_status=True):
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    token=self.service_token,
                    accept_serials=self.serials,
                    reject_serials=[])
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            if self.serials:
                self.astakosclient.resolve_commissions(
                    token=self.service_token,
                    accept_serials=[],
                    reject_serials=self.serials)
            self.wrapper.rollback()

    def close(self):
        self.wrapper.close()
        self.queue.close()

    @property
    def using_external_quotaholder(self):
        return not isinstance(self.astakosclient, DisabledAstakosClient)

    @debug_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        allowed = self._allowed_accounts(user)
        start, limit = self._list_limits(allowed, marker, limit)
        return allowed[start:start + limit]

    @debug_method
    def get_account_meta(
            self, user, account, domain, until=None, include_user_defined=True,
            external_quota=None):
        """Return a dictionary with the account metadata for the domain."""

        path, node = self._lookup_account(account, user == account)
        if user != account:
            if until or (node is None) or (account not
                                           in self._allowed_accounts(user)):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                external_quota = external_quota or {}
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta

    @debug_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_account(account, True)
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)

    @debug_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            return {}
        self._lookup_account(account, True)
        return self.permissions.group_dict(account)

    @debug_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        for k, v in groups.iteritems():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, k)
            if v:
                self.permissions.group_addmany(account, k, v)

    @debug_method
    def get_account_policy(self, user, account, external_quota=None):
        """Return a dictionary with the account policy."""

        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            return {}
        path, node = self._lookup_account(account, True)
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            external_quota = external_quota or {}
            policy['quota'] = external_quota.get('limit', 0)
        return policy

    @debug_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_account(account, True)
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)

    @debug_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        policy = policy or {}
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True)

    @debug_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is None:
            return
        if not self.node.node_remove(node,
                                     update_statistics_ancestors_depth=-1):
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

    @debug_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account."""

        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            allowed = self._allowed_containers(user, account)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        if shared or public:
            allowed = set()
            if shared:
                allowed.update([x.split('/', 2)[1] for x in
                               self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in
                               self.permissions.public_list(account)])
            allowed = sorted(allowed)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        node = self.node.node_lookup(account)
        containers = [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
        start, limit = self._list_limits(
            [x[0] for x in containers], marker, limit)
        return containers[start:start + limit]

    @debug_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""

        allowed = []
        if user != account:
            if until:
                raise NotAllowedError
            allowed = self.permissions.access_list_paths(
                user, '/'.join((account, container)))
            if not allowed:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = until if until is not None else inf
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)

    @debug_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain."""

        if user != account:
            if until or container not in self._allowed_containers(user,
                                                                  account):
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta

    @debug_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is not None:
            versioning = self._get_policy(
                node, is_account_policy=False)['versioning']
            if versioning != 'auto':
                self.node.version_remove(src_version_id,
                                         update_statistics_ancestors_depth=0)

    @debug_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        if user != account:
            if container not in self._allowed_containers(user, account):
                raise NotAllowedError
            return {}
        path, node = self._lookup_container(account, container)
        return self._get_policy(node, is_account_policy=False)

    @debug_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)

    @debug_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        policy = policy or {}
        if user != account:
            raise NotAllowedError
        try:
            path, node = self._lookup_container(account, container)
        except NameError:
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        node = self._put_path(
            user, self._lookup_account(account, True)[1], path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)

    @debug_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        if until is not None:
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        if user != account and until:
            raise NotAllowedError
        if shared and public:
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            objects = set()
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects |= set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            objects.sort(key=lambda x: x[0])
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]

        allowed = self._list_object_permissions(
            user, account, container, prefix, shared, public)
        if shared and not allowed:
            return []
        path, node = self._lookup_container(account, container)
        allowed = self._get_formatted_paths(allowed)
        objects = self._list_object_properties(
            node, path, prefix, delimiter, marker, limit, virtual, domain,
            keys, until, size_range, allowed, all_props)
        start, limit = self._list_limits(
            [x[0] for x in objects], marker, limit)
        return objects[start:start + limit]

    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        public = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        paths, nodes = self._lookup_objects(public)
        path = '/'.join((account, container))
        cont_prefix = path + '/'
        paths = [x[len(cont_prefix):] for x in paths]
        objects = [(p,) + props for p, props in
                   zip(paths, self.node.version_lookup_bulk(
                       nodes, all_props=all_props))]
        return objects

    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        objects = []
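        # Page through _list_objects 10000 entries at a time, using the last
        # item fetched so far as the marker, until a short (or empty) page
        # signals the end of the listing.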
        while True:
            marker = objects[-1] if objects else None
            limit = 10000
            l = self._list_objects(
                user, account, container, prefix, delimiter, marker, limit,
                virtual, domain, keys, shared, until, size_range, all_props,
                public)
            objects.extend(l)
            if not l or len(l) < limit:
                break
        return objects

    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        allowed = []
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            allowed = self.permissions.access_list_paths(user, path)
            if not allowed:
                raise NotAllowedError
        else:
            allowed = set()
            if shared:
                allowed.update(self.permissions.access_list_shared(path))
            if public:
                allowed.update(
                    [x[0] for x in self.permissions.public_list(path)])
            allowed = sorted(allowed)
            if not allowed:
                return []
        return allowed

    @debug_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        keys = keys or []
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, False, public)

    @debug_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        keys = keys or []
        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, True, public)
        objects = []
        for p in props:
            if len(p) == 2:
                objects.append({'subdir': p[0]})
            else:
                objects.append({
                    'name': p[0],
                    'bytes': p[self.SIZE + 1],
                    'type': p[self.TYPE + 1],
                    'hash': p[self.HASH + 1],
                    'version': p[self.SERIAL + 1],
                    'version_timestamp': p[self.MTIME + 1],
                    'modified': p[self.MTIME + 1] if until is None else None,
                    'modified_by': p[self.MUSER + 1],
                    'uuid': p[self.UUID + 1],
                    'checksum': p[self.CHECKSUM + 1]})
        return objects

    @debug_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths that enforce permissions under a container."""

        return self._list_object_permissions(user, account, container, prefix,
                                             True, False)

    @debug_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        public = {}
        for path, p in self.permissions.public_list('/'.join((account,
                                                              container,
                                                              prefix))):
            public[path] = p
        return public

    @debug_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta

    @debug_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write(user, account, container, name)

        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        self._apply_versioning(account, container, src_version_id,
                               update_statistics_ancestors_depth=1)
        return dest_version_id

    @debug_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions,
        along with a dictionary containing the permissions."""

        allowed = 'write'
        permissions_path = self._get_permissions_path(account, container, name)
        if user != account:
            if self.permissions.access_check(permissions_path, self.WRITE,
                                             user):
                allowed = 'write'
            elif self.permissions.access_check(permissions_path, self.READ,
                                               user):
                allowed = 'read'
            else:
                raise NotAllowedError
        self._lookup_object(account, container, name)
        return (allowed,
                permissions_path,
                self.permissions.access_get(permissions_path))

    @debug_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object."""

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        self.permissions.access_set(path, permissions)
        self._report_sharing_change(user, account, path, {'members':
                                    self.permissions.access_members(path)})

    @debug_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        p = self.permissions.public_get(path)
        return p

    @debug_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write(user, account, container, name)
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        if not public:
            self.permissions.public_unset(path)
        else:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet)

    @debug_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if props[self.HASH] is None:
            return 0, ()
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]

    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
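        # The quota impact of this write is the new size minus whatever the
        # versioning policy just reclaimed from the superseded version.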
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id

    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version."""

        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(map)
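        # Blocks that are not yet present in the store are reported back to
        # the caller as an IndexError whose .data attribute lists their hex
        # hashes, so the client can upload the missing blocks and retry.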
1025
        if missing:
1026
            ie = IndexError()
1027
            ie.data = [binascii.hexlify(x) for x in missing]
1028
            raise ie
1029

    
1030
        hash = map.hash()
1031
        hexlified = binascii.hexlify(hash)
1032
        dest_version_id = self._update_object_hash(
1033
            user, account, container, name, size, type, hexlified, checksum,
1034
            domain, meta, replace_meta, permissions)
1035
        self.store.map_put(hash, map)
1036
        return dest_version_id, hexlified
1037

    
1038
    @debug_method
1039
    def update_object_checksum(self, user, account, container, name, version,
1040
                               checksum):
1041
        """Update an object's checksum."""
1042

    
1043
        # Update objects with greater version and same hashmap
1044
        # and size (fix metadata updates).
1045
        self._can_write(user, account, container, name)
1046
        path, node = self._lookup_object(account, container, name,
1047
                                         lock_container=True)
1048
        props = self._get_version(node, version)
1049
        versions = self.node.node_get_versions(node)
1050
        for x in versions:
1051
            if (x[self.SERIAL] >= int(version) and
1052
                x[self.HASH] == props[self.HASH] and
1053
                    x[self.SIZE] == props[self.SIZE]):
1054
                self.node.version_put_property(
1055
                    x[self.SERIAL], 'checksum', checksum)
1056

    
1057
    def _copy_object(self, user, src_account, src_container, src_name,
1058
                     dest_account, dest_container, dest_name, type,
1059
                     dest_domain=None, dest_meta=None, replace_meta=False,
1060
                     permissions=None, src_version=None, is_move=False,
1061
                     delimiter=None):
1062

    
1063
        report_size_change = not is_move
1064
        dest_meta = dest_meta or {}
1065
        dest_version_ids = []
1066
        self._can_read(user, src_account, src_container, src_name)
1067

    
1068
        src_container_path = '/'.join((src_account, src_container))
1069
        dest_container_path = '/'.join((dest_account, dest_container))
1070
        # Lock container paths in alphabetical order
1071
        if src_container_path < dest_container_path:
1072
            self._lookup_container(src_account, src_container)
1073
            self._lookup_container(dest_account, dest_container)
1074
        else:
1075
            self._lookup_container(dest_account, dest_container)
1076
            self._lookup_container(src_account, src_container)
1077

    
1078
        path, node = self._lookup_object(src_account, src_container, src_name)
1079
        # TODO: Will do another fetch of the properties in duplicate version...
1080
        props = self._get_version(
1081
            node, src_version)  # Check to see if source exists.
1082
        src_version_id = props[self.SERIAL]
1083
        hash = props[self.HASH]
1084
        size = props[self.SIZE]
1085
        is_copy = not is_move and (src_account, src_container, src_name) != (
1086
            dest_account, dest_container, dest_name)  # New uuid.
1087
        dest_version_ids.append(self._update_object_hash(
1088
            user, dest_account, dest_container, dest_name, size, type, hash,
1089
            None, dest_domain, dest_meta, replace_meta, permissions,
1090
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1091
            report_size_change=report_size_change))
1092
        if is_move and ((src_account, src_container, src_name) !=
1093
                        (dest_account, dest_container, dest_name)):
1094
            self._delete_object(user, src_account, src_container, src_name,
1095
                                report_size_change=report_size_change)
1096

    
1097
        if delimiter:
1098
            prefix = (src_name + delimiter if not
1099
                      src_name.endswith(delimiter) else src_name)
1100
            src_names = self._list_objects_no_limit(
1101
                user, src_account, src_container, prefix, delimiter=None,
1102
                virtual=False, domain=None, keys=[], shared=False, until=None,
1103
                size_range=None, all_props=True, public=False)
1104
            src_names.sort(key=lambda x: x[2])  # order by nodes
1105
            paths = [elem[0] for elem in src_names]
1106
            nodes = [elem[2] for elem in src_names]
1107
            # TODO: Will do another fetch of the properties
1108
            # in duplicate version...
1109
            props = self._get_versions(nodes)  # Check to see if source exists.
1110

    
1111
            for prop, path, node in zip(props, paths, nodes):
1112
                src_version_id = prop[self.SERIAL]
1113
                hash = prop[self.HASH]
1114
                vtype = prop[self.TYPE]
1115
                size = prop[self.SIZE]
1116
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1117
                    delimiter) else dest_name
1118
                vdest_name = path.replace(prefix, dest_prefix, 1)
1119
                dest_version_ids.append(self._update_object_hash(
1120
                    user, dest_account, dest_container, vdest_name, size,
1121
                    vtype, hash, None, dest_domain, meta={},
1122
                    replace_meta=False, permissions=None, src_node=node,
1123
                    src_version_id=src_version_id, is_copy=is_copy))
1124
                if is_move and ((src_account, src_container, src_name) !=
1125
                                (dest_account, dest_container, dest_name)):
1126
                    self._delete_object(user, src_account, src_container, path)
1127
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
1128
                dest_version_ids)
1129

    
1130
    @debug_method
1131
    def copy_object(self, user, src_account, src_container, src_name,
1132
                    dest_account, dest_container, dest_name, type, domain,
1133
                    meta=None, replace_meta=False, permissions=None,
1134
                    src_version=None, delimiter=None):
1135
        """Copy an object's data and metadata."""
1136

    
1137
        meta = meta or {}
1138
        dest_version_id = self._copy_object(
1139
            user, src_account, src_container, src_name, dest_account,
1140
            dest_container, dest_name, type, domain, meta, replace_meta,
1141
            permissions, src_version, False, delimiter)
1142
        return dest_version_id
1143

    
1144
    @debug_method
1145
    def move_object(self, user, src_account, src_container, src_name,
1146
                    dest_account, dest_container, dest_name, type, domain,
1147
                    meta=None, replace_meta=False, permissions=None,
1148
                    delimiter=None):
1149
        """Move an object's data and metadata."""
1150

    
1151
        meta = meta or {}
1152
        if user != src_account:
1153
            raise NotAllowedError
1154
        dest_version_id = self._move_object(
1155
            user, src_account, src_container, src_name, dest_account,
1156
            dest_container, dest_name, type, domain, meta, replace_meta,
1157
            permissions, None, delimiter=delimiter)
1158
        return dest_version_id
1159

    
1160
    def _delete_object(self, user, account, container, name, until=None,
1161
                       delimiter=None, report_size_change=True):
1162
        if user != account:
1163
            raise NotAllowedError
1164

    
1165
        # lookup object and lock container path also
1166
        path, node = self._lookup_object(account, container, name,
1167
                                         lock_container=True)
1168

    
1169
        if until is not None:
1170
            if node is None:
1171
                return
1172
            hashes = []
1173
            size = 0
1174
            serials = []
1175
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1176
                                           update_statistics_ancestors_depth=1)
1177
            hashes += h
1178
            size += s
1179
            serials += v
1180
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1181
                                           update_statistics_ancestors_depth=1)
1182
            hashes += h
1183
            if not self.free_versioning:
1184
                size += s
1185
            serials += v
1186
            for h in hashes:
1187
                self.store.map_delete(h)
1188
            self.node.node_purge(node, until, CLUSTER_DELETED,
1189
                                 update_statistics_ancestors_depth=1)
1190
            try:
                self._get_version(node)
            except NameError:
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

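        # With a delimiter, also delete every object under the matching
        # prefix, i.e. the contents of the pseudo-folder being removed.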
        if delimiter:
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    @debug_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        self._delete_object(user, account, container, name, until, delimiter)

    @debug_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
                x[self.CLUSTER] != CLUSTER_DELETED]

    @debug_method
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    @debug_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        block = self.store.block_get(self._unhexlify_hash(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block

    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        return binascii.hexlify(self.store.block_put(data))

    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
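        # A write that covers a whole block needs no read-modify-write;
        # store the data as a brand new block instead.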
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
        return binascii.hexlify(h)

    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

    def _put_object_node(self, path, parent, name):
        path = '/'.join((path, name))
        node = self.node.node_lookup(path)
        if node is None:
            node = self.node.node_create(parent, path)
        return path, node

    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node

    def _lookup_account(self, account, create=True):
        node = self.node.node_lookup(account)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node

    def _lookup_container(self, account, container):
        for_update = True if self.lock_container_path else False
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name, lock_container=False):
        if lock_container:
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes

    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats

    def _get_version(self, node, version=None):
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version, node=node)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node."""

        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

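        # Find the version currently occupying the destination node (if any)
        # so it can be moved to the history cluster before the new version
        # is created.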
        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id

    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))

    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=
            update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id

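    # Translate a (marker, limit) pair into a slice of an already sorted
    # listing; at most 10000 entries are returned per request.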
    def _list_limits(self, listing, marker, limit):
        start = 0
        if marker:
            try:
                start = listing.index(marker) + 1
            except ValueError:
                pass
        if not limit or limit > 10000:
            limit = 10000
        return start, limit

    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []
        sizeq = size_range

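        # Fetch the latest versions under the container, merge in the
        # virtual directory prefixes if requested and strip the container
        # prefix from the returned paths.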
        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects

    # Reporting functions.

    @debug_method
    def _report_size_change(self, user, account, size, details=None):
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

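        # Charge the change against the external quotaholder by issuing a
        # commission for 'pithos.diskspace'; the returned serial is kept in
        # self.serials.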
        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                token=self.service_token,
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)

    @debug_method
    def _report_object_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))

    @debug_method
    def _report_sharing_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))

    # Policy functions.

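    # Validate a policy dictionary: empty values fall back to the defaults,
    # 'quota' must be a non-negative integer and 'versioning' must be either
    # 'auto' or 'none'; any other key raises ValueError.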
    def _check_policy(self, policy, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError

    def _put_policy(self, node, policy, replace, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)

    def _get_policy(self, node, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        policy = default_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy

    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0

    # Access control functions.

    def _check_groups(self, groups):
        # raise ValueError('Bad characters in groups')
        pass

    def _check_permissions(self, path, permissions):
        # raise ValueError('Bad characters in permissions')
        pass

    def _get_formatted_paths(self, paths):
        formatted = []
        for p in paths:
            node = self.node.node_lookup(p)
            props = None
            if node is not None:
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                if props[self.TYPE].split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
                formatted.append((p, self.MATCH_EXACT))
        return formatted

    def _get_permissions_path(self, account, container, name):
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()
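        # Examine the candidate paths in reverse lexicographic order: return
        # an exact match, or the first ancestor path whose object is typed
        # as a directory/folder and therefore carries the permissions.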
        for p in permission_paths:
            if p == path:
                return p
            else:
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None

    def _can_read(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if (not self.permissions.access_check(path, self.READ, user) and not
                self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError

    def _can_write(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError

    def _allowed_accounts(self, user):
        allow = set()
        for path in self.permissions.access_list_paths(user):
            allow.add(path.split('/', 1)[0])
        return sorted(allow)

    def _allowed_containers(self, user, account):
        allow = set()
        for path in self.permissions.access_list_paths(user, account):
            allow.add(path.split('/', 2)[1])
        return sorted(allow)

    # Domain functions

    @debug_method
    def get_domain_objects(self, domain, user=None):
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]

    # util functions

    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        meta = {'bytes': props[self.SIZE],
                'type': props[self.TYPE],
                'hash': props[self.HASH],
                'version': props[self.SERIAL],
                'version_timestamp': props[self.MTIME],
                'modified_by': props[self.MUSER],
                'uuid': props[self.UUID],
                'checksum': props[self.CHECKSUM]}
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta

    def _exists(self, node):
        try:
            self._get_version(node)
        except ItemNotExists:
            return False
        else:
            return True

    def _unhexlify_hash(self, hash):
        try:
            return binascii.unhexlify(hash)
        except TypeError:
            raise InvalidHash(hash)