Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 3a5994a8

History | View | Annotate | Download (69.9 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42

    
43
try:
44
    from astakosclient import AstakosClient
45
except ImportError:
46
    AstakosClient = None
47

    
48
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
49
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
50
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
51
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
52
                  InvalidHash)
53

    
54

    
55
class DisabledAstakosClient(object):
56
    def __init__(self, *args, **kwargs):
57
        self.args = args
58
        self.kwargs = kwargs
59

    
60
    def __getattr__(self, name):
61
        m = ("AstakosClient has been disabled, "
62
             "yet an attempt to access it was made")
63
        raise AssertionError(m)
64

    
65

    
66
# Stripped-down version of the HashMap class found in tools.
67

    
68
class HashMap(list):
69

    
70
    def __init__(self, blocksize, blockhash):
71
        super(HashMap, self).__init__()
72
        self.blocksize = blocksize
73
        self.blockhash = blockhash
74

    
75
    def _hash_raw(self, v):
76
        h = hashlib.new(self.blockhash)
77
        h.update(v)
78
        return h.digest()
79

    
80
    def hash(self):
81
        if len(self) == 0:
82
            return self._hash_raw('')
83
        if len(self) == 1:
84
            return self.__getitem__(0)
85

    
86
        h = list(self)
87
        s = 2
88
        while s < len(h):
89
            s = s * 2
90
        h += [('\x00' * len(h[0]))] * (s - len(h))
91
        while len(h) > 1:
92
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
93
        return h[0]
94

    
95
# Default modules and settings.
96
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
97
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
98
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
99
DEFAULT_BLOCK_PATH = 'data/'
100
DEFAULT_BLOCK_UMASK = 0o022
101
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
102
DEFAULT_HASH_ALGORITHM = 'sha256'
103
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
104
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
105
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
106
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
107
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
108
                               'abcdefghijklmnopqrstuvwxyz'
109
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
110
DEFAULT_PUBLIC_URL_SECURITY = 16
111

    
112
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
113
QUEUE_CLIENT_ID = 'pithos'
114
QUEUE_INSTANCE_ID = '1'
115

    
116
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
117

    
118
inf = float('inf')
119

    
120
ULTIMATE_ANSWER = 42
121

    
122
DEFAULT_SOURCE = 'system'
123

    
124
logger = logging.getLogger(__name__)
125

    
126

    
127
def debug_method(func):
128
    @wraps(func)
129
    def wrapper(self, *args, **kw):
130
        try:
131
            result = func(self, *args, **kw)
132
            return result
133
        except:
134
            result = format_exc()
135
            raise
136
        finally:
137
            all_args = map(repr, args)
138
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
139
            logger.debug(">>> %s(%s) <<< %s" % (
140
                func.__name__, ', '.join(all_args).rstrip(', '), result))
141
    return wrapper
142

    
143

    
144
class ModularBackend(BaseBackend):
145
    """A modular backend.
146

147
    Uses modules for SQL functions and storage.
148
    """
149

    
150
    def __init__(self, db_module=None, db_connection=None,
151
                 block_module=None, block_path=None, block_umask=None,
152
                 block_size=None, hash_algorithm=None,
153
                 queue_module=None, queue_hosts=None, queue_exchange=None,
154
                 astakos_url=None, service_token=None,
155
                 astakosclient_poolsize=None,
156
                 free_versioning=True, block_params=None,
157
                 public_url_security=None,
158
                 public_url_alphabet=None,
159
                 account_quota_policy=None,
160
                 container_quota_policy=None,
161
                 container_versioning_policy=None):
162
        db_module = db_module or DEFAULT_DB_MODULE
163
        db_connection = db_connection or DEFAULT_DB_CONNECTION
164
        block_module = block_module or DEFAULT_BLOCK_MODULE
165
        block_path = block_path or DEFAULT_BLOCK_PATH
166
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
167
        block_params = block_params or DEFAULT_BLOCK_PARAMS
168
        block_size = block_size or DEFAULT_BLOCK_SIZE
169
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
170
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
171
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
172
        container_quota_policy = container_quota_policy \
173
            or DEFAULT_CONTAINER_QUOTA
174
        container_versioning_policy = container_versioning_policy \
175
            or DEFAULT_CONTAINER_VERSIONING
176

    
177
        self.default_account_policy = {'quota': account_quota_policy}
178
        self.default_container_policy = {
179
            'quota': container_quota_policy,
180
            'versioning': container_versioning_policy
181
        }
182
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
183
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
184

    
185
        self.public_url_security = (public_url_security or
186
                                    DEFAULT_PUBLIC_URL_SECURITY)
187
        self.public_url_alphabet = (public_url_alphabet or
188
                                    DEFAULT_PUBLIC_URL_ALPHABET)
189

    
190
        self.hash_algorithm = hash_algorithm
191
        self.block_size = block_size
192
        self.free_versioning = free_versioning
193

    
194
        def load_module(m):
195
            __import__(m)
196
            return sys.modules[m]
197

    
198
        self.db_module = load_module(db_module)
199
        self.wrapper = self.db_module.DBWrapper(db_connection)
200
        params = {'wrapper': self.wrapper}
201
        self.permissions = self.db_module.Permissions(**params)
202
        self.config = self.db_module.Config(**params)
203
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
204
        for x in ['READ', 'WRITE']:
205
            setattr(self, x, getattr(self.db_module, x))
206
        self.node = self.db_module.Node(**params)
207
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
208
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
209
                  'MATCH_PREFIX', 'MATCH_EXACT']:
210
            setattr(self, x, getattr(self.db_module, x))
211

    
212
        self.block_module = load_module(block_module)
213
        self.block_params = block_params
214
        params = {'path': block_path,
215
                  'block_size': self.block_size,
216
                  'hash_algorithm': self.hash_algorithm,
217
                  'umask': block_umask}
218
        params.update(self.block_params)
219
        self.store = self.block_module.Store(**params)
220

    
221
        if queue_module and queue_hosts:
222
            self.queue_module = load_module(queue_module)
223
            params = {'hosts': queue_hosts,
224
                      'exchange': queue_exchange,
225
                      'client_id': QUEUE_CLIENT_ID}
226
            self.queue = self.queue_module.Queue(**params)
227
        else:
228
            class NoQueue:
229
                def send(self, *args):
230
                    pass
231

    
232
                def close(self):
233
                    pass
234

    
235
            self.queue = NoQueue()
236

    
237
        self.astakos_url = astakos_url
238
        self.service_token = service_token
239

    
240
        if not astakos_url or not AstakosClient:
241
            self.astakosclient = DisabledAstakosClient(
242
                astakos_url,
243
                use_pool=True,
244
                pool_size=astakosclient_poolsize)
245
        else:
246
            self.astakosclient = AstakosClient(
247
                astakos_url,
248
                use_pool=True,
249
                pool_size=astakosclient_poolsize)
250

    
251
        self.serials = []
252
        self.messages = []
253

    
254
        self._move_object = partial(self._copy_object, is_move=True)
255

    
256
        self.lock_container_path = False
257

    
258
    def pre_exec(self, lock_container_path=False):
259
        self.lock_container_path = lock_container_path
260
        self.wrapper.execute()
261

    
262
    def post_exec(self, success_status=True):
263
        if success_status:
264
            # send messages produced
265
            for m in self.messages:
266
                self.queue.send(*m)
267

    
268
            # register serials
269
            if self.serials:
270
                self.commission_serials.insert_many(
271
                    self.serials)
272

    
273
                # commit to ensure that the serials are registered
274
                # even if resolve commission fails
275
                self.wrapper.commit()
276

    
277
                # start new transaction
278
                self.wrapper.execute()
279

    
280
                r = self.astakosclient.resolve_commissions(
281
                    token=self.service_token,
282
                    accept_serials=self.serials,
283
                    reject_serials=[])
284
                self.commission_serials.delete_many(
285
                    r['accepted'])
286

    
287
            self.wrapper.commit()
288
        else:
289
            if self.serials:
290
                self.astakosclient.resolve_commissions(
291
                    token=self.service_token,
292
                    accept_serials=[],
293
                    reject_serials=self.serials)
294
            self.wrapper.rollback()
295

    
296
    def close(self):
297
        self.wrapper.close()
298
        self.queue.close()
299

    
300
    @property
301
    def using_external_quotaholder(self):
302
        return not isinstance(self.astakosclient, DisabledAstakosClient)
303

    
304
    @debug_method
305
    def list_accounts(self, user, marker=None, limit=10000):
306
        """Return a list of accounts the user can access."""
307

    
308
        allowed = self._allowed_accounts(user)
309
        start, limit = self._list_limits(allowed, marker, limit)
310
        return allowed[start:start + limit]
311

    
312
    @debug_method
313
    def get_account_meta(
314
            self, user, account, domain, until=None, include_user_defined=True,
315
            external_quota=None):
316
        """Return a dictionary with the account metadata for the domain."""
317

    
318
        path, node = self._lookup_account(account, user == account)
319
        if user != account:
320
            if until or (node is None) or (account not
321
                                           in self._allowed_accounts(user)):
322
                raise NotAllowedError
323
        try:
324
            props = self._get_properties(node, until)
325
            mtime = props[self.MTIME]
326
        except NameError:
327
            props = None
328
            mtime = until
329
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
330
        tstamp = max(tstamp, mtime)
331
        if until is None:
332
            modified = tstamp
333
        else:
334
            modified = self._get_statistics(
335
                node, compute=True)[2]  # Overall last modification.
336
            modified = max(modified, mtime)
337

    
338
        if user != account:
339
            meta = {'name': account}
340
        else:
341
            meta = {}
342
            if props is not None and include_user_defined:
343
                meta.update(
344
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
345
            if until is not None:
346
                meta.update({'until_timestamp': tstamp})
347
            meta.update({'name': account, 'count': count, 'bytes': bytes})
348
            if self.using_external_quotaholder:
349
                external_quota = external_quota or {}
350
                meta['bytes'] = external_quota.get('usage', 0)
351
        meta.update({'modified': modified})
352
        return meta
353

    
354
    @debug_method
355
    def update_account_meta(self, user, account, domain, meta, replace=False):
356
        """Update the metadata associated with the account for the domain."""
357

    
358
        if user != account:
359
            raise NotAllowedError
360
        path, node = self._lookup_account(account, True)
361
        self._put_metadata(user, node, domain, meta, replace,
362
                           update_statistics_ancestors_depth=-1)
363

    
364
    @debug_method
365
    def get_account_groups(self, user, account):
366
        """Return a dictionary with the user groups defined for the account."""
367

    
368
        if user != account:
369
            if account not in self._allowed_accounts(user):
370
                raise NotAllowedError
371
            return {}
372
        self._lookup_account(account, True)
373
        return self.permissions.group_dict(account)
374

    
375
    @debug_method
376
    def update_account_groups(self, user, account, groups, replace=False):
377
        """Update the groups associated with the account."""
378

    
379
        if user != account:
380
            raise NotAllowedError
381
        self._lookup_account(account, True)
382
        self._check_groups(groups)
383
        if replace:
384
            self.permissions.group_destroy(account)
385
        for k, v in groups.iteritems():
386
            if not replace:  # If not already deleted.
387
                self.permissions.group_delete(account, k)
388
            if v:
389
                self.permissions.group_addmany(account, k, v)
390

    
391
    @debug_method
392
    def get_account_policy(self, user, account, external_quota=None):
393
        """Return a dictionary with the account policy."""
394

    
395
        if user != account:
396
            if account not in self._allowed_accounts(user):
397
                raise NotAllowedError
398
            return {}
399
        path, node = self._lookup_account(account, True)
400
        policy = self._get_policy(node, is_account_policy=True)
401
        if self.using_external_quotaholder:
402
            external_quota = external_quota or {}
403
            policy['quota'] = external_quota.get('limit', 0)
404
        return policy
405

    
406
    @debug_method
407
    def update_account_policy(self, user, account, policy, replace=False):
408
        """Update the policy associated with the account."""
409

    
410
        if user != account:
411
            raise NotAllowedError
412
        path, node = self._lookup_account(account, True)
413
        self._check_policy(policy, is_account_policy=True)
414
        self._put_policy(node, policy, replace, is_account_policy=True)
415

    
416
    @debug_method
417
    def put_account(self, user, account, policy=None):
418
        """Create a new account with the given name."""
419

    
420
        policy = policy or {}
421
        if user != account:
422
            raise NotAllowedError
423
        node = self.node.node_lookup(account)
424
        if node is not None:
425
            raise AccountExists('Account already exists')
426
        if policy:
427
            self._check_policy(policy, is_account_policy=True)
428
        node = self._put_path(user, self.ROOTNODE, account,
429
                              update_statistics_ancestors_depth=-1)
430
        self._put_policy(node, policy, True, is_account_policy=True)
431

    
432
    @debug_method
433
    def delete_account(self, user, account):
434
        """Delete the account with the given name."""
435

    
436
        if user != account:
437
            raise NotAllowedError
438
        node = self.node.node_lookup(account)
439
        if node is None:
440
            return
441
        if not self.node.node_remove(node,
442
                                     update_statistics_ancestors_depth=-1):
443
            raise AccountNotEmpty('Account is not empty')
444
        self.permissions.group_destroy(account)
445

    
446
    @debug_method
447
    def list_containers(self, user, account, marker=None, limit=10000,
448
                        shared=False, until=None, public=False):
449
        """Return a list of containers existing under an account."""
450

    
451
        if user != account:
452
            if until or account not in self._allowed_accounts(user):
453
                raise NotAllowedError
454
            allowed = self._allowed_containers(user, account)
455
            start, limit = self._list_limits(allowed, marker, limit)
456
            return allowed[start:start + limit]
457
        if shared or public:
458
            allowed = set()
459
            if shared:
460
                allowed.update([x.split('/', 2)[1] for x in
461
                               self.permissions.access_list_shared(account)])
462
            if public:
463
                allowed.update([x[0].split('/', 2)[1] for x in
464
                               self.permissions.public_list(account)])
465
            allowed = sorted(allowed)
466
            start, limit = self._list_limits(allowed, marker, limit)
467
            return allowed[start:start + limit]
468
        node = self.node.node_lookup(account)
469
        containers = [x[0] for x in self._list_object_properties(
470
            node, account, '', '/', marker, limit, False, None, [], until)]
471
        start, limit = self._list_limits(
472
            [x[0] for x in containers], marker, limit)
473
        return containers[start:start + limit]
474

    
475
    @debug_method
476
    def list_container_meta(self, user, account, container, domain,
477
                            until=None):
478
        """Return a list of the container's object meta keys for a domain."""
479

    
480
        allowed = []
481
        if user != account:
482
            if until:
483
                raise NotAllowedError
484
            allowed = self.permissions.access_list_paths(
485
                user, '/'.join((account, container)))
486
            if not allowed:
487
                raise NotAllowedError
488
        path, node = self._lookup_container(account, container)
489
        before = until if until is not None else inf
490
        allowed = self._get_formatted_paths(allowed)
491
        return self.node.latest_attribute_keys(node, domain, before,
492
                                               CLUSTER_DELETED, allowed)
493

    
494
    @debug_method
495
    def get_container_meta(self, user, account, container, domain, until=None,
496
                           include_user_defined=True):
497
        """Return a dictionary with the container metadata for the domain."""
498

    
499
        if user != account:
500
            if until or container not in self._allowed_containers(user,
501
                                                                  account):
502
                raise NotAllowedError
503
        path, node = self._lookup_container(account, container)
504
        props = self._get_properties(node, until)
505
        mtime = props[self.MTIME]
506
        count, bytes, tstamp = self._get_statistics(node, until)
507
        tstamp = max(tstamp, mtime)
508
        if until is None:
509
            modified = tstamp
510
        else:
511
            modified = self._get_statistics(
512
                node)[2]  # Overall last modification.
513
            modified = max(modified, mtime)
514

    
515
        if user != account:
516
            meta = {'name': container}
517
        else:
518
            meta = {}
519
            if include_user_defined:
520
                meta.update(
521
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
522
            if until is not None:
523
                meta.update({'until_timestamp': tstamp})
524
            meta.update({'name': container, 'count': count, 'bytes': bytes})
525
        meta.update({'modified': modified})
526
        return meta
527

    
528
    @debug_method
529
    def update_container_meta(self, user, account, container, domain, meta,
530
                              replace=False):
531
        """Update the metadata associated with the container for the domain."""
532

    
533
        if user != account:
534
            raise NotAllowedError
535
        path, node = self._lookup_container(account, container)
536
        src_version_id, dest_version_id = self._put_metadata(
537
            user, node, domain, meta, replace,
538
            update_statistics_ancestors_depth=0)
539
        if src_version_id is not None:
540
            versioning = self._get_policy(
541
                node, is_account_policy=False)['versioning']
542
            if versioning != 'auto':
543
                self.node.version_remove(src_version_id,
544
                                         update_statistics_ancestors_depth=0)
545

    
546
    @debug_method
547
    def get_container_policy(self, user, account, container):
548
        """Return a dictionary with the container policy."""
549

    
550
        if user != account:
551
            if container not in self._allowed_containers(user, account):
552
                raise NotAllowedError
553
            return {}
554
        path, node = self._lookup_container(account, container)
555
        return self._get_policy(node, is_account_policy=False)
556

    
557
    @debug_method
558
    def update_container_policy(self, user, account, container, policy,
559
                                replace=False):
560
        """Update the policy associated with the container."""
561

    
562
        if user != account:
563
            raise NotAllowedError
564
        path, node = self._lookup_container(account, container)
565
        self._check_policy(policy, is_account_policy=False)
566
        self._put_policy(node, policy, replace, is_account_policy=False)
567

    
568
    @debug_method
569
    def put_container(self, user, account, container, policy=None):
570
        """Create a new container with the given name."""
571

    
572
        policy = policy or {}
573
        if user != account:
574
            raise NotAllowedError
575
        try:
576
            path, node = self._lookup_container(account, container)
577
        except NameError:
578
            pass
579
        else:
580
            raise ContainerExists('Container already exists')
581
        if policy:
582
            self._check_policy(policy, is_account_policy=False)
583
        path = '/'.join((account, container))
584
        node = self._put_path(
585
            user, self._lookup_account(account, True)[1], path,
586
            update_statistics_ancestors_depth=-1)
587
        self._put_policy(node, policy, True, is_account_policy=False)
588

    
589
    @debug_method
590
    def delete_container(self, user, account, container, until=None, prefix='',
591
                         delimiter=None):
592
        """Delete/purge the container with the given name."""
593

    
594
        if user != account:
595
            raise NotAllowedError
596
        path, node = self._lookup_container(account, container)
597

    
598
        if until is not None:
599
            hashes, size, serials = self.node.node_purge_children(
600
                node, until, CLUSTER_HISTORY,
601
                update_statistics_ancestors_depth=0)
602
            for h in hashes:
603
                self.store.map_delete(h)
604
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
605
                                          update_statistics_ancestors_depth=0)
606
            if not self.free_versioning:
607
                self._report_size_change(
608
                    user, account, -size, {
609
                        'action': 'container purge',
610
                        'path': path,
611
                        'versions': ','.join(str(i) for i in serials)
612
                    }
613
                )
614
            return
615

    
616
        if not delimiter:
617
            if self._get_statistics(node)[0] > 0:
618
                raise ContainerNotEmpty('Container is not empty')
619
            hashes, size, serials = self.node.node_purge_children(
620
                node, inf, CLUSTER_HISTORY,
621
                update_statistics_ancestors_depth=0)
622
            for h in hashes:
623
                self.store.map_delete(h)
624
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
625
                                          update_statistics_ancestors_depth=0)
626
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
627
            if not self.free_versioning:
628
                self._report_size_change(
629
                    user, account, -size, {
630
                        'action': 'container purge',
631
                        'path': path,
632
                        'versions': ','.join(str(i) for i in serials)
633
                    }
634
                )
635
        else:
636
            # remove only contents
637
            src_names = self._list_objects_no_limit(
638
                user, account, container, prefix='', delimiter=None,
639
                virtual=False, domain=None, keys=[], shared=False, until=None,
640
                size_range=None, all_props=True, public=False)
641
            paths = []
642
            for t in src_names:
643
                path = '/'.join((account, container, t[0]))
644
                node = t[2]
645
                if not self._exists(node):
646
                    continue
647
                src_version_id, dest_version_id = self._put_version_duplicate(
648
                    user, node, size=0, type='', hash=None, checksum='',
649
                    cluster=CLUSTER_DELETED,
650
                    update_statistics_ancestors_depth=1)
651
                del_size = self._apply_versioning(
652
                    account, container, src_version_id,
653
                    update_statistics_ancestors_depth=1)
654
                self._report_size_change(
655
                    user, account, -del_size, {
656
                        'action': 'object delete',
657
                        'path': path,
658
                        'versions': ','.join([str(dest_version_id)])})
659
                self._report_object_change(
660
                    user, account, path, details={'action': 'object delete'})
661
                paths.append(path)
662
            self.permissions.access_clear_bulk(paths)
663

    
664
    def _list_objects(self, user, account, container, prefix, delimiter,
665
                      marker, limit, virtual, domain, keys, shared, until,
666
                      size_range, all_props, public):
667
        if user != account and until:
668
            raise NotAllowedError
669
        if shared and public:
670
            # get shared first
671
            shared_paths = self._list_object_permissions(
672
                user, account, container, prefix, shared=True, public=False)
673
            objects = set()
674
            if shared_paths:
675
                path, node = self._lookup_container(account, container)
676
                shared_paths = self._get_formatted_paths(shared_paths)
677
                objects |= set(self._list_object_properties(
678
                    node, path, prefix, delimiter, marker, limit, virtual,
679
                    domain, keys, until, size_range, shared_paths, all_props))
680

    
681
            # get public
682
            objects |= set(self._list_public_object_properties(
683
                user, account, container, prefix, all_props))
684
            objects = list(objects)
685

    
686
            objects.sort(key=lambda x: x[0])
687
            start, limit = self._list_limits(
688
                [x[0] for x in objects], marker, limit)
689
            return objects[start:start + limit]
690
        elif public:
691
            objects = self._list_public_object_properties(
692
                user, account, container, prefix, all_props)
693
            start, limit = self._list_limits(
694
                [x[0] for x in objects], marker, limit)
695
            return objects[start:start + limit]
696

    
697
        allowed = self._list_object_permissions(
698
            user, account, container, prefix, shared, public)
699
        if shared and not allowed:
700
            return []
701
        path, node = self._lookup_container(account, container)
702
        allowed = self._get_formatted_paths(allowed)
703
        objects = self._list_object_properties(
704
            node, path, prefix, delimiter, marker, limit, virtual, domain,
705
            keys, until, size_range, allowed, all_props)
706
        start, limit = self._list_limits(
707
            [x[0] for x in objects], marker, limit)
708
        return objects[start:start + limit]
709

    
710
    def _list_public_object_properties(self, user, account, container, prefix,
711
                                       all_props):
712
        public = self._list_object_permissions(
713
            user, account, container, prefix, shared=False, public=True)
714
        paths, nodes = self._lookup_objects(public)
715
        path = '/'.join((account, container))
716
        cont_prefix = path + '/'
717
        paths = [x[len(cont_prefix):] for x in paths]
718
        objects = [(p,) + props for p, props in
719
                   zip(paths, self.node.version_lookup_bulk(
720
                       nodes, all_props=all_props))]
721
        return objects
722

    
723
    def _list_objects_no_limit(self, user, account, container, prefix,
724
                               delimiter, virtual, domain, keys, shared, until,
725
                               size_range, all_props, public):
726
        objects = []
727
        while True:
728
            marker = objects[-1] if objects else None
729
            limit = 10000
730
            l = self._list_objects(
731
                user, account, container, prefix, delimiter, marker, limit,
732
                virtual, domain, keys, shared, until, size_range, all_props,
733
                public)
734
            objects.extend(l)
735
            if not l or len(l) < limit:
736
                break
737
        return objects
738

    
739
    def _list_object_permissions(self, user, account, container, prefix,
740
                                 shared, public):
741
        allowed = []
742
        path = '/'.join((account, container, prefix)).rstrip('/')
743
        if user != account:
744
            allowed = self.permissions.access_list_paths(user, path)
745
            if not allowed:
746
                raise NotAllowedError
747
        else:
748
            allowed = set()
749
            if shared:
750
                allowed.update(self.permissions.access_list_shared(path))
751
            if public:
752
                allowed.update(
753
                    [x[0] for x in self.permissions.public_list(path)])
754
            allowed = sorted(allowed)
755
            if not allowed:
756
                return []
757
        return allowed
758

    
759
    @debug_method
760
    def list_objects(self, user, account, container, prefix='', delimiter=None,
761
                     marker=None, limit=10000, virtual=True, domain=None,
762
                     keys=None, shared=False, until=None, size_range=None,
763
                     public=False):
764
        """List (object name, object version_id) under a container."""
765

    
766
        keys = keys or []
767
        return self._list_objects(
768
            user, account, container, prefix, delimiter, marker, limit,
769
            virtual, domain, keys, shared, until, size_range, False, public)
770

    
771
    @debug_method
772
    def list_object_meta(self, user, account, container, prefix='',
773
                         delimiter=None, marker=None, limit=10000,
774
                         virtual=True, domain=None, keys=None, shared=False,
775
                         until=None, size_range=None, public=False):
776
        """Return a list of metadata dicts of objects under a container."""
777

    
778
        keys = keys or []
779
        props = self._list_objects(
780
            user, account, container, prefix, delimiter, marker, limit,
781
            virtual, domain, keys, shared, until, size_range, True, public)
782
        objects = []
783
        for p in props:
784
            if len(p) == 2:
785
                objects.append({'subdir': p[0]})
786
            else:
787
                objects.append({
788
                    'name': p[0],
789
                    'bytes': p[self.SIZE + 1],
790
                    'type': p[self.TYPE + 1],
791
                    'hash': p[self.HASH + 1],
792
                    'version': p[self.SERIAL + 1],
793
                    'version_timestamp': p[self.MTIME + 1],
794
                    'modified': p[self.MTIME + 1] if until is None else None,
795
                    'modified_by': p[self.MUSER + 1],
796
                    'uuid': p[self.UUID + 1],
797
                    'checksum': p[self.CHECKSUM + 1]})
798
        return objects
799

    
800
    @debug_method
801
    def list_object_permissions(self, user, account, container, prefix=''):
802
        """Return a list of paths enforce permissions under a container."""
803

    
804
        return self._list_object_permissions(user, account, container, prefix,
805
                                             True, False)
806

    
807
    @debug_method
808
    def list_object_public(self, user, account, container, prefix=''):
809
        """Return a mapping of object paths to public ids under a container."""
810

    
811
        public = {}
812
        for path, p in self.permissions.public_list('/'.join((account,
813
                                                              container,
814
                                                              prefix))):
815
            public[path] = p
816
        return public
817

    
818
    @debug_method
819
    def get_object_meta(self, user, account, container, name, domain,
820
                        version=None, include_user_defined=True):
821
        """Return a dictionary with the object metadata for the domain."""
822

    
823
        self._can_read(user, account, container, name)
824
        path, node = self._lookup_object(account, container, name)
825
        props = self._get_version(node, version)
826
        if version is None:
827
            modified = props[self.MTIME]
828
        else:
829
            try:
830
                modified = self._get_version(
831
                    node)[self.MTIME]  # Overall last modification.
832
            except NameError:  # Object may be deleted.
833
                del_props = self.node.version_lookup(
834
                    node, inf, CLUSTER_DELETED)
835
                if del_props is None:
836
                    raise ItemNotExists('Object does not exist')
837
                modified = del_props[self.MTIME]
838

    
839
        meta = {}
840
        if include_user_defined:
841
            meta.update(
842
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
843
        meta.update({'name': name,
844
                     'bytes': props[self.SIZE],
845
                     'type': props[self.TYPE],
846
                     'hash': props[self.HASH],
847
                     'version': props[self.SERIAL],
848
                     'version_timestamp': props[self.MTIME],
849
                     'modified': modified,
850
                     'modified_by': props[self.MUSER],
851
                     'uuid': props[self.UUID],
852
                     'checksum': props[self.CHECKSUM]})
853
        return meta
854

    
855
    @debug_method
856
    def update_object_meta(self, user, account, container, name, domain, meta,
857
                           replace=False):
858
        """Update object metadata for a domain and return the new version."""
859

    
860
        self._can_write(user, account, container, name)
861
        path, node = self._lookup_object(account, container, name)
862
        src_version_id, dest_version_id = self._put_metadata(
863
            user, node, domain, meta, replace,
864
            update_statistics_ancestors_depth=1)
865
        self._apply_versioning(account, container, src_version_id,
866
                               update_statistics_ancestors_depth=1)
867
        return dest_version_id
868

    
869
    @debug_method
870
    def get_object_permissions(self, user, account, container, name):
871
        """Return the action allowed on the object, the path
872
        from which the object gets its permissions from,
873
        along with a dictionary containing the permissions."""
874

    
875
        allowed = 'write'
876
        permissions_path = self._get_permissions_path(account, container, name)
877
        if user != account:
878
            if self.permissions.access_check(permissions_path, self.WRITE,
879
                                             user):
880
                allowed = 'write'
881
            elif self.permissions.access_check(permissions_path, self.READ,
882
                                               user):
883
                allowed = 'read'
884
            else:
885
                raise NotAllowedError
886
        self._lookup_object(account, container, name)
887
        return (allowed,
888
                permissions_path,
889
                self.permissions.access_get(permissions_path))
890

    
891
    @debug_method
892
    def update_object_permissions(self, user, account, container, name,
893
                                  permissions):
894
        """Update the permissions associated with the object."""
895

    
896
        if user != account:
897
            raise NotAllowedError
898
        path = self._lookup_object(account, container, name)[0]
899
        self._check_permissions(path, permissions)
900
        self.permissions.access_set(path, permissions)
901
        self._report_sharing_change(user, account, path, {'members':
902
                                    self.permissions.access_members(path)})
903

    
904
    @debug_method
905
    def get_object_public(self, user, account, container, name):
906
        """Return the public id of the object if applicable."""
907

    
908
        self._can_read(user, account, container, name)
909
        path = self._lookup_object(account, container, name)[0]
910
        p = self.permissions.public_get(path)
911
        return p
912

    
913
    @debug_method
914
    def update_object_public(self, user, account, container, name, public):
915
        """Update the public status of the object."""
916

    
917
        self._can_write(user, account, container, name)
918
        path = self._lookup_object(account, container, name)[0]
919
        if not public:
920
            self.permissions.public_unset(path)
921
        else:
922
            self.permissions.public_set(
923
                path, self.public_url_security, self.public_url_alphabet)
924

    
925
    @debug_method
926
    def get_object_hashmap(self, user, account, container, name, version=None):
927
        """Return the object's size and a list with partial hashes."""
928

    
929
        self._can_read(user, account, container, name)
930
        path, node = self._lookup_object(account, container, name)
931
        props = self._get_version(node, version)
932
        if props[self.HASH] is None:
933
            return 0, ()
934
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
935
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
936

    
937
    def _update_object_hash(self, user, account, container, name, size, type,
938
                            hash, checksum, domain, meta, replace_meta,
939
                            permissions, src_node=None, src_version_id=None,
940
                            is_copy=False, report_size_change=True):
941
        if permissions is not None and user != account:
942
            raise NotAllowedError
943
        self._can_write(user, account, container, name)
944
        if permissions is not None:
945
            path = '/'.join((account, container, name))
946
            self._check_permissions(path, permissions)
947

    
948
        account_path, account_node = self._lookup_account(account, True)
949
        container_path, container_node = self._lookup_container(
950
            account, container)
951

    
952
        path, node = self._put_object_node(
953
            container_path, container_node, name)
954
        pre_version_id, dest_version_id = self._put_version_duplicate(
955
            user, node, src_node=src_node, size=size, type=type, hash=hash,
956
            checksum=checksum, is_copy=is_copy,
957
            update_statistics_ancestors_depth=1)
958

    
959
        # Handle meta.
960
        if src_version_id is None:
961
            src_version_id = pre_version_id
962
        self._put_metadata_duplicate(
963
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
964

    
965
        del_size = self._apply_versioning(account, container, pre_version_id,
966
                                          update_statistics_ancestors_depth=1)
967
        size_delta = size - del_size
968
        if size_delta > 0:
969
            # Check account quota.
970
            if not self.using_external_quotaholder:
971
                account_quota = long(self._get_policy(
972
                    account_node, is_account_policy=True)['quota'])
973
                account_usage = self._get_statistics(account_node,
974
                                                     compute=True)[1]
975
                if (account_quota > 0 and account_usage > account_quota):
976
                    raise QuotaError(
977
                        'Account quota exceeded: limit: %s, usage: %s' % (
978
                            account_quota, account_usage))
979

    
980
            # Check container quota.
981
            container_quota = long(self._get_policy(
982
                container_node, is_account_policy=False)['quota'])
983
            container_usage = self._get_statistics(container_node)[1]
984
            if (container_quota > 0 and container_usage > container_quota):
985
                # This must be executed in a transaction, so the version is
986
                # never created if it fails.
987
                raise QuotaError(
988
                    'Container quota exceeded: limit: %s, usage: %s' % (
989
                        container_quota, container_usage
990
                    )
991
                )
992

    
993
        if report_size_change:
994
            self._report_size_change(
995
                user, account, size_delta,
996
                {'action': 'object update', 'path': path,
997
                 'versions': ','.join([str(dest_version_id)])})
998
        if permissions is not None:
999
            self.permissions.access_set(path, permissions)
1000
            self._report_sharing_change(
1001
                user, account, path,
1002
                {'members': self.permissions.access_members(path)})
1003

    
1004
        self._report_object_change(
1005
            user, account, path,
1006
            details={'version': dest_version_id, 'action': 'object update'})
1007
        return dest_version_id
1008

    
1009
    @debug_method
1010
    def update_object_hashmap(self, user, account, container, name, size, type,
1011
                              hashmap, checksum, domain, meta=None,
1012
                              replace_meta=False, permissions=None):
1013
        """Create/update an object's hashmap and return the new version."""
1014

    
1015
        meta = meta or {}
1016
        if size == 0:  # No such thing as an empty hashmap.
1017
            hashmap = [self.put_block('')]
1018
        map = HashMap(self.block_size, self.hash_algorithm)
1019
        map.extend([self._unhexlify_hash(x) for x in hashmap])
1020
        missing = self.store.block_search(map)
1021
        if missing:
1022
            ie = IndexError()
1023
            ie.data = [binascii.hexlify(x) for x in missing]
1024
            raise ie
1025

    
1026
        hash = map.hash()
1027
        hexlified = binascii.hexlify(hash)
1028
        dest_version_id = self._update_object_hash(
1029
            user, account, container, name, size, type, hexlified, checksum,
1030
            domain, meta, replace_meta, permissions)
1031
        self.store.map_put(hash, map)
1032
        return dest_version_id, hexlified
1033

    
1034
    @debug_method
1035
    def update_object_checksum(self, user, account, container, name, version,
1036
                               checksum):
1037
        """Update an object's checksum."""
1038

    
1039
        # Update objects with greater version and same hashmap
1040
        # and size (fix metadata updates).
1041
        self._can_write(user, account, container, name)
1042
        path, node = self._lookup_object(account, container, name)
1043
        props = self._get_version(node, version)
1044
        versions = self.node.node_get_versions(node)
1045
        for x in versions:
1046
            if (x[self.SERIAL] >= int(version) and
1047
                x[self.HASH] == props[self.HASH] and
1048
                    x[self.SIZE] == props[self.SIZE]):
1049
                self.node.version_put_property(
1050
                    x[self.SERIAL], 'checksum', checksum)
1051

    
1052
    def _copy_object(self, user, src_account, src_container, src_name,
1053
                     dest_account, dest_container, dest_name, type,
1054
                     dest_domain=None, dest_meta=None, replace_meta=False,
1055
                     permissions=None, src_version=None, is_move=False,
1056
                     delimiter=None):
1057

    
1058
        report_size_change = not is_move
1059
        dest_meta = dest_meta or {}
1060
        dest_version_ids = []
1061
        self._can_read(user, src_account, src_container, src_name)
1062
        path, node = self._lookup_object(src_account, src_container, src_name)
1063
        # TODO: Will do another fetch of the properties in duplicate version...
1064
        props = self._get_version(
1065
            node, src_version)  # Check to see if source exists.
1066
        src_version_id = props[self.SERIAL]
1067
        hash = props[self.HASH]
1068
        size = props[self.SIZE]
1069
        is_copy = not is_move and (src_account, src_container, src_name) != (
1070
            dest_account, dest_container, dest_name)  # New uuid.
1071
        dest_version_ids.append(self._update_object_hash(
1072
            user, dest_account, dest_container, dest_name, size, type, hash,
1073
            None, dest_domain, dest_meta, replace_meta, permissions,
1074
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1075
            report_size_change=report_size_change))
1076
        if is_move and ((src_account, src_container, src_name) !=
1077
                        (dest_account, dest_container, dest_name)):
1078
            self._delete_object(user, src_account, src_container, src_name,
1079
                                report_size_change=report_size_change)
1080

    
1081
        if delimiter:
1082
            prefix = (src_name + delimiter if not
1083
                      src_name.endswith(delimiter) else src_name)
1084
            src_names = self._list_objects_no_limit(
1085
                user, src_account, src_container, prefix, delimiter=None,
1086
                virtual=False, domain=None, keys=[], shared=False, until=None,
1087
                size_range=None, all_props=True, public=False)
1088
            src_names.sort(key=lambda x: x[2])  # order by nodes
1089
            paths = [elem[0] for elem in src_names]
1090
            nodes = [elem[2] for elem in src_names]
1091
            # TODO: Will do another fetch of the properties
1092
            # in duplicate version...
1093
            props = self._get_versions(nodes)  # Check to see if source exists.
1094

    
1095
            for prop, path, node in zip(props, paths, nodes):
1096
                src_version_id = prop[self.SERIAL]
1097
                hash = prop[self.HASH]
1098
                vtype = prop[self.TYPE]
1099
                size = prop[self.SIZE]
1100
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1101
                    delimiter) else dest_name
1102
                vdest_name = path.replace(prefix, dest_prefix, 1)
1103
                dest_version_ids.append(self._update_object_hash(
1104
                    user, dest_account, dest_container, vdest_name, size,
1105
                    vtype, hash, None, dest_domain, meta={},
1106
                    replace_meta=False, permissions=None, src_node=node,
1107
                    src_version_id=src_version_id, is_copy=is_copy))
1108
                if is_move and ((src_account, src_container, src_name) !=
1109
                                (dest_account, dest_container, dest_name)):
1110
                    self._delete_object(user, src_account, src_container, path)
1111
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
1112
                dest_version_ids)
1113

    
1114
    @debug_method
1115
    def copy_object(self, user, src_account, src_container, src_name,
1116
                    dest_account, dest_container, dest_name, type, domain,
1117
                    meta=None, replace_meta=False, permissions=None,
1118
                    src_version=None, delimiter=None):
1119
        """Copy an object's data and metadata."""
1120

    
1121
        meta = meta or {}
1122
        dest_version_id = self._copy_object(
1123
            user, src_account, src_container, src_name, dest_account,
1124
            dest_container, dest_name, type, domain, meta, replace_meta,
1125
            permissions, src_version, False, delimiter)
1126
        return dest_version_id
1127

    
1128
    @debug_method
1129
    def move_object(self, user, src_account, src_container, src_name,
1130
                    dest_account, dest_container, dest_name, type, domain,
1131
                    meta=None, replace_meta=False, permissions=None,
1132
                    delimiter=None):
1133
        """Move an object's data and metadata."""
1134

    
1135
        meta = meta or {}
1136
        if user != src_account:
1137
            raise NotAllowedError
1138
        dest_version_id = self._move_object(
1139
            user, src_account, src_container, src_name, dest_account,
1140
            dest_container, dest_name, type, domain, meta, replace_meta,
1141
            permissions, None, delimiter=delimiter)
1142
        return dest_version_id
1143

    
1144
    def _delete_object(self, user, account, container, name, until=None,
1145
                       delimiter=None, report_size_change=True):
1146
        if user != account:
1147
            raise NotAllowedError
1148

    
1149
        if until is not None:
1150
            path = '/'.join((account, container, name))
1151
            node = self.node.node_lookup(path)
1152
            if node is None:
1153
                return
1154
            hashes = []
1155
            size = 0
1156
            serials = []
1157
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1158
                                           update_statistics_ancestors_depth=1)
1159
            hashes += h
1160
            size += s
1161
            serials += v
1162
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1163
                                           update_statistics_ancestors_depth=1)
1164
            hashes += h
1165
            if not self.free_versioning:
1166
                size += s
1167
            serials += v
1168
            for h in hashes:
1169
                self.store.map_delete(h)
1170
            self.node.node_purge(node, until, CLUSTER_DELETED,
1171
                                 update_statistics_ancestors_depth=1)
1172
            try:
1173
                self._get_version(node)
1174
            except NameError:
1175
                self.permissions.access_clear(path)
1176
            self._report_size_change(
1177
                user, account, -size, {
1178
                    'action': 'object purge',
1179
                    'path': path,
1180
                    'versions': ','.join(str(i) for i in serials)
1181
                }
1182
            )
1183
            return
1184

    
1185
        path, node = self._lookup_object(account, container, name)
1186
        if not self._exists(node):
1187
            raise ItemNotExists('Object is deleted.')
1188
        src_version_id, dest_version_id = self._put_version_duplicate(
1189
            user, node, size=0, type='', hash=None, checksum='',
1190
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1191
        del_size = self._apply_versioning(account, container, src_version_id,
1192
                                          update_statistics_ancestors_depth=1)
1193
        if report_size_change:
1194
            self._report_size_change(
1195
                user, account, -del_size,
1196
                {'action': 'object delete',
1197
                 'path': path,
1198
                 'versions': ','.join([str(dest_version_id)])})
1199
        self._report_object_change(
1200
            user, account, path, details={'action': 'object delete'})
1201
        self.permissions.access_clear(path)
1202

    
1203
        if delimiter:
1204
            prefix = name + delimiter if not name.endswith(delimiter) else name
1205
            src_names = self._list_objects_no_limit(
1206
                user, account, container, prefix, delimiter=None,
1207
                virtual=False, domain=None, keys=[], shared=False, until=None,
1208
                size_range=None, all_props=True, public=False)
1209
            paths = []
1210
            for t in src_names:
1211
                path = '/'.join((account, container, t[0]))
1212
                node = t[2]
1213
                if not self._exists(node):
1214
                    continue
1215
                src_version_id, dest_version_id = self._put_version_duplicate(
1216
                    user, node, size=0, type='', hash=None, checksum='',
1217
                    cluster=CLUSTER_DELETED,
1218
                    update_statistics_ancestors_depth=1)
1219
                del_size = self._apply_versioning(
1220
                    account, container, src_version_id,
1221
                    update_statistics_ancestors_depth=1)
1222
                if report_size_change:
1223
                    self._report_size_change(
1224
                        user, account, -del_size,
1225
                        {'action': 'object delete',
1226
                         'path': path,
1227
                         'versions': ','.join([str(dest_version_id)])})
1228
                self._report_object_change(
1229
                    user, account, path, details={'action': 'object delete'})
1230
                paths.append(path)
1231
            self.permissions.access_clear_bulk(paths)
1232

    
1233
    @debug_method
1234
    def delete_object(self, user, account, container, name, until=None,
1235
                      prefix='', delimiter=None):
1236
        """Delete/purge an object."""
1237

    
1238
        self._delete_object(user, account, container, name, until, delimiter)
1239

    
1240
    @debug_method
1241
    def list_versions(self, user, account, container, name):
1242
        """Return a list of all object (version, version_timestamp) tuples."""
1243

    
1244
        self._can_read(user, account, container, name)
1245
        path, node = self._lookup_object(account, container, name)
1246
        versions = self.node.node_get_versions(node)
1247
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
1248
                x[self.CLUSTER] != CLUSTER_DELETED]
1249

    
1250
    @debug_method
1251
    def get_uuid(self, user, uuid):
1252
        """Return the (account, container, name) for the UUID given."""
1253

    
1254
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1255
        if info is None:
1256
            raise NameError
1257
        path, serial = info
1258
        account, container, name = path.split('/', 2)
1259
        self._can_read(user, account, container, name)
1260
        return (account, container, name)
1261

    
1262
    @debug_method
1263
    def get_public(self, user, public):
1264
        """Return the (account, container, name) for the public id given."""
1265

    
1266
        path = self.permissions.public_path(public)
1267
        if path is None:
1268
            raise NameError
1269
        account, container, name = path.split('/', 2)
1270
        self._can_read(user, account, container, name)
1271
        return (account, container, name)
1272

    
1273
    def get_block(self, hash):
1274
        """Return a block's data."""
1275

    
1276
        logger.debug("get_block: %s", hash)
1277
        block = self.store.block_get(self._unhexlify_hash(hash))
1278
        if not block:
1279
            raise ItemNotExists('Block does not exist')
1280
        return block
1281

    
1282
    def put_block(self, data):
1283
        """Store a block and return the hash."""
1284

    
1285
        logger.debug("put_block: %s", len(data))
1286
        return binascii.hexlify(self.store.block_put(data))
1287

    
1288
    def update_block(self, hash, data, offset=0):
1289
        """Update a known block and return the hash."""
1290

    
1291
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1292
        if offset == 0 and len(data) == self.block_size:
1293
            return self.put_block(data)
1294
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
1295
        return binascii.hexlify(h)
1296

    
1297
    # Path functions.
1298

    
1299
    def _generate_uuid(self):
1300
        return str(uuidlib.uuid4())
1301

    
1302
    def _put_object_node(self, path, parent, name):
1303
        path = '/'.join((path, name))
1304
        node = self.node.node_lookup(path)
1305
        if node is None:
1306
            node = self.node.node_create(parent, path)
1307
        return path, node
1308

    
1309
    def _put_path(self, user, parent, path,
1310
                  update_statistics_ancestors_depth=None):
1311
        node = self.node.node_create(parent, path)
1312
        self.node.version_create(node, None, 0, '', None, user,
1313
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
1314
                                 update_statistics_ancestors_depth)
1315
        return node
1316

    
1317
    def _lookup_account(self, account, create=True):
1318
        for_update = True if create else False
1319
        node = self.node.node_lookup(account, for_update=for_update)
1320
        if node is None and create:
1321
            node = self._put_path(
1322
                account, self.ROOTNODE, account,
1323
                update_statistics_ancestors_depth=-1)  # User is account.
1324
        return account, node
1325

    
1326
    def _lookup_container(self, account, container):
1327
        for_update = True if self.lock_container_path else False
1328
        path = '/'.join((account, container))
1329
        node = self.node.node_lookup(path, for_update)
1330
        if node is None:
1331
            raise ItemNotExists('Container does not exist')
1332
        return path, node
1333

    
1334
    def _lookup_object(self, account, container, name):
1335
        path = '/'.join((account, container, name))
1336
        node = self.node.node_lookup(path)
1337
        if node is None:
1338
            raise ItemNotExists('Object does not exist')
1339
        return path, node
1340

    
1341
    def _lookup_objects(self, paths):
1342
        nodes = self.node.node_lookup_bulk(paths)
1343
        return paths, nodes
1344

    
1345
    def _get_properties(self, node, until=None):
1346
        """Return properties until the timestamp given."""
1347

    
1348
        before = until if until is not None else inf
1349
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1350
        if props is None and until is not None:
1351
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1352
        if props is None:
1353
            raise ItemNotExists('Path does not exist')
1354
        return props
1355

    
1356
    def _get_statistics(self, node, until=None, compute=False):
1357
        """Return (count, sum of size, timestamp) of everything under node."""
1358

    
1359
        if until is not None:
1360
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1361
        elif compute:
1362
            stats = self.node.statistics_latest(node,
1363
                                                except_cluster=CLUSTER_DELETED)
1364
        else:
1365
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1366
        if stats is None:
1367
            stats = (0, 0, 0)
1368
        return stats
1369

    
1370
    def _get_version(self, node, version=None):
1371
        if version is None:
1372
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1373
            if props is None:
1374
                raise ItemNotExists('Object does not exist')
1375
        else:
1376
            try:
1377
                version = int(version)
1378
            except ValueError:
1379
                raise VersionNotExists('Version does not exist')
1380
            props = self.node.version_get_properties(version, node=node)
1381
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1382
                raise VersionNotExists('Version does not exist')
1383
        return props
1384

    
1385
    def _get_versions(self, nodes):
1386
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1387

    
1388
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
1389
                               type=None, hash=None, checksum=None,
1390
                               cluster=CLUSTER_NORMAL, is_copy=False,
1391
                               update_statistics_ancestors_depth=None):
1392
        """Create a new version of the node."""
1393

    
1394
        props = self.node.version_lookup(
1395
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1396
        if props is not None:
1397
            src_version_id = props[self.SERIAL]
1398
            src_hash = props[self.HASH]
1399
            src_size = props[self.SIZE]
1400
            src_type = props[self.TYPE]
1401
            src_checksum = props[self.CHECKSUM]
1402
        else:
1403
            src_version_id = None
1404
            src_hash = None
1405
            src_size = 0
1406
            src_type = ''
1407
            src_checksum = ''
1408
        if size is None:  # Set metadata.
1409
            hash = src_hash  # This way hash can be set to None
1410
                             # (account or container).
1411
            size = src_size
1412
        if type is None:
1413
            type = src_type
1414
        if checksum is None:
1415
            checksum = src_checksum
1416
        uuid = self._generate_uuid(
1417
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1418

    
1419
        if src_node is None:
1420
            pre_version_id = src_version_id
1421
        else:
1422
            pre_version_id = None
1423
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1424
            if props is not None:
1425
                pre_version_id = props[self.SERIAL]
1426
        if pre_version_id is not None:
1427
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
1428
                                        update_statistics_ancestors_depth)
1429

    
1430
        dest_version_id, mtime = self.node.version_create(
1431
            node, hash, size, type, src_version_id, user, uuid, checksum,
1432
            cluster, update_statistics_ancestors_depth)
1433

    
1434
        self.node.attribute_unset_is_latest(node, dest_version_id)
1435

    
1436
        return pre_version_id, dest_version_id
1437

    
1438
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
1439
                                node, meta, replace=False):
1440
        if src_version_id is not None:
1441
            self.node.attribute_copy(src_version_id, dest_version_id)
1442
        if not replace:
1443
            self.node.attribute_del(dest_version_id, domain, (
1444
                k for k, v in meta.iteritems() if v == ''))
1445
            self.node.attribute_set(dest_version_id, domain, node, (
1446
                (k, v) for k, v in meta.iteritems() if v != ''))
1447
        else:
1448
            self.node.attribute_del(dest_version_id, domain)
1449
            self.node.attribute_set(dest_version_id, domain, node, ((
1450
                k, v) for k, v in meta.iteritems()))
1451

    
1452
    def _put_metadata(self, user, node, domain, meta, replace=False,
1453
                      update_statistics_ancestors_depth=None):
1454
        """Create a new version and store metadata."""
1455

    
1456
        src_version_id, dest_version_id = self._put_version_duplicate(
1457
            user, node,
1458
            update_statistics_ancestors_depth=
1459
            update_statistics_ancestors_depth)
1460
        self._put_metadata_duplicate(
1461
            src_version_id, dest_version_id, domain, node, meta, replace)
1462
        return src_version_id, dest_version_id
1463

    
1464
    def _list_limits(self, listing, marker, limit):
1465
        start = 0
1466
        if marker:
1467
            try:
1468
                start = listing.index(marker) + 1
1469
            except ValueError:
1470
                pass
1471
        if not limit or limit > 10000:
1472
            limit = 10000
1473
        return start, limit
1474

    
1475
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
1476
                                marker=None, limit=10000, virtual=True,
1477
                                domain=None, keys=None, until=None,
1478
                                size_range=None, allowed=None,
1479
                                all_props=False):
1480
        keys = keys or []
1481
        allowed = allowed or []
1482
        cont_prefix = path + '/'
1483
        prefix = cont_prefix + prefix
1484
        start = cont_prefix + marker if marker else None
1485
        before = until if until is not None else inf
1486
        filterq = keys if domain else []
1487
        sizeq = size_range
1488

    
1489
        objects, prefixes = self.node.latest_version_list(
1490
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
1491
            allowed, domain, filterq, sizeq, all_props)
1492
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1493
        objects.sort(key=lambda x: x[0])
1494
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1495
        return objects
1496

    
1497
    # Reporting functions.
1498

    
1499
    @debug_method
1500
    def _report_size_change(self, user, account, size, details=None):
1501
        details = details or {}
1502

    
1503
        if size == 0:
1504
            return
1505

    
1506
        account_node = self._lookup_account(account, True)[1]
1507
        total = self._get_statistics(account_node, compute=True)[1]
1508
        details.update({'user': user, 'total': total})
1509
        self.messages.append(
1510
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1511
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1512

    
1513
        if not self.using_external_quotaholder:
1514
            return
1515

    
1516
        try:
1517
            name = details['path'] if 'path' in details else ''
1518
            serial = self.astakosclient.issue_one_commission(
1519
                token=self.service_token,
1520
                holder=account,
1521
                source=DEFAULT_SOURCE,
1522
                provisions={'pithos.diskspace': size},
1523
                name=name)
1524
        except BaseException, e:
1525
            raise QuotaError(e)
1526
        else:
1527
            self.serials.append(serial)
1528

    
1529
    @debug_method
1530
    def _report_object_change(self, user, account, path, details=None):
1531
        details = details or {}
1532
        details.update({'user': user})
1533
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1534
                              account, QUEUE_INSTANCE_ID, 'object', path,
1535
                              details))
1536

    
1537
    @debug_method
1538
    def _report_sharing_change(self, user, account, path, details=None):
1539
        details = details or {}
1540
        details.update({'user': user})
1541
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1542
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
1543
                              details))
1544

    
1545
    # Policy functions.
1546

    
1547
    def _check_policy(self, policy, is_account_policy=True):
1548
        default_policy = self.default_account_policy \
1549
            if is_account_policy else self.default_container_policy
1550
        for k in policy.keys():
1551
            if policy[k] == '':
1552
                policy[k] = default_policy.get(k)
1553
        for k, v in policy.iteritems():
1554
            if k == 'quota':
1555
                q = int(v)  # May raise ValueError.
1556
                if q < 0:
1557
                    raise ValueError
1558
            elif k == 'versioning':
1559
                if v not in ['auto', 'none']:
1560
                    raise ValueError
1561
            else:
1562
                raise ValueError
1563

    
1564
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1565
        default_policy = self.default_account_policy \
1566
            if is_account_policy else self.default_container_policy
1567
        if replace:
1568
            for k, v in default_policy.iteritems():
1569
                if k not in policy:
1570
                    policy[k] = v
1571
        self.node.policy_set(node, policy)
1572

    
1573
    def _get_policy(self, node, is_account_policy=True):
1574
        default_policy = self.default_account_policy \
1575
            if is_account_policy else self.default_container_policy
1576
        policy = default_policy.copy()
1577
        policy.update(self.node.policy_get(node))
1578
        return policy
1579

    
1580
    def _apply_versioning(self, account, container, version_id,
1581
                          update_statistics_ancestors_depth=None):
1582
        """Delete the provided version if such is the policy.
1583
           Return size of object removed.
1584
        """
1585

    
1586
        if version_id is None:
1587
            return 0
1588
        path, node = self._lookup_container(account, container)
1589
        versioning = self._get_policy(
1590
            node, is_account_policy=False)['versioning']
1591
        if versioning != 'auto':
1592
            hash, size = self.node.version_remove(
1593
                version_id, update_statistics_ancestors_depth)
1594
            self.store.map_delete(hash)
1595
            return size
1596
        elif self.free_versioning:
1597
            return self.node.version_get_properties(
1598
                version_id, keys=('size',))[0]
1599
        return 0
1600

    
1601
    # Access control functions.
1602

    
1603
    def _check_groups(self, groups):
1604
        # raise ValueError('Bad characters in groups')
1605
        pass
1606

    
1607
    def _check_permissions(self, path, permissions):
1608
        # raise ValueError('Bad characters in permissions')
1609
        pass
1610

    
1611
    def _get_formatted_paths(self, paths):
1612
        formatted = []
1613
        for p in paths:
1614
            node = self.node.node_lookup(p)
1615
            props = None
1616
            if node is not None:
1617
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1618
            if props is not None:
1619
                if props[self.TYPE].split(';', 1)[0].strip() in (
1620
                        'application/directory', 'application/folder'):
1621
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1622
                formatted.append((p, self.MATCH_EXACT))
1623
        return formatted
1624

    
1625
    def _get_permissions_path(self, account, container, name):
1626
        path = '/'.join((account, container, name))
1627
        permission_paths = self.permissions.access_inherit(path)
1628
        permission_paths.sort()
1629
        permission_paths.reverse()
1630
        for p in permission_paths:
1631
            if p == path:
1632
                return p
1633
            else:
1634
                if p.count('/') < 2:
1635
                    continue
1636
                node = self.node.node_lookup(p)
1637
                props = None
1638
                if node is not None:
1639
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1640
                if props is not None:
1641
                    if props[self.TYPE].split(';', 1)[0].strip() in (
1642
                            'application/directory', 'application/folder'):
1643
                        return p
1644
        return None
1645

    
1646
    def _can_read(self, user, account, container, name):
1647
        if user == account:
1648
            return True
1649
        path = '/'.join((account, container, name))
1650
        if self.permissions.public_get(path) is not None:
1651
            return True
1652
        path = self._get_permissions_path(account, container, name)
1653
        if not path:
1654
            raise NotAllowedError
1655
        if (not self.permissions.access_check(path, self.READ, user) and not
1656
                self.permissions.access_check(path, self.WRITE, user)):
1657
            raise NotAllowedError
1658

    
1659
    def _can_write(self, user, account, container, name):
1660
        if user == account:
1661
            return True
1662
        path = '/'.join((account, container, name))
1663
        path = self._get_permissions_path(account, container, name)
1664
        if not path:
1665
            raise NotAllowedError
1666
        if not self.permissions.access_check(path, self.WRITE, user):
1667
            raise NotAllowedError
1668

    
1669
    def _allowed_accounts(self, user):
1670
        allow = set()
1671
        for path in self.permissions.access_list_paths(user):
1672
            allow.add(path.split('/', 1)[0])
1673
        return sorted(allow)
1674

    
1675
    def _allowed_containers(self, user, account):
1676
        allow = set()
1677
        for path in self.permissions.access_list_paths(user, account):
1678
            allow.add(path.split('/', 2)[1])
1679
        return sorted(allow)
1680

    
1681
    # Domain functions
1682

    
1683
    @debug_method
1684
    def get_domain_objects(self, domain, user=None):
1685
        allowed_paths = self.permissions.access_list_paths(
1686
            user, include_owned=user is not None, include_containers=False)
1687
        if not allowed_paths:
1688
            return []
1689
        obj_list = self.node.domain_object_list(
1690
            domain, allowed_paths, CLUSTER_NORMAL)
1691
        return [(path,
1692
                 self._build_metadata(props, user_defined_meta),
1693
                 self.permissions.access_get(path)) for
1694
                path, props, user_defined_meta in obj_list]
1695

    
1696
    # util functions
1697

    
1698
    def _build_metadata(self, props, user_defined=None,
1699
                        include_user_defined=True):
1700
        meta = {'bytes': props[self.SIZE],
1701
                'type': props[self.TYPE],
1702
                'hash': props[self.HASH],
1703
                'version': props[self.SERIAL],
1704
                'version_timestamp': props[self.MTIME],
1705
                'modified_by': props[self.MUSER],
1706
                'uuid': props[self.UUID],
1707
                'checksum': props[self.CHECKSUM]}
1708
        if include_user_defined and user_defined is not None:
1709
            meta.update(user_defined)
1710
        return meta
1711

    
1712
    def _has_read_access(self, user, path):
1713
        try:
1714
            account, container, object = path.split('/', 2)
1715
        except ValueError:
1716
            raise ValueError('Invalid object path')
1717

    
1718
        assert isinstance(user, basestring), "Invalid user"
1719

    
1720
        try:
1721
            self._can_read(user, account, container, object)
1722
        except NotAllowedError:
1723
            return False
1724
        else:
1725
            return True
1726

    
1727
    def _exists(self, node):
1728
        try:
1729
            self._get_version(node)
1730
        except ItemNotExists:
1731
            return False
1732
        else:
1733
            return True
1734

    
1735
    def _unhexlify_hash(self, hash):
1736
        try:
1737
            return binascii.unhexlify(hash)
1738
        except TypeError:
1739
            raise InvalidHash(hash)