Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ f6d499b0

History | View | Annotate | Download (69.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42

    
43
try:
44
    from astakosclient import AstakosClient
45
except ImportError:
46
    AstakosClient = None
47

    
48
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
49
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
50
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
51
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)
52

    
53

    
54
class DisabledAstakosClient(object):
55
    def __init__(self, *args, **kwargs):
56
        self.args = args
57
        self.kwargs = kwargs
58

    
59
    def __getattr__(self, name):
60
        m = ("AstakosClient has been disabled, "
61
             "yet an attempt to access it was made")
62
        raise AssertionError(m)
63

    
64

    
65
# Stripped-down version of the HashMap class found in tools.
66

    
67
class HashMap(list):
68

    
69
    def __init__(self, blocksize, blockhash):
70
        super(HashMap, self).__init__()
71
        self.blocksize = blocksize
72
        self.blockhash = blockhash
73

    
74
    def _hash_raw(self, v):
75
        h = hashlib.new(self.blockhash)
76
        h.update(v)
77
        return h.digest()
78

    
79
    def hash(self):
80
        if len(self) == 0:
81
            return self._hash_raw('')
82
        if len(self) == 1:
83
            return self.__getitem__(0)
84

    
85
        h = list(self)
86
        s = 2
87
        while s < len(h):
88
            s = s * 2
89
        h += [('\x00' * len(h[0]))] * (s - len(h))
90
        while len(h) > 1:
91
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
92
        return h[0]
93

    
94
# Default modules and settings.
95
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
96
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
97
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
98
DEFAULT_BLOCK_PATH = 'data/'
99
DEFAULT_BLOCK_UMASK = 0o022
100
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
101
DEFAULT_HASH_ALGORITHM = 'sha256'
102
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
103
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
104
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
105
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
106
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
107
                               'abcdefghijklmnopqrstuvwxyz'
108
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
109
DEFAULT_PUBLIC_URL_SECURITY = 16
110

    
111
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
112
QUEUE_CLIENT_ID = 'pithos'
113
QUEUE_INSTANCE_ID = '1'
114

    
115
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
116

    
117
inf = float('inf')
118

    
119
ULTIMATE_ANSWER = 42
120

    
121
DEFAULT_SOURCE = 'system'
122

    
123
logger = logging.getLogger(__name__)
124

    
125

    
126
def debug_method(func):
127
    @wraps(func)
128
    def wrapper(self, *args, **kw):
129
        try:
130
            result = func(self, *args, **kw)
131
            return result
132
        except:
133
            result = format_exc()
134
            raise
135
        finally:
136
            all_args = map(repr, args)
137
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
138
            logger.debug(">>> %s(%s) <<< %s" % (
139
                func.__name__, ', '.join(all_args).rstrip(', '), result))
140
    return wrapper
141

    
142

    
143
class ModularBackend(BaseBackend):
144
    """A modular backend.
145

146
    Uses modules for SQL functions and storage.
147
    """
148

    
149
    def __init__(self, db_module=None, db_connection=None,
150
                 block_module=None, block_path=None, block_umask=None,
151
                 block_size=None, hash_algorithm=None,
152
                 queue_module=None, queue_hosts=None, queue_exchange=None,
153
                 astakos_url=None, service_token=None,
154
                 astakosclient_poolsize=None,
155
                 free_versioning=True, block_params=None,
156
                 public_url_security=None,
157
                 public_url_alphabet=None,
158
                 account_quota_policy=None,
159
                 container_quota_policy=None,
160
                 container_versioning_policy=None):
161
        db_module = db_module or DEFAULT_DB_MODULE
162
        db_connection = db_connection or DEFAULT_DB_CONNECTION
163
        block_module = block_module or DEFAULT_BLOCK_MODULE
164
        block_path = block_path or DEFAULT_BLOCK_PATH
165
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
166
        block_params = block_params or DEFAULT_BLOCK_PARAMS
167
        block_size = block_size or DEFAULT_BLOCK_SIZE
168
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
169
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
170
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
171
        container_quota_policy = container_quota_policy \
172
            or DEFAULT_CONTAINER_QUOTA
173
        container_versioning_policy = container_versioning_policy \
174
            or DEFAULT_CONTAINER_VERSIONING
175

    
176
        self.default_account_policy = {'quota': account_quota_policy}
177
        self.default_container_policy = {
178
            'quota': container_quota_policy,
179
            'versioning': container_versioning_policy
180
        }
181
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
182
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
183

    
184
        self.public_url_security = (public_url_security or
185
                                    DEFAULT_PUBLIC_URL_SECURITY)
186
        self.public_url_alphabet = (public_url_alphabet or
187
                                    DEFAULT_PUBLIC_URL_ALPHABET)
188

    
189
        self.hash_algorithm = hash_algorithm
190
        self.block_size = block_size
191
        self.free_versioning = free_versioning
192

    
193
        def load_module(m):
194
            __import__(m)
195
            return sys.modules[m]
196

    
197
        self.db_module = load_module(db_module)
198
        self.wrapper = self.db_module.DBWrapper(db_connection)
199
        params = {'wrapper': self.wrapper}
200
        self.permissions = self.db_module.Permissions(**params)
201
        self.config = self.db_module.Config(**params)
202
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
203
        for x in ['READ', 'WRITE']:
204
            setattr(self, x, getattr(self.db_module, x))
205
        self.node = self.db_module.Node(**params)
206
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
207
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
208
                  'MATCH_PREFIX', 'MATCH_EXACT']:
209
            setattr(self, x, getattr(self.db_module, x))
210

    
211
        self.block_module = load_module(block_module)
212
        self.block_params = block_params
213
        params = {'path': block_path,
214
                  'block_size': self.block_size,
215
                  'hash_algorithm': self.hash_algorithm,
216
                  'umask': block_umask}
217
        params.update(self.block_params)
218
        self.store = self.block_module.Store(**params)
219

    
220
        if queue_module and queue_hosts:
221
            self.queue_module = load_module(queue_module)
222
            params = {'hosts': queue_hosts,
223
                      'exchange': queue_exchange,
224
                      'client_id': QUEUE_CLIENT_ID}
225
            self.queue = self.queue_module.Queue(**params)
226
        else:
227
            class NoQueue:
228
                def send(self, *args):
229
                    pass
230

    
231
                def close(self):
232
                    pass
233

    
234
            self.queue = NoQueue()
235

    
236
        self.astakos_url = astakos_url
237
        self.service_token = service_token
238

    
239
        if not astakos_url or not AstakosClient:
240
            self.astakosclient = DisabledAstakosClient(
241
                astakos_url,
242
                use_pool=True,
243
                pool_size=astakosclient_poolsize)
244
        else:
245
            self.astakosclient = AstakosClient(
246
                astakos_url,
247
                use_pool=True,
248
                pool_size=astakosclient_poolsize)
249

    
250
        self.serials = []
251
        self.messages = []
252

    
253
        self._move_object = partial(self._copy_object, is_move=True)
254

    
255
        self.lock_container_path = False
256

    
257
    def pre_exec(self, lock_container_path=False):
258
        self.lock_container_path = lock_container_path
259
        self.wrapper.execute()
260

    
261
    def post_exec(self, success_status=True):
262
        if success_status:
263
            # send messages produced
264
            for m in self.messages:
265
                self.queue.send(*m)
266

    
267
            # register serials
268
            if self.serials:
269
                self.commission_serials.insert_many(
270
                    self.serials)
271

    
272
                # commit to ensure that the serials are registered
273
                # even if resolve commission fails
274
                self.wrapper.commit()
275

    
276
                # start new transaction
277
                self.wrapper.execute()
278

    
279
                r = self.astakosclient.resolve_commissions(
280
                    token=self.service_token,
281
                    accept_serials=self.serials,
282
                    reject_serials=[])
283
                self.commission_serials.delete_many(
284
                    r['accepted'])
285

    
286
            self.wrapper.commit()
287
        else:
288
            if self.serials:
289
                self.astakosclient.resolve_commissions(
290
                    token=self.service_token,
291
                    accept_serials=[],
292
                    reject_serials=self.serials)
293
            self.wrapper.rollback()
294

    
295
    def close(self):
296
        self.wrapper.close()
297
        self.queue.close()
298

    
299
    @property
300
    def using_external_quotaholder(self):
301
        return not isinstance(self.astakosclient, DisabledAstakosClient)
302

    
303
    @debug_method
304
    def list_accounts(self, user, marker=None, limit=10000):
305
        """Return a list of accounts the user can access."""
306

    
307
        allowed = self._allowed_accounts(user)
308
        start, limit = self._list_limits(allowed, marker, limit)
309
        return allowed[start:start + limit]
310

    
311
    @debug_method
312
    def get_account_meta(
313
            self, user, account, domain, until=None, include_user_defined=True,
314
            external_quota=None):
315
        """Return a dictionary with the account metadata for the domain."""
316

    
317
        path, node = self._lookup_account(account, user == account)
318
        if user != account:
319
            if until or (node is None) or (account not
320
                                           in self._allowed_accounts(user)):
321
                raise NotAllowedError
322
        try:
323
            props = self._get_properties(node, until)
324
            mtime = props[self.MTIME]
325
        except NameError:
326
            props = None
327
            mtime = until
328
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
329
        tstamp = max(tstamp, mtime)
330
        if until is None:
331
            modified = tstamp
332
        else:
333
            modified = self._get_statistics(
334
                node, compute=True)[2]  # Overall last modification.
335
            modified = max(modified, mtime)
336

    
337
        if user != account:
338
            meta = {'name': account}
339
        else:
340
            meta = {}
341
            if props is not None and include_user_defined:
342
                meta.update(
343
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
344
            if until is not None:
345
                meta.update({'until_timestamp': tstamp})
346
            meta.update({'name': account, 'count': count, 'bytes': bytes})
347
            if self.using_external_quotaholder:
348
                external_quota = external_quota or {}
349
                meta['bytes'] = external_quota.get('usage', 0)
350
        meta.update({'modified': modified})
351
        return meta
352

    
353
    @debug_method
354
    def update_account_meta(self, user, account, domain, meta, replace=False):
355
        """Update the metadata associated with the account for the domain."""
356

    
357
        if user != account:
358
            raise NotAllowedError
359
        path, node = self._lookup_account(account, True)
360
        self._put_metadata(user, node, domain, meta, replace,
361
                           update_statistics_ancestors_depth=-1)
362

    
363
    @debug_method
364
    def get_account_groups(self, user, account):
365
        """Return a dictionary with the user groups defined for the account."""
366

    
367
        if user != account:
368
            if account not in self._allowed_accounts(user):
369
                raise NotAllowedError
370
            return {}
371
        self._lookup_account(account, True)
372
        return self.permissions.group_dict(account)
373

    
374
    @debug_method
375
    def update_account_groups(self, user, account, groups, replace=False):
376
        """Update the groups associated with the account."""
377

    
378
        if user != account:
379
            raise NotAllowedError
380
        self._lookup_account(account, True)
381
        self._check_groups(groups)
382
        if replace:
383
            self.permissions.group_destroy(account)
384
        for k, v in groups.iteritems():
385
            if not replace:  # If not already deleted.
386
                self.permissions.group_delete(account, k)
387
            if v:
388
                self.permissions.group_addmany(account, k, v)
389

    
390
    @debug_method
391
    def get_account_policy(self, user, account, external_quota=None):
392
        """Return a dictionary with the account policy."""
393

    
394
        if user != account:
395
            if account not in self._allowed_accounts(user):
396
                raise NotAllowedError
397
            return {}
398
        path, node = self._lookup_account(account, True)
399
        policy = self._get_policy(node, is_account_policy=True)
400
        if self.using_external_quotaholder:
401
            external_quota = external_quota or {}
402
            policy['quota'] = external_quota.get('limit', 0)
403
        return policy
404

    
405
    @debug_method
406
    def update_account_policy(self, user, account, policy, replace=False):
407
        """Update the policy associated with the account."""
408

    
409
        if user != account:
410
            raise NotAllowedError
411
        path, node = self._lookup_account(account, True)
412
        self._check_policy(policy, is_account_policy=True)
413
        self._put_policy(node, policy, replace, is_account_policy=True)
414

    
415
    @debug_method
416
    def put_account(self, user, account, policy=None):
417
        """Create a new account with the given name."""
418

    
419
        policy = policy or {}
420
        if user != account:
421
            raise NotAllowedError
422
        node = self.node.node_lookup(account)
423
        if node is not None:
424
            raise AccountExists('Account already exists')
425
        if policy:
426
            self._check_policy(policy, is_account_policy=True)
427
        node = self._put_path(user, self.ROOTNODE, account,
428
                              update_statistics_ancestors_depth=-1)
429
        self._put_policy(node, policy, True, is_account_policy=True)
430

    
431
    @debug_method
432
    def delete_account(self, user, account):
433
        """Delete the account with the given name."""
434

    
435
        if user != account:
436
            raise NotAllowedError
437
        node = self.node.node_lookup(account)
438
        if node is None:
439
            return
440
        if not self.node.node_remove(node,
441
                                     update_statistics_ancestors_depth=-1):
442
            raise AccountNotEmpty('Account is not empty')
443
        self.permissions.group_destroy(account)
444

    
445
    @debug_method
446
    def list_containers(self, user, account, marker=None, limit=10000,
447
                        shared=False, until=None, public=False):
448
        """Return a list of containers existing under an account."""
449

    
450
        if user != account:
451
            if until or account not in self._allowed_accounts(user):
452
                raise NotAllowedError
453
            allowed = self._allowed_containers(user, account)
454
            start, limit = self._list_limits(allowed, marker, limit)
455
            return allowed[start:start + limit]
456
        if shared or public:
457
            allowed = set()
458
            if shared:
459
                allowed.update([x.split('/', 2)[1] for x in
460
                               self.permissions.access_list_shared(account)])
461
            if public:
462
                allowed.update([x[0].split('/', 2)[1] for x in
463
                               self.permissions.public_list(account)])
464
            allowed = sorted(allowed)
465
            start, limit = self._list_limits(allowed, marker, limit)
466
            return allowed[start:start + limit]
467
        node = self.node.node_lookup(account)
468
        containers = [x[0] for x in self._list_object_properties(
469
            node, account, '', '/', marker, limit, False, None, [], until)]
470
        start, limit = self._list_limits(
471
            [x[0] for x in containers], marker, limit)
472
        return containers[start:start + limit]
473

    
474
    @debug_method
475
    def list_container_meta(self, user, account, container, domain,
476
                            until=None):
477
        """Return a list of the container's object meta keys for a domain."""
478

    
479
        allowed = []
480
        if user != account:
481
            if until:
482
                raise NotAllowedError
483
            allowed = self.permissions.access_list_paths(
484
                user, '/'.join((account, container)))
485
            if not allowed:
486
                raise NotAllowedError
487
        path, node = self._lookup_container(account, container)
488
        before = until if until is not None else inf
489
        allowed = self._get_formatted_paths(allowed)
490
        return self.node.latest_attribute_keys(node, domain, before,
491
                                               CLUSTER_DELETED, allowed)
492

    
493
    @debug_method
494
    def get_container_meta(self, user, account, container, domain, until=None,
495
                           include_user_defined=True):
496
        """Return a dictionary with the container metadata for the domain."""
497

    
498
        if user != account:
499
            if until or container not in self._allowed_containers(user,
500
                                                                  account):
501
                raise NotAllowedError
502
        path, node = self._lookup_container(account, container)
503
        props = self._get_properties(node, until)
504
        mtime = props[self.MTIME]
505
        count, bytes, tstamp = self._get_statistics(node, until)
506
        tstamp = max(tstamp, mtime)
507
        if until is None:
508
            modified = tstamp
509
        else:
510
            modified = self._get_statistics(
511
                node)[2]  # Overall last modification.
512
            modified = max(modified, mtime)
513

    
514
        if user != account:
515
            meta = {'name': container}
516
        else:
517
            meta = {}
518
            if include_user_defined:
519
                meta.update(
520
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
521
            if until is not None:
522
                meta.update({'until_timestamp': tstamp})
523
            meta.update({'name': container, 'count': count, 'bytes': bytes})
524
        meta.update({'modified': modified})
525
        return meta
526

    
527
    @debug_method
528
    def update_container_meta(self, user, account, container, domain, meta,
529
                              replace=False):
530
        """Update the metadata associated with the container for the domain."""
531

    
532
        if user != account:
533
            raise NotAllowedError
534
        path, node = self._lookup_container(account, container)
535
        src_version_id, dest_version_id = self._put_metadata(
536
            user, node, domain, meta, replace,
537
            update_statistics_ancestors_depth=0)
538
        if src_version_id is not None:
539
            versioning = self._get_policy(
540
                node, is_account_policy=False)['versioning']
541
            if versioning != 'auto':
542
                self.node.version_remove(src_version_id,
543
                                         update_statistics_ancestors_depth=0)
544

    
545
    @debug_method
546
    def get_container_policy(self, user, account, container):
547
        """Return a dictionary with the container policy."""
548

    
549
        if user != account:
550
            if container not in self._allowed_containers(user, account):
551
                raise NotAllowedError
552
            return {}
553
        path, node = self._lookup_container(account, container)
554
        return self._get_policy(node, is_account_policy=False)
555

    
556
    @debug_method
557
    def update_container_policy(self, user, account, container, policy,
558
                                replace=False):
559
        """Update the policy associated with the container."""
560

    
561
        if user != account:
562
            raise NotAllowedError
563
        path, node = self._lookup_container(account, container)
564
        self._check_policy(policy, is_account_policy=False)
565
        self._put_policy(node, policy, replace, is_account_policy=False)
566

    
567
    @debug_method
568
    def put_container(self, user, account, container, policy=None):
569
        """Create a new container with the given name."""
570

    
571
        policy = policy or {}
572
        if user != account:
573
            raise NotAllowedError
574
        try:
575
            path, node = self._lookup_container(account, container)
576
        except NameError:
577
            pass
578
        else:
579
            raise ContainerExists('Container already exists')
580
        if policy:
581
            self._check_policy(policy, is_account_policy=False)
582
        path = '/'.join((account, container))
583
        node = self._put_path(
584
            user, self._lookup_account(account, True)[1], path,
585
            update_statistics_ancestors_depth=-1)
586
        self._put_policy(node, policy, True, is_account_policy=False)
587

    
588
    @debug_method
589
    def delete_container(self, user, account, container, until=None, prefix='',
590
                         delimiter=None):
591
        """Delete/purge the container with the given name."""
592

    
593
        if user != account:
594
            raise NotAllowedError
595
        path, node = self._lookup_container(account, container)
596

    
597
        if until is not None:
598
            hashes, size, serials = self.node.node_purge_children(
599
                node, until, CLUSTER_HISTORY,
600
                update_statistics_ancestors_depth=0)
601
            for h in hashes:
602
                self.store.map_delete(h)
603
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
604
                                          update_statistics_ancestors_depth=0)
605
            if not self.free_versioning:
606
                self._report_size_change(
607
                    user, account, -size, {
608
                        'action': 'container purge',
609
                        'path': path,
610
                        'versions': ','.join(str(i) for i in serials)
611
                    }
612
                )
613
            return
614

    
615
        if not delimiter:
616
            if self._get_statistics(node)[0] > 0:
617
                raise ContainerNotEmpty('Container is not empty')
618
            hashes, size, serials = self.node.node_purge_children(
619
                node, inf, CLUSTER_HISTORY,
620
                update_statistics_ancestors_depth=0)
621
            for h in hashes:
622
                self.store.map_delete(h)
623
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
624
                                          update_statistics_ancestors_depth=0)
625
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
626
            if not self.free_versioning:
627
                self._report_size_change(
628
                    user, account, -size, {
629
                        'action': 'container purge',
630
                        'path': path,
631
                        'versions': ','.join(str(i) for i in serials)
632
                    }
633
                )
634
        else:
635
            # remove only contents
636
            src_names = self._list_objects_no_limit(
637
                user, account, container, prefix='', delimiter=None,
638
                virtual=False, domain=None, keys=[], shared=False, until=None,
639
                size_range=None, all_props=True, public=False)
640
            paths = []
641
            for t in src_names:
642
                path = '/'.join((account, container, t[0]))
643
                node = t[2]
644
                if not self._exists(node):
645
                    continue
646
                src_version_id, dest_version_id = self._put_version_duplicate(
647
                    user, node, size=0, type='', hash=None, checksum='',
648
                    cluster=CLUSTER_DELETED,
649
                    update_statistics_ancestors_depth=1)
650
                del_size = self._apply_versioning(
651
                    account, container, src_version_id,
652
                    update_statistics_ancestors_depth=1)
653
                self._report_size_change(
654
                    user, account, -del_size, {
655
                        'action': 'object delete',
656
                        'path': path,
657
                        'versions': ','.join([str(dest_version_id)])})
658
                self._report_object_change(
659
                    user, account, path, details={'action': 'object delete'})
660
                paths.append(path)
661
            self.permissions.access_clear_bulk(paths)
662

    
663
    def _list_objects(self, user, account, container, prefix, delimiter,
664
                      marker, limit, virtual, domain, keys, shared, until,
665
                      size_range, all_props, public):
666
        if user != account and until:
667
            raise NotAllowedError
668
        if shared and public:
669
            # get shared first
670
            shared_paths = self._list_object_permissions(
671
                user, account, container, prefix, shared=True, public=False)
672
            objects = set()
673
            if shared_paths:
674
                path, node = self._lookup_container(account, container)
675
                shared_paths = self._get_formatted_paths(shared_paths)
676
                objects |= set(self._list_object_properties(
677
                    node, path, prefix, delimiter, marker, limit, virtual,
678
                    domain, keys, until, size_range, shared_paths, all_props))
679

    
680
            # get public
681
            objects |= set(self._list_public_object_properties(
682
                user, account, container, prefix, all_props))
683
            objects = list(objects)
684

    
685
            objects.sort(key=lambda x: x[0])
686
            start, limit = self._list_limits(
687
                [x[0] for x in objects], marker, limit)
688
            return objects[start:start + limit]
689
        elif public:
690
            objects = self._list_public_object_properties(
691
                user, account, container, prefix, all_props)
692
            start, limit = self._list_limits(
693
                [x[0] for x in objects], marker, limit)
694
            return objects[start:start + limit]
695

    
696
        allowed = self._list_object_permissions(
697
            user, account, container, prefix, shared, public)
698
        if shared and not allowed:
699
            return []
700
        path, node = self._lookup_container(account, container)
701
        allowed = self._get_formatted_paths(allowed)
702
        objects = self._list_object_properties(
703
            node, path, prefix, delimiter, marker, limit, virtual, domain,
704
            keys, until, size_range, allowed, all_props)
705
        start, limit = self._list_limits(
706
            [x[0] for x in objects], marker, limit)
707
        return objects[start:start + limit]
708

    
709
    def _list_public_object_properties(self, user, account, container, prefix,
710
                                       all_props):
711
        public = self._list_object_permissions(
712
            user, account, container, prefix, shared=False, public=True)
713
        paths, nodes = self._lookup_objects(public)
714
        path = '/'.join((account, container))
715
        cont_prefix = path + '/'
716
        paths = [x[len(cont_prefix):] for x in paths]
717
        objects = [(p,) + props for p, props in
718
                   zip(paths, self.node.version_lookup_bulk(
719
                       nodes, all_props=all_props))]
720
        return objects
721

    
722
    def _list_objects_no_limit(self, user, account, container, prefix,
723
                               delimiter, virtual, domain, keys, shared, until,
724
                               size_range, all_props, public):
725
        objects = []
726
        while True:
727
            marker = objects[-1] if objects else None
728
            limit = 10000
729
            l = self._list_objects(
730
                user, account, container, prefix, delimiter, marker, limit,
731
                virtual, domain, keys, shared, until, size_range, all_props,
732
                public)
733
            objects.extend(l)
734
            if not l or len(l) < limit:
735
                break
736
        return objects
737

    
738
    def _list_object_permissions(self, user, account, container, prefix,
739
                                 shared, public):
740
        allowed = []
741
        path = '/'.join((account, container, prefix)).rstrip('/')
742
        if user != account:
743
            allowed = self.permissions.access_list_paths(user, path)
744
            if not allowed:
745
                raise NotAllowedError
746
        else:
747
            allowed = set()
748
            if shared:
749
                allowed.update(self.permissions.access_list_shared(path))
750
            if public:
751
                allowed.update(
752
                    [x[0] for x in self.permissions.public_list(path)])
753
            allowed = sorted(allowed)
754
            if not allowed:
755
                return []
756
        return allowed
757

    
758
    @debug_method
759
    def list_objects(self, user, account, container, prefix='', delimiter=None,
760
                     marker=None, limit=10000, virtual=True, domain=None,
761
                     keys=None, shared=False, until=None, size_range=None,
762
                     public=False):
763
        """List (object name, object version_id) under a container."""
764

    
765
        keys = keys or []
766
        return self._list_objects(
767
            user, account, container, prefix, delimiter, marker, limit,
768
            virtual, domain, keys, shared, until, size_range, False, public)
769

    
770
    @debug_method
771
    def list_object_meta(self, user, account, container, prefix='',
772
                         delimiter=None, marker=None, limit=10000,
773
                         virtual=True, domain=None, keys=None, shared=False,
774
                         until=None, size_range=None, public=False):
775
        """Return a list of metadata dicts of objects under a container."""
776

    
777
        keys = keys or []
778
        props = self._list_objects(
779
            user, account, container, prefix, delimiter, marker, limit,
780
            virtual, domain, keys, shared, until, size_range, True, public)
781
        objects = []
782
        for p in props:
783
            if len(p) == 2:
784
                objects.append({'subdir': p[0]})
785
            else:
786
                objects.append({
787
                    'name': p[0],
788
                    'bytes': p[self.SIZE + 1],
789
                    'type': p[self.TYPE + 1],
790
                    'hash': p[self.HASH + 1],
791
                    'version': p[self.SERIAL + 1],
792
                    'version_timestamp': p[self.MTIME + 1],
793
                    'modified': p[self.MTIME + 1] if until is None else None,
794
                    'modified_by': p[self.MUSER + 1],
795
                    'uuid': p[self.UUID + 1],
796
                    'checksum': p[self.CHECKSUM + 1]})
797
        return objects
798

    
799
    @debug_method
800
    def list_object_permissions(self, user, account, container, prefix=''):
801
        """Return a list of paths enforce permissions under a container."""
802

    
803
        return self._list_object_permissions(user, account, container, prefix,
804
                                             True, False)
805

    
806
    @debug_method
807
    def list_object_public(self, user, account, container, prefix=''):
808
        """Return a mapping of object paths to public ids under a container."""
809

    
810
        public = {}
811
        for path, p in self.permissions.public_list('/'.join((account,
812
                                                              container,
813
                                                              prefix))):
814
            public[path] = p
815
        return public
816

    
817
    @debug_method
818
    def get_object_meta(self, user, account, container, name, domain,
819
                        version=None, include_user_defined=True):
820
        """Return a dictionary with the object metadata for the domain."""
821

    
822
        self._can_read(user, account, container, name)
823
        path, node = self._lookup_object(account, container, name)
824
        props = self._get_version(node, version)
825
        if version is None:
826
            modified = props[self.MTIME]
827
        else:
828
            try:
829
                modified = self._get_version(
830
                    node)[self.MTIME]  # Overall last modification.
831
            except NameError:  # Object may be deleted.
832
                del_props = self.node.version_lookup(
833
                    node, inf, CLUSTER_DELETED)
834
                if del_props is None:
835
                    raise ItemNotExists('Object does not exist')
836
                modified = del_props[self.MTIME]
837

    
838
        meta = {}
839
        if include_user_defined:
840
            meta.update(
841
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
842
        meta.update({'name': name,
843
                     'bytes': props[self.SIZE],
844
                     'type': props[self.TYPE],
845
                     'hash': props[self.HASH],
846
                     'version': props[self.SERIAL],
847
                     'version_timestamp': props[self.MTIME],
848
                     'modified': modified,
849
                     'modified_by': props[self.MUSER],
850
                     'uuid': props[self.UUID],
851
                     'checksum': props[self.CHECKSUM]})
852
        return meta
853

    
854
    @debug_method
855
    def update_object_meta(self, user, account, container, name, domain, meta,
856
                           replace=False):
857
        """Update object metadata for a domain and return the new version."""
858

    
859
        self._can_write(user, account, container, name)
860
        path, node = self._lookup_object(account, container, name)
861
        src_version_id, dest_version_id = self._put_metadata(
862
            user, node, domain, meta, replace,
863
            update_statistics_ancestors_depth=1)
864
        self._apply_versioning(account, container, src_version_id,
865
                               update_statistics_ancestors_depth=1)
866
        return dest_version_id
867

    
868
    @debug_method
869
    def get_object_permissions(self, user, account, container, name):
870
        """Return the action allowed on the object, the path
871
        from which the object gets its permissions from,
872
        along with a dictionary containing the permissions."""
873

    
874
        allowed = 'write'
875
        permissions_path = self._get_permissions_path(account, container, name)
876
        if user != account:
877
            if self.permissions.access_check(permissions_path, self.WRITE,
878
                                             user):
879
                allowed = 'write'
880
            elif self.permissions.access_check(permissions_path, self.READ,
881
                                               user):
882
                allowed = 'read'
883
            else:
884
                raise NotAllowedError
885
        self._lookup_object(account, container, name)
886
        return (allowed,
887
                permissions_path,
888
                self.permissions.access_get(permissions_path))
889

    
890
    @debug_method
891
    def update_object_permissions(self, user, account, container, name,
892
                                  permissions):
893
        """Update the permissions associated with the object."""
894

    
895
        if user != account:
896
            raise NotAllowedError
897
        path = self._lookup_object(account, container, name)[0]
898
        self._check_permissions(path, permissions)
899
        self.permissions.access_set(path, permissions)
900
        self._report_sharing_change(user, account, path, {'members':
901
                                    self.permissions.access_members(path)})
902

    
903
    @debug_method
904
    def get_object_public(self, user, account, container, name):
905
        """Return the public id of the object if applicable."""
906

    
907
        self._can_read(user, account, container, name)
908
        path = self._lookup_object(account, container, name)[0]
909
        p = self.permissions.public_get(path)
910
        return p
911

    
912
    @debug_method
913
    def update_object_public(self, user, account, container, name, public):
914
        """Update the public status of the object."""
915

    
916
        self._can_write(user, account, container, name)
917
        path = self._lookup_object(account, container, name)[0]
918
        if not public:
919
            self.permissions.public_unset(path)
920
        else:
921
            self.permissions.public_set(
922
                path, self.public_url_security, self.public_url_alphabet)
923

    
924
    @debug_method
925
    def get_object_hashmap(self, user, account, container, name, version=None):
926
        """Return the object's size and a list with partial hashes."""
927

    
928
        self._can_read(user, account, container, name)
929
        path, node = self._lookup_object(account, container, name)
930
        props = self._get_version(node, version)
931
        if props[self.HASH] is None:
932
            return 0, ()
933
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
934
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
935

    
936
    def _update_object_hash(self, user, account, container, name, size, type,
937
                            hash, checksum, domain, meta, replace_meta,
938
                            permissions, src_node=None, src_version_id=None,
939
                            is_copy=False, report_size_change=True):
940
        if permissions is not None and user != account:
941
            raise NotAllowedError
942
        self._can_write(user, account, container, name)
943
        if permissions is not None:
944
            path = '/'.join((account, container, name))
945
            self._check_permissions(path, permissions)
946

    
947
        account_path, account_node = self._lookup_account(account, True)
948
        container_path, container_node = self._lookup_container(
949
            account, container)
950

    
951
        path, node = self._put_object_node(
952
            container_path, container_node, name)
953
        pre_version_id, dest_version_id = self._put_version_duplicate(
954
            user, node, src_node=src_node, size=size, type=type, hash=hash,
955
            checksum=checksum, is_copy=is_copy,
956
            update_statistics_ancestors_depth=1)
957

    
958
        # Handle meta.
959
        if src_version_id is None:
960
            src_version_id = pre_version_id
961
        self._put_metadata_duplicate(
962
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
963

    
964
        del_size = self._apply_versioning(account, container, pre_version_id,
965
                                          update_statistics_ancestors_depth=1)
966
        size_delta = size - del_size
967
        if size_delta > 0:
968
            # Check account quota.
969
            if not self.using_external_quotaholder:
970
                account_quota = long(self._get_policy(
971
                    account_node, is_account_policy=True)['quota'])
972
                account_usage = self._get_statistics(account_node,
973
                                                     compute=True)[1]
974
                if (account_quota > 0 and account_usage > account_quota):
975
                    raise QuotaError(
976
                        'Account quota exceeded: limit: %s, usage: %s' % (
977
                            account_quota, account_usage))
978

    
979
            # Check container quota.
980
            container_quota = long(self._get_policy(
981
                container_node, is_account_policy=False)['quota'])
982
            container_usage = self._get_statistics(container_node)[1]
983
            if (container_quota > 0 and container_usage > container_quota):
984
                # This must be executed in a transaction, so the version is
985
                # never created if it fails.
986
                raise QuotaError(
987
                    'Container quota exceeded: limit: %s, usage: %s' % (
988
                        container_quota, container_usage
989
                    )
990
                )
991

    
992
        if report_size_change:
993
            self._report_size_change(
994
                user, account, size_delta,
995
                {'action': 'object update', 'path': path,
996
                 'versions': ','.join([str(dest_version_id)])})
997
        if permissions is not None:
998
            self.permissions.access_set(path, permissions)
999
            self._report_sharing_change(
1000
                user, account, path,
1001
                {'members': self.permissions.access_members(path)})
1002

    
1003
        self._report_object_change(
1004
            user, account, path,
1005
            details={'version': dest_version_id, 'action': 'object update'})
1006
        return dest_version_id
1007

    
1008
    @debug_method
1009
    def update_object_hashmap(self, user, account, container, name, size, type,
1010
                              hashmap, checksum, domain, meta=None,
1011
                              replace_meta=False, permissions=None):
1012
        """Create/update an object's hashmap and return the new version."""
1013

    
1014
        meta = meta or {}
1015
        if size == 0:  # No such thing as an empty hashmap.
1016
            hashmap = [self.put_block('')]
1017
        map = HashMap(self.block_size, self.hash_algorithm)
1018
        map.extend([binascii.unhexlify(x) for x in hashmap])
1019
        missing = self.store.block_search(map)
1020
        if missing:
1021
            ie = IndexError()
1022
            ie.data = [binascii.hexlify(x) for x in missing]
1023
            raise ie
1024

    
1025
        hash = map.hash()
1026
        hexlified = binascii.hexlify(hash)
1027
        dest_version_id = self._update_object_hash(
1028
            user, account, container, name, size, type, hexlified, checksum,
1029
            domain, meta, replace_meta, permissions)
1030
        self.store.map_put(hash, map)
1031
        return dest_version_id, hexlified
1032

    
1033
    @debug_method
1034
    def update_object_checksum(self, user, account, container, name, version,
1035
                               checksum):
1036
        """Update an object's checksum."""
1037

    
1038
        # Update objects with greater version and same hashmap
1039
        # and size (fix metadata updates).
1040
        self._can_write(user, account, container, name)
1041
        path, node = self._lookup_object(account, container, name)
1042
        props = self._get_version(node, version)
1043
        versions = self.node.node_get_versions(node)
1044
        for x in versions:
1045
            if (x[self.SERIAL] >= int(version) and
1046
                x[self.HASH] == props[self.HASH] and
1047
                    x[self.SIZE] == props[self.SIZE]):
1048
                self.node.version_put_property(
1049
                    x[self.SERIAL], 'checksum', checksum)
1050

    
1051
    def _copy_object(self, user, src_account, src_container, src_name,
1052
                     dest_account, dest_container, dest_name, type,
1053
                     dest_domain=None, dest_meta=None, replace_meta=False,
1054
                     permissions=None, src_version=None, is_move=False,
1055
                     delimiter=None):
1056

    
1057
        report_size_change = not is_move
1058
        dest_meta = dest_meta or {}
1059
        dest_version_ids = []
1060
        self._can_read(user, src_account, src_container, src_name)
1061
        path, node = self._lookup_object(src_account, src_container, src_name)
1062
        # TODO: Will do another fetch of the properties in duplicate version...
1063
        props = self._get_version(
1064
            node, src_version)  # Check to see if source exists.
1065
        src_version_id = props[self.SERIAL]
1066
        hash = props[self.HASH]
1067
        size = props[self.SIZE]
1068
        is_copy = not is_move and (src_account, src_container, src_name) != (
1069
            dest_account, dest_container, dest_name)  # New uuid.
1070
        dest_version_ids.append(self._update_object_hash(
1071
            user, dest_account, dest_container, dest_name, size, type, hash,
1072
            None, dest_domain, dest_meta, replace_meta, permissions,
1073
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1074
            report_size_change=report_size_change))
1075
        if is_move and ((src_account, src_container, src_name) !=
1076
                        (dest_account, dest_container, dest_name)):
1077
            self._delete_object(user, src_account, src_container, src_name,
1078
                                report_size_change=report_size_change)
1079

    
1080
        if delimiter:
1081
            prefix = (src_name + delimiter if not
1082
                      src_name.endswith(delimiter) else src_name)
1083
            src_names = self._list_objects_no_limit(
1084
                user, src_account, src_container, prefix, delimiter=None,
1085
                virtual=False, domain=None, keys=[], shared=False, until=None,
1086
                size_range=None, all_props=True, public=False)
1087
            src_names.sort(key=lambda x: x[2])  # order by nodes
1088
            paths = [elem[0] for elem in src_names]
1089
            nodes = [elem[2] for elem in src_names]
1090
            # TODO: Will do another fetch of the properties
1091
            # in duplicate version...
1092
            props = self._get_versions(nodes)  # Check to see if source exists.
1093

    
1094
            for prop, path, node in zip(props, paths, nodes):
1095
                src_version_id = prop[self.SERIAL]
1096
                hash = prop[self.HASH]
1097
                vtype = prop[self.TYPE]
1098
                size = prop[self.SIZE]
1099
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1100
                    delimiter) else dest_name
1101
                vdest_name = path.replace(prefix, dest_prefix, 1)
1102
                dest_version_ids.append(self._update_object_hash(
1103
                    user, dest_account, dest_container, vdest_name, size,
1104
                    vtype, hash, None, dest_domain, meta={},
1105
                    replace_meta=False, permissions=None, src_node=node,
1106
                    src_version_id=src_version_id, is_copy=is_copy))
1107
                if is_move and ((src_account, src_container, src_name) !=
1108
                                (dest_account, dest_container, dest_name)):
1109
                    self._delete_object(user, src_account, src_container, path)
1110
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
1111
                dest_version_ids)
1112

    
1113
    @debug_method
1114
    def copy_object(self, user, src_account, src_container, src_name,
1115
                    dest_account, dest_container, dest_name, type, domain,
1116
                    meta=None, replace_meta=False, permissions=None,
1117
                    src_version=None, delimiter=None):
1118
        """Copy an object's data and metadata."""
1119

    
1120
        meta = meta or {}
1121
        dest_version_id = self._copy_object(
1122
            user, src_account, src_container, src_name, dest_account,
1123
            dest_container, dest_name, type, domain, meta, replace_meta,
1124
            permissions, src_version, False, delimiter)
1125
        return dest_version_id
1126

    
1127
    @debug_method
1128
    def move_object(self, user, src_account, src_container, src_name,
1129
                    dest_account, dest_container, dest_name, type, domain,
1130
                    meta=None, replace_meta=False, permissions=None,
1131
                    delimiter=None):
1132
        """Move an object's data and metadata."""
1133

    
1134
        meta = meta or {}
1135
        if user != src_account:
1136
            raise NotAllowedError
1137
        dest_version_id = self._move_object(
1138
            user, src_account, src_container, src_name, dest_account,
1139
            dest_container, dest_name, type, domain, meta, replace_meta,
1140
            permissions, None, delimiter=delimiter)
1141
        return dest_version_id
1142

    
1143
    def _delete_object(self, user, account, container, name, until=None,
1144
                       delimiter=None, report_size_change=True):
1145
        if user != account:
1146
            raise NotAllowedError
1147

    
1148
        if until is not None:
1149
            path = '/'.join((account, container, name))
1150
            node = self.node.node_lookup(path)
1151
            if node is None:
1152
                return
1153
            hashes = []
1154
            size = 0
1155
            serials = []
1156
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1157
                                           update_statistics_ancestors_depth=1)
1158
            hashes += h
1159
            size += s
1160
            serials += v
1161
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1162
                                           update_statistics_ancestors_depth=1)
1163
            hashes += h
1164
            if not self.free_versioning:
1165
                size += s
1166
            serials += v
1167
            for h in hashes:
1168
                self.store.map_delete(h)
1169
            self.node.node_purge(node, until, CLUSTER_DELETED,
1170
                                 update_statistics_ancestors_depth=1)
1171
            try:
1172
                self._get_version(node)
1173
            except NameError:
1174
                self.permissions.access_clear(path)
1175
            self._report_size_change(
1176
                user, account, -size, {
1177
                    'action': 'object purge',
1178
                    'path': path,
1179
                    'versions': ','.join(str(i) for i in serials)
1180
                }
1181
            )
1182
            return
1183

    
1184
        path, node = self._lookup_object(account, container, name)
1185
        if not self._exists(node):
1186
            raise ItemNotExists('Object is deleted.')
1187
        src_version_id, dest_version_id = self._put_version_duplicate(
1188
            user, node, size=0, type='', hash=None, checksum='',
1189
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1190
        del_size = self._apply_versioning(account, container, src_version_id,
1191
                                          update_statistics_ancestors_depth=1)
1192
        if report_size_change:
1193
            self._report_size_change(
1194
                user, account, -del_size,
1195
                {'action': 'object delete',
1196
                 'path': path,
1197
                 'versions': ','.join([str(dest_version_id)])})
1198
        self._report_object_change(
1199
            user, account, path, details={'action': 'object delete'})
1200
        self.permissions.access_clear(path)
1201

    
1202
        if delimiter:
1203
            prefix = name + delimiter if not name.endswith(delimiter) else name
1204
            src_names = self._list_objects_no_limit(
1205
                user, account, container, prefix, delimiter=None,
1206
                virtual=False, domain=None, keys=[], shared=False, until=None,
1207
                size_range=None, all_props=True, public=False)
1208
            paths = []
1209
            for t in src_names:
1210
                path = '/'.join((account, container, t[0]))
1211
                node = t[2]
1212
                if not self._exists(node):
1213
                    continue
1214
                src_version_id, dest_version_id = self._put_version_duplicate(
1215
                    user, node, size=0, type='', hash=None, checksum='',
1216
                    cluster=CLUSTER_DELETED,
1217
                    update_statistics_ancestors_depth=1)
1218
                del_size = self._apply_versioning(
1219
                    account, container, src_version_id,
1220
                    update_statistics_ancestors_depth=1)
1221
                if report_size_change:
1222
                    self._report_size_change(
1223
                        user, account, -del_size,
1224
                        {'action': 'object delete',
1225
                         'path': path,
1226
                         'versions': ','.join([str(dest_version_id)])})
1227
                self._report_object_change(
1228
                    user, account, path, details={'action': 'object delete'})
1229
                paths.append(path)
1230
            self.permissions.access_clear_bulk(paths)
1231

    
1232
    @debug_method
1233
    def delete_object(self, user, account, container, name, until=None,
1234
                      prefix='', delimiter=None):
1235
        """Delete/purge an object."""
1236

    
1237
        self._delete_object(user, account, container, name, until, delimiter)
1238

    
1239
    @debug_method
1240
    def list_versions(self, user, account, container, name):
1241
        """Return a list of all object (version, version_timestamp) tuples."""
1242

    
1243
        self._can_read(user, account, container, name)
1244
        path, node = self._lookup_object(account, container, name)
1245
        versions = self.node.node_get_versions(node)
1246
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
1247
                x[self.CLUSTER] != CLUSTER_DELETED]
1248

    
1249
    @debug_method
1250
    def get_uuid(self, user, uuid):
1251
        """Return the (account, container, name) for the UUID given."""
1252

    
1253
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1254
        if info is None:
1255
            raise NameError
1256
        path, serial = info
1257
        account, container, name = path.split('/', 2)
1258
        self._can_read(user, account, container, name)
1259
        return (account, container, name)
1260

    
1261
    @debug_method
1262
    def get_public(self, user, public):
1263
        """Return the (account, container, name) for the public id given."""
1264

    
1265
        path = self.permissions.public_path(public)
1266
        if path is None:
1267
            raise NameError
1268
        account, container, name = path.split('/', 2)
1269
        self._can_read(user, account, container, name)
1270
        return (account, container, name)
1271

    
1272
    def get_block(self, hash):
1273
        """Return a block's data."""
1274

    
1275
        logger.debug("get_block: %s", hash)
1276
        block = self.store.block_get(binascii.unhexlify(hash))
1277
        if not block:
1278
            raise ItemNotExists('Block does not exist')
1279
        return block
1280

    
1281
    def put_block(self, data):
1282
        """Store a block and return the hash."""
1283

    
1284
        logger.debug("put_block: %s", len(data))
1285
        return binascii.hexlify(self.store.block_put(data))
1286

    
1287
    def update_block(self, hash, data, offset=0):
1288
        """Update a known block and return the hash."""
1289

    
1290
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1291
        if offset == 0 and len(data) == self.block_size:
1292
            return self.put_block(data)
1293
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1294
        return binascii.hexlify(h)
1295

    
1296
    # Path functions.
1297

    
1298
    def _generate_uuid(self):
1299
        return str(uuidlib.uuid4())
1300

    
1301
    def _put_object_node(self, path, parent, name):
1302
        path = '/'.join((path, name))
1303
        node = self.node.node_lookup(path)
1304
        if node is None:
1305
            node = self.node.node_create(parent, path)
1306
        return path, node
1307

    
1308
    def _put_path(self, user, parent, path,
1309
                  update_statistics_ancestors_depth=None):
1310
        node = self.node.node_create(parent, path)
1311
        self.node.version_create(node, None, 0, '', None, user,
1312
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
1313
                                 update_statistics_ancestors_depth)
1314
        return node
1315

    
1316
    def _lookup_account(self, account, create=True):
1317
        for_update = True if create else False
1318
        node = self.node.node_lookup(account, for_update=for_update)
1319
        if node is None and create:
1320
            node = self._put_path(
1321
                account, self.ROOTNODE, account,
1322
                update_statistics_ancestors_depth=-1)  # User is account.
1323
        return account, node
1324

    
1325
    def _lookup_container(self, account, container):
1326
        for_update = True if self.lock_container_path else False
1327
        path = '/'.join((account, container))
1328
        node = self.node.node_lookup(path, for_update)
1329
        if node is None:
1330
            raise ItemNotExists('Container does not exist')
1331
        return path, node
1332

    
1333
    def _lookup_object(self, account, container, name):
1334
        path = '/'.join((account, container, name))
1335
        node = self.node.node_lookup(path)
1336
        if node is None:
1337
            raise ItemNotExists('Object does not exist')
1338
        return path, node
1339

    
1340
    def _lookup_objects(self, paths):
1341
        nodes = self.node.node_lookup_bulk(paths)
1342
        return paths, nodes
1343

    
1344
    def _get_properties(self, node, until=None):
1345
        """Return properties until the timestamp given."""
1346

    
1347
        before = until if until is not None else inf
1348
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1349
        if props is None and until is not None:
1350
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1351
        if props is None:
1352
            raise ItemNotExists('Path does not exist')
1353
        return props
1354

    
1355
    def _get_statistics(self, node, until=None, compute=False):
1356
        """Return (count, sum of size, timestamp) of everything under node."""
1357

    
1358
        if until is not None:
1359
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1360
        elif compute:
1361
            stats = self.node.statistics_latest(node,
1362
                                                except_cluster=CLUSTER_DELETED)
1363
        else:
1364
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1365
        if stats is None:
1366
            stats = (0, 0, 0)
1367
        return stats
1368

    
1369
    def _get_version(self, node, version=None):
1370
        if version is None:
1371
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1372
            if props is None:
1373
                raise ItemNotExists('Object does not exist')
1374
        else:
1375
            try:
1376
                version = int(version)
1377
            except ValueError:
1378
                raise VersionNotExists('Version does not exist')
1379
            props = self.node.version_get_properties(version, node=node)
1380
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1381
                raise VersionNotExists('Version does not exist')
1382
        return props
1383

    
1384
    def _get_versions(self, nodes):
1385
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1386

    
1387
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
1388
                               type=None, hash=None, checksum=None,
1389
                               cluster=CLUSTER_NORMAL, is_copy=False,
1390
                               update_statistics_ancestors_depth=None):
1391
        """Create a new version of the node."""
1392

    
1393
        props = self.node.version_lookup(
1394
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1395
        if props is not None:
1396
            src_version_id = props[self.SERIAL]
1397
            src_hash = props[self.HASH]
1398
            src_size = props[self.SIZE]
1399
            src_type = props[self.TYPE]
1400
            src_checksum = props[self.CHECKSUM]
1401
        else:
1402
            src_version_id = None
1403
            src_hash = None
1404
            src_size = 0
1405
            src_type = ''
1406
            src_checksum = ''
1407
        if size is None:  # Set metadata.
1408
            hash = src_hash  # This way hash can be set to None
1409
                             # (account or container).
1410
            size = src_size
1411
        if type is None:
1412
            type = src_type
1413
        if checksum is None:
1414
            checksum = src_checksum
1415
        uuid = self._generate_uuid(
1416
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1417

    
1418
        if src_node is None:
1419
            pre_version_id = src_version_id
1420
        else:
1421
            pre_version_id = None
1422
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1423
            if props is not None:
1424
                pre_version_id = props[self.SERIAL]
1425
        if pre_version_id is not None:
1426
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
1427
                                        update_statistics_ancestors_depth)
1428

    
1429
        dest_version_id, mtime = self.node.version_create(
1430
            node, hash, size, type, src_version_id, user, uuid, checksum,
1431
            cluster, update_statistics_ancestors_depth)
1432

    
1433
        self.node.attribute_unset_is_latest(node, dest_version_id)
1434

    
1435
        return pre_version_id, dest_version_id
1436

    
1437
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
1438
                                node, meta, replace=False):
1439
        if src_version_id is not None:
1440
            self.node.attribute_copy(src_version_id, dest_version_id)
1441
        if not replace:
1442
            self.node.attribute_del(dest_version_id, domain, (
1443
                k for k, v in meta.iteritems() if v == ''))
1444
            self.node.attribute_set(dest_version_id, domain, node, (
1445
                (k, v) for k, v in meta.iteritems() if v != ''))
1446
        else:
1447
            self.node.attribute_del(dest_version_id, domain)
1448
            self.node.attribute_set(dest_version_id, domain, node, ((
1449
                k, v) for k, v in meta.iteritems()))
1450

    
1451
    def _put_metadata(self, user, node, domain, meta, replace=False,
1452
                      update_statistics_ancestors_depth=None):
1453
        """Create a new version and store metadata."""
1454

    
1455
        src_version_id, dest_version_id = self._put_version_duplicate(
1456
            user, node,
1457
            update_statistics_ancestors_depth=
1458
            update_statistics_ancestors_depth)
1459
        self._put_metadata_duplicate(
1460
            src_version_id, dest_version_id, domain, node, meta, replace)
1461
        return src_version_id, dest_version_id
1462

    
1463
    def _list_limits(self, listing, marker, limit):
1464
        start = 0
1465
        if marker:
1466
            try:
1467
                start = listing.index(marker) + 1
1468
            except ValueError:
1469
                pass
1470
        if not limit or limit > 10000:
1471
            limit = 10000
1472
        return start, limit
1473

    
1474
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
1475
                                marker=None, limit=10000, virtual=True,
1476
                                domain=None, keys=None, until=None,
1477
                                size_range=None, allowed=None,
1478
                                all_props=False):
1479
        keys = keys or []
1480
        allowed = allowed or []
1481
        cont_prefix = path + '/'
1482
        prefix = cont_prefix + prefix
1483
        start = cont_prefix + marker if marker else None
1484
        before = until if until is not None else inf
1485
        filterq = keys if domain else []
1486
        sizeq = size_range
1487

    
1488
        objects, prefixes = self.node.latest_version_list(
1489
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
1490
            allowed, domain, filterq, sizeq, all_props)
1491
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1492
        objects.sort(key=lambda x: x[0])
1493
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1494
        return objects
1495

    
1496
    # Reporting functions.
1497

    
1498
    @debug_method
1499
    def _report_size_change(self, user, account, size, details=None):
1500
        details = details or {}
1501

    
1502
        if size == 0:
1503
            return
1504

    
1505
        account_node = self._lookup_account(account, True)[1]
1506
        total = self._get_statistics(account_node, compute=True)[1]
1507
        details.update({'user': user, 'total': total})
1508
        self.messages.append(
1509
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1510
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1511

    
1512
        if not self.using_external_quotaholder:
1513
            return
1514

    
1515
        try:
1516
            name = details['path'] if 'path' in details else ''
1517
            serial = self.astakosclient.issue_one_commission(
1518
                token=self.service_token,
1519
                holder=account,
1520
                source=DEFAULT_SOURCE,
1521
                provisions={'pithos.diskspace': size},
1522
                name=name)
1523
        except BaseException, e:
1524
            raise QuotaError(e)
1525
        else:
1526
            self.serials.append(serial)
1527

    
1528
    @debug_method
1529
    def _report_object_change(self, user, account, path, details=None):
1530
        details = details or {}
1531
        details.update({'user': user})
1532
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1533
                              account, QUEUE_INSTANCE_ID, 'object', path,
1534
                              details))
1535

    
1536
    @debug_method
1537
    def _report_sharing_change(self, user, account, path, details=None):
1538
        details = details or {}
1539
        details.update({'user': user})
1540
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1541
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
1542
                              details))
1543

    
1544
    # Policy functions.
1545

    
1546
    def _check_policy(self, policy, is_account_policy=True):
1547
        default_policy = self.default_account_policy \
1548
            if is_account_policy else self.default_container_policy
1549
        for k in policy.keys():
1550
            if policy[k] == '':
1551
                policy[k] = default_policy.get(k)
1552
        for k, v in policy.iteritems():
1553
            if k == 'quota':
1554
                q = int(v)  # May raise ValueError.
1555
                if q < 0:
1556
                    raise ValueError
1557
            elif k == 'versioning':
1558
                if v not in ['auto', 'none']:
1559
                    raise ValueError
1560
            else:
1561
                raise ValueError
1562

    
1563
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1564
        default_policy = self.default_account_policy \
1565
            if is_account_policy else self.default_container_policy
1566
        if replace:
1567
            for k, v in default_policy.iteritems():
1568
                if k not in policy:
1569
                    policy[k] = v
1570
        self.node.policy_set(node, policy)
1571

    
1572
    def _get_policy(self, node, is_account_policy=True):
1573
        default_policy = self.default_account_policy \
1574
            if is_account_policy else self.default_container_policy
1575
        policy = default_policy.copy()
1576
        policy.update(self.node.policy_get(node))
1577
        return policy
1578

    
1579
    def _apply_versioning(self, account, container, version_id,
1580
                          update_statistics_ancestors_depth=None):
1581
        """Delete the provided version if such is the policy.
1582
           Return size of object removed.
1583
        """
1584

    
1585
        if version_id is None:
1586
            return 0
1587
        path, node = self._lookup_container(account, container)
1588
        versioning = self._get_policy(
1589
            node, is_account_policy=False)['versioning']
1590
        if versioning != 'auto':
1591
            hash, size = self.node.version_remove(
1592
                version_id, update_statistics_ancestors_depth)
1593
            self.store.map_delete(hash)
1594
            return size
1595
        elif self.free_versioning:
1596
            return self.node.version_get_properties(
1597
                version_id, keys=('size',))[0]
1598
        return 0
1599

    
1600
    # Access control functions.
1601

    
1602
    def _check_groups(self, groups):
1603
        # raise ValueError('Bad characters in groups')
1604
        pass
1605

    
1606
    def _check_permissions(self, path, permissions):
1607
        # raise ValueError('Bad characters in permissions')
1608
        pass
1609

    
1610
    def _get_formatted_paths(self, paths):
1611
        formatted = []
1612
        for p in paths:
1613
            node = self.node.node_lookup(p)
1614
            props = None
1615
            if node is not None:
1616
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1617
            if props is not None:
1618
                if props[self.TYPE].split(';', 1)[0].strip() in (
1619
                        'application/directory', 'application/folder'):
1620
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1621
                formatted.append((p, self.MATCH_EXACT))
1622
        return formatted
1623

    
1624
    def _get_permissions_path(self, account, container, name):
1625
        path = '/'.join((account, container, name))
1626
        permission_paths = self.permissions.access_inherit(path)
1627
        permission_paths.sort()
1628
        permission_paths.reverse()
1629
        for p in permission_paths:
1630
            if p == path:
1631
                return p
1632
            else:
1633
                if p.count('/') < 2:
1634
                    continue
1635
                node = self.node.node_lookup(p)
1636
                props = None
1637
                if node is not None:
1638
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1639
                if props is not None:
1640
                    if props[self.TYPE].split(';', 1)[0].strip() in (
1641
                            'application/directory', 'application/folder'):
1642
                        return p
1643
        return None
1644

    
1645
    def _can_read(self, user, account, container, name):
1646
        if user == account:
1647
            return True
1648
        path = '/'.join((account, container, name))
1649
        if self.permissions.public_get(path) is not None:
1650
            return True
1651
        path = self._get_permissions_path(account, container, name)
1652
        if not path:
1653
            raise NotAllowedError
1654
        if (not self.permissions.access_check(path, self.READ, user) and not
1655
                self.permissions.access_check(path, self.WRITE, user)):
1656
            raise NotAllowedError
1657

    
1658
    def _can_write(self, user, account, container, name):
1659
        if user == account:
1660
            return True
1661
        path = '/'.join((account, container, name))
1662
        path = self._get_permissions_path(account, container, name)
1663
        if not path:
1664
            raise NotAllowedError
1665
        if not self.permissions.access_check(path, self.WRITE, user):
1666
            raise NotAllowedError
1667

    
1668
    def _allowed_accounts(self, user):
1669
        allow = set()
1670
        for path in self.permissions.access_list_paths(user):
1671
            allow.add(path.split('/', 1)[0])
1672
        return sorted(allow)
1673

    
1674
    def _allowed_containers(self, user, account):
1675
        allow = set()
1676
        for path in self.permissions.access_list_paths(user, account):
1677
            allow.add(path.split('/', 2)[1])
1678
        return sorted(allow)
1679

    
1680
    # Domain functions
1681

    
1682
    @debug_method
1683
    def get_domain_objects(self, domain, user=None):
1684
        allowed_paths = self.permissions.access_list_paths(
1685
            user, include_owned=user is not None, include_containers=False)
1686
        if not allowed_paths:
1687
            return []
1688
        obj_list = self.node.domain_object_list(
1689
            domain, allowed_paths, CLUSTER_NORMAL)
1690
        return [(path,
1691
                 self._build_metadata(props, user_defined_meta),
1692
                 self.permissions.access_get(path)) for
1693
                path, props, user_defined_meta in obj_list]
1694

    
1695
    # util functions
1696

    
1697
    def _build_metadata(self, props, user_defined=None,
1698
                        include_user_defined=True):
1699
        meta = {'bytes': props[self.SIZE],
1700
                'type': props[self.TYPE],
1701
                'hash': props[self.HASH],
1702
                'version': props[self.SERIAL],
1703
                'version_timestamp': props[self.MTIME],
1704
                'modified_by': props[self.MUSER],
1705
                'uuid': props[self.UUID],
1706
                'checksum': props[self.CHECKSUM]}
1707
        if include_user_defined and user_defined is not None:
1708
            meta.update(user_defined)
1709
        return meta
1710

    
1711
    def _has_read_access(self, user, path):
1712
        try:
1713
            account, container, object = path.split('/', 2)
1714
        except ValueError:
1715
            raise ValueError('Invalid object path')
1716

    
1717
        assert isinstance(user, basestring), "Invalid user"
1718

    
1719
        try:
1720
            self._can_read(user, account, container, object)
1721
        except NotAllowedError:
1722
            return False
1723
        else:
1724
            return True
1725

    
1726
    def _exists(self, node):
1727
        try:
1728
            self._get_version(node)
1729
        except ItemNotExists:
1730
            return False
1731
        else:
1732
            return True