snf-pithos-backend/pithos/backends/modular.py @ 33af031c

# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import sys
import uuid as uuidlib
import logging
import hashlib
import binascii

from functools import wraps, partial
from traceback import format_exc

try:
    from astakosclient import AstakosClient
except ImportError:
    AstakosClient = None

from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)


class DisabledAstakosClient(object):
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        m = ("AstakosClient has been disabled, "
             "yet an attempt to access it was made")
        raise AssertionError(m)


# Stripped-down version of the HashMap class found in tools.
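# For example, a map holding three block hashes h1, h2, h3 is padded with a
# zero block to four leaves and folded pairwise, so HashMap.hash() returns
# H(H(h1 + h2) + H(h3 + '\x00' * len(h1))), where H is the configured
# block hash.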

class HashMap(list):

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        if len(self) == 0:
            return self._hash_raw('')
        if len(self) == 1:
            return self.__getitem__(0)

        h = list(self)
        s = 2
        while s < len(h):
            s = s * 2
        h += [('\x00' * len(h[0]))] * (s - len(h))
        while len(h) > 1:
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
        return h[0]

# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16

QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

inf = float('inf')

ULTIMATE_ANSWER = 42

DEFAULT_SOURCE = 'system'

logger = logging.getLogger(__name__)


def debug_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            result = format_exc()
            raise
        finally:
            all_args = map(repr, args)
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper


class ModularBackend(BaseBackend):
    """A modular backend.

    Uses modules for SQL functions and storage.
    """

    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        def load_module(m):
            __import__(m)
            return sys.modules[m]

        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_url = astakos_url
        self.service_token = service_token

        if not astakos_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        self.serials = []
        self.messages = []

        self._move_object = partial(self._copy_object, is_move=True)

    def pre_exec(self, lock_container_path=False):
        self.lock_container_path = lock_container_path
        self.wrapper.execute()

    def post_exec(self, success_status=True):
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    token=self.service_token,
                    accept_serials=self.serials,
                    reject_serials=[])
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            if self.serials:
                self.astakosclient.resolve_commissions(
                    token=self.service_token,
                    accept_serials=[],
                    reject_serials=self.serials)
            self.wrapper.rollback()

    def close(self):
        self.wrapper.close()
        self.queue.close()

    @property
    def using_external_quotaholder(self):
        return not isinstance(self.astakosclient, DisabledAstakosClient)

    @debug_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        allowed = self._allowed_accounts(user)
        start, limit = self._list_limits(allowed, marker, limit)
        return allowed[start:start + limit]

    @debug_method
    def get_account_meta(
            self, user, account, domain, until=None, include_user_defined=True,
            external_quota=None):
        """Return a dictionary with the account metadata for the domain."""

        path, node = self._lookup_account(account, user == account)
        if user != account:
            if until or (node is None) or (account not
                                           in self._allowed_accounts(user)):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                external_quota = external_quota or {}
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta

    @debug_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_account(account, True)
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)

    @debug_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            return {}
        self._lookup_account(account, True)
        return self.permissions.group_dict(account)

    @debug_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        for k, v in groups.iteritems():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, k)
            if v:
                self.permissions.group_addmany(account, k, v)

    @debug_method
    def get_account_policy(self, user, account, external_quota=None):
        """Return a dictionary with the account policy."""

        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            return {}
        path, node = self._lookup_account(account, True)
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            external_quota = external_quota or {}
            policy['quota'] = external_quota.get('limit', 0)
        return policy

    @debug_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_account(account, True)
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)

    @debug_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        policy = policy or {}
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True)

    @debug_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is None:
            return
        if not self.node.node_remove(node,
                                     update_statistics_ancestors_depth=-1):
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

    @debug_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account."""

        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            allowed = self._allowed_containers(user, account)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        if shared or public:
            allowed = set()
            if shared:
                allowed.update([x.split('/', 2)[1] for x in
                               self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in
                               self.permissions.public_list(account)])
            allowed = sorted(allowed)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        node = self.node.node_lookup(account)
        containers = [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
        start, limit = self._list_limits(
            [x[0] for x in containers], marker, limit)
        return containers[start:start + limit]

    @debug_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""

        allowed = []
        if user != account:
            if until:
                raise NotAllowedError
            allowed = self.permissions.access_list_paths(
                user, '/'.join((account, container)))
            if not allowed:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = until if until is not None else inf
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)

    @debug_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain."""

        if user != account:
            if until or container not in self._allowed_containers(user,
                                                                  account):
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta

    @debug_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is not None:
            versioning = self._get_policy(
                node, is_account_policy=False)['versioning']
            if versioning != 'auto':
                self.node.version_remove(src_version_id,
                                         update_statistics_ancestors_depth=0)

    @debug_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        if user != account:
            if container not in self._allowed_containers(user, account):
                raise NotAllowedError
            return {}
        path, node = self._lookup_container(account, container)
        return self._get_policy(node, is_account_policy=False)

    @debug_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)

    @debug_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        policy = policy or {}
        if user != account:
            raise NotAllowedError
        try:
            path, node = self._lookup_container(account, container)
        except NameError:
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        node = self._put_path(
            user, self._lookup_account(account, True)[1], path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)

    @debug_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        if until is not None:
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        if user != account and until:
            raise NotAllowedError
        if shared and public:
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            objects = set()
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects |= set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            objects.sort(key=lambda x: x[0])
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]

        allowed = self._list_object_permissions(
            user, account, container, prefix, shared, public)
        if shared and not allowed:
            return []
        path, node = self._lookup_container(account, container)
        allowed = self._get_formatted_paths(allowed)
        objects = self._list_object_properties(
            node, path, prefix, delimiter, marker, limit, virtual, domain,
            keys, until, size_range, allowed, all_props)
        start, limit = self._list_limits(
            [x[0] for x in objects], marker, limit)
        return objects[start:start + limit]

    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        public = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        paths, nodes = self._lookup_objects(public)
        path = '/'.join((account, container))
        cont_prefix = path + '/'
        paths = [x[len(cont_prefix):] for x in paths]
        objects = [(p,) + props for p, props in
                   zip(paths, self.node.version_lookup_bulk(
                       nodes, all_props=all_props))]
        return objects

    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        objects = []
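        # Page through the listing 10000 entries at a time; a short (or
        # empty) page means the listing is exhausted.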
        while True:
            marker = objects[-1] if objects else None
            limit = 10000
            l = self._list_objects(
                user, account, container, prefix, delimiter, marker, limit,
                virtual, domain, keys, shared, until, size_range, all_props,
                public)
            objects.extend(l)
            if not l or len(l) < limit:
                break
        return objects

    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        allowed = []
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            allowed = self.permissions.access_list_paths(user, path)
            if not allowed:
                raise NotAllowedError
        else:
            allowed = set()
            if shared:
                allowed.update(self.permissions.access_list_shared(path))
            if public:
                allowed.update(
                    [x[0] for x in self.permissions.public_list(path)])
            allowed = sorted(allowed)
            if not allowed:
                return []
        return allowed

    @debug_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        keys = keys or []
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, False, public)

    @debug_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        keys = keys or []
        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, True, public)
        objects = []
        for p in props:
            if len(p) == 2:
                objects.append({'subdir': p[0]})
            else:
                objects.append({
                    'name': p[0],
                    'bytes': p[self.SIZE + 1],
                    'type': p[self.TYPE + 1],
                    'hash': p[self.HASH + 1],
                    'version': p[self.SERIAL + 1],
                    'version_timestamp': p[self.MTIME + 1],
                    'modified': p[self.MTIME + 1] if until is None else None,
                    'modified_by': p[self.MUSER + 1],
                    'uuid': p[self.UUID + 1],
                    'checksum': p[self.CHECKSUM + 1]})
        return objects

    @debug_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths that enforce permissions under a container."""

        return self._list_object_permissions(user, account, container, prefix,
                                             True, False)

    @debug_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        public = {}
        for path, p in self.permissions.public_list('/'.join((account,
                                                              container,
                                                              prefix))):
            public[path] = p
        return public

    @debug_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta

    @debug_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        self._apply_versioning(account, container, src_version_id,
                               update_statistics_ancestors_depth=1)
        return dest_version_id

    @debug_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions,
        along with a dictionary containing the permissions."""

        allowed = 'write'
        permissions_path = self._get_permissions_path(account, container, name)
        if user != account:
            if self.permissions.access_check(permissions_path, self.WRITE,
                                             user):
                allowed = 'write'
            elif self.permissions.access_check(permissions_path, self.READ,
                                               user):
                allowed = 'read'
            else:
                raise NotAllowedError
        self._lookup_object(account, container, name)
        return (allowed,
                permissions_path,
                self.permissions.access_get(permissions_path))

    @debug_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object."""

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name)[0]
        self._check_permissions(path, permissions)
        self.permissions.access_set(path, permissions)
        self._report_sharing_change(user, account, path, {'members':
                                    self.permissions.access_members(path)})

    @debug_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        p = self.permissions.public_get(path)
        return p

    @debug_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        if not public:
            self.permissions.public_unset(path)
        else:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet)

    @debug_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if props[self.HASH] is None:
            return 0, ()
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]

    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id

    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version."""

        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([binascii.unhexlify(x) for x in hashmap])
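        # Blocks that are not already in the store are reported back to the
        # caller as hex hashes attached to an IndexError.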
1017
        missing = self.store.block_search(map)
1018
        if missing:
1019
            ie = IndexError()
1020
            ie.data = [binascii.hexlify(x) for x in missing]
1021
            raise ie
1022

    
1023
        hash = map.hash()
1024
        hexlified = binascii.hexlify(hash)
1025
        dest_version_id = self._update_object_hash(
1026
            user, account, container, name, size, type, hexlified, checksum,
1027
            domain, meta, replace_meta, permissions)
1028
        self.store.map_put(hash, map)
1029
        return dest_version_id, hexlified
1030

    
1031
    @debug_method
1032
    def update_object_checksum(self, user, account, container, name, version,
1033
                               checksum):
1034
        """Update an object's checksum."""
1035

    
1036
        # Update objects with greater version and same hashmap
1037
        # and size (fix metadata updates).
1038
        self._can_write(user, account, container, name)
1039
        path, node = self._lookup_object(account, container, name)
1040
        props = self._get_version(node, version)
1041
        versions = self.node.node_get_versions(node)
1042
        for x in versions:
1043
            if (x[self.SERIAL] >= int(version) and
1044
                x[self.HASH] == props[self.HASH] and
1045
                    x[self.SIZE] == props[self.SIZE]):
1046
                self.node.version_put_property(
1047
                    x[self.SERIAL], 'checksum', checksum)
1048

    
1049
    def _copy_object(self, user, src_account, src_container, src_name,
1050
                     dest_account, dest_container, dest_name, type,
1051
                     dest_domain=None, dest_meta=None, replace_meta=False,
1052
                     permissions=None, src_version=None, is_move=False,
1053
                     delimiter=None):
1054

    
1055
        report_size_change = not is_move
1056
        dest_meta = dest_meta or {}
1057
        dest_version_ids = []
1058
        self._can_read(user, src_account, src_container, src_name)
1059
        path, node = self._lookup_object(src_account, src_container, src_name)
1060
        # TODO: Will do another fetch of the properties in duplicate version...
1061
        props = self._get_version(
1062
            node, src_version)  # Check to see if source exists.
1063
        src_version_id = props[self.SERIAL]
1064
        hash = props[self.HASH]
1065
        size = props[self.SIZE]
1066
        is_copy = not is_move and (src_account, src_container, src_name) != (
1067
            dest_account, dest_container, dest_name)  # New uuid.
1068
        dest_version_ids.append(self._update_object_hash(
1069
            user, dest_account, dest_container, dest_name, size, type, hash,
1070
            None, dest_domain, dest_meta, replace_meta, permissions,
1071
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1072
            report_size_change=report_size_change))
1073
        if is_move and ((src_account, src_container, src_name) !=
1074
                        (dest_account, dest_container, dest_name)):
1075
            self._delete_object(user, src_account, src_container, src_name,
1076
                                report_size_change=report_size_change)
1077

    
1078
        if delimiter:
1079
            prefix = (src_name + delimiter if not
1080
                      src_name.endswith(delimiter) else src_name)
1081
            src_names = self._list_objects_no_limit(
1082
                user, src_account, src_container, prefix, delimiter=None,
1083
                virtual=False, domain=None, keys=[], shared=False, until=None,
1084
                size_range=None, all_props=True, public=False)
1085
            src_names.sort(key=lambda x: x[2])  # order by nodes
1086
            paths = [elem[0] for elem in src_names]
1087
            nodes = [elem[2] for elem in src_names]
1088
            # TODO: Will do another fetch of the properties
1089
            # in duplicate version...
1090
            props = self._get_versions(nodes)  # Check to see if source exists.
1091

    
1092
            for prop, path, node in zip(props, paths, nodes):
1093
                src_version_id = prop[self.SERIAL]
1094
                hash = prop[self.HASH]
1095
                vtype = prop[self.TYPE]
1096
                size = prop[self.SIZE]
1097
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1098
                    delimiter) else dest_name
1099
                vdest_name = path.replace(prefix, dest_prefix, 1)
1100
                dest_version_ids.append(self._update_object_hash(
1101
                    user, dest_account, dest_container, vdest_name, size,
1102
                    vtype, hash, None, dest_domain, meta={},
1103
                    replace_meta=False, permissions=None, src_node=node,
1104
                    src_version_id=src_version_id, is_copy=is_copy))
1105
                if is_move and ((src_account, src_container, src_name) !=
1106
                                (dest_account, dest_container, dest_name)):
1107
                    self._delete_object(user, src_account, src_container, path)
1108
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
1109
                dest_version_ids)
1110

    
1111
    @debug_method
1112
    def copy_object(self, user, src_account, src_container, src_name,
1113
                    dest_account, dest_container, dest_name, type, domain,
1114
                    meta=None, replace_meta=False, permissions=None,
1115
                    src_version=None, delimiter=None):
1116
        """Copy an object's data and metadata."""
1117

    
1118
        meta = meta or {}
1119
        dest_version_id = self._copy_object(
1120
            user, src_account, src_container, src_name, dest_account,
1121
            dest_container, dest_name, type, domain, meta, replace_meta,
1122
            permissions, src_version, False, delimiter)
1123
        return dest_version_id
1124

    
1125
    @debug_method
1126
    def move_object(self, user, src_account, src_container, src_name,
1127
                    dest_account, dest_container, dest_name, type, domain,
1128
                    meta=None, replace_meta=False, permissions=None,
1129
                    delimiter=None):
1130
        """Move an object's data and metadata."""
1131

    
1132
        meta = meta or {}
1133
        if user != src_account:
1134
            raise NotAllowedError
1135
        dest_version_id = self._move_object(
1136
            user, src_account, src_container, src_name, dest_account,
1137
            dest_container, dest_name, type, domain, meta, replace_meta,
1138
            permissions, None, delimiter=delimiter)
1139
        return dest_version_id
1140

    
1141
    def _delete_object(self, user, account, container, name, until=None,
1142
                       delimiter=None, report_size_change=True):
1143
        if user != account:
1144
            raise NotAllowedError
1145

    
1146
        if until is not None:
1147
            path = '/'.join((account, container, name))
1148
            node = self.node.node_lookup(path)
1149
            if node is None:
1150
                return
1151
            hashes = []
1152
            size = 0
1153
            serials = []
1154
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1155
                                           update_statistics_ancestors_depth=1)
1156
            hashes += h
1157
            size += s
1158
            serials += v
1159
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1160
                                           update_statistics_ancestors_depth=1)
1161
            hashes += h
1162
            if not self.free_versioning:
1163
                size += s
1164
            serials += v
1165
            for h in hashes:
1166
                self.store.map_delete(h)
1167
            self.node.node_purge(node, until, CLUSTER_DELETED,
1168
                                 update_statistics_ancestors_depth=1)
1169
            try:
1170
                self._get_version(node)
1171
            except NameError:
1172
                self.permissions.access_clear(path)
1173
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        path, node = self._lookup_object(account, container, name)
        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    @debug_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        self._delete_object(user, account, container, name, until, delimiter)

    @debug_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
                x[self.CLUSTER] != CLUSTER_DELETED]

    @debug_method
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    @debug_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        block = self.store.block_get(binascii.unhexlify(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block

    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        return binascii.hexlify(self.store.block_put(data))

    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
        return binascii.hexlify(h)

    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

    def _put_object_node(self, path, parent, name):
        path = '/'.join((path, name))
        node = self.node.node_lookup(path)
        if node is None:
            node = self.node.node_create(parent, path)
        return path, node

    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node

    def _lookup_account(self, account, create=True):
        for_update = True if create else False
        node = self.node.node_lookup(account, for_update=for_update)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node

    def _lookup_container(self, account, container):
        for_update = True if self.lock_container_path else False
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name):
        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes

    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats

    def _get_version(self, node, version=None):
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version, node=node)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

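    # Note: the previous latest version (if any) is reclustered into
    # CLUSTER_HISTORY before the new version row is created, so earlier
    # states remain retrievable.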
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node."""

        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id

    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))

    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=
            update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id

    def _list_limits(self, listing, marker, limit):
        start = 0
        if marker:
            try:
                start = listing.index(marker) + 1
            except ValueError:
                pass
        if not limit or limit > 10000:
            limit = 10000
        return start, limit

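    # Listings delegate to node.latest_version_list; virtual directory
    # prefixes are appended when requested and the container prefix is
    # stripped from the returned paths.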
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects

    # Reporting functions.

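    # Size changes are queued as 'diskspace' messages; when an external
    # quotaholder is configured, a commission is also issued through
    # astakosclient and the returned serial is appended to self.serials.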
    @debug_method
    def _report_size_change(self, user, account, size, details=None):
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                token=self.service_token,
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)

    @debug_method
    def _report_object_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))

    @debug_method
    def _report_sharing_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))

    # Policy functions.

    def _check_policy(self, policy, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError

    def _put_policy(self, node, policy, replace, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)

    def _get_policy(self, node, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        policy = default_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy

    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0

    # Access control functions.

    def _check_groups(self, groups):
        # raise ValueError('Bad characters in groups')
        pass

    def _check_permissions(self, path, permissions):
        # raise ValueError('Bad characters in permissions')
        pass

    def _get_formatted_paths(self, paths):
        formatted = []
        for p in paths:
            node = self.node.node_lookup(p)
            props = None
            if node is not None:
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                if props[self.TYPE].split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
                formatted.append((p, self.MATCH_EXACT))
        return formatted

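    # Resolve the path whose permissions apply to an object: the exact path
    # if it carries permissions, otherwise the nearest directory-type
    # ancestor returned by permissions.access_inherit.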
    def _get_permissions_path(self, account, container, name):
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            else:
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None

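    # Owners always pass; public objects are readable by anyone. Otherwise
    # access is checked against the inherited permissions path, accepting
    # either READ or WRITE grants.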
    def _can_read(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if (not self.permissions.access_check(path, self.READ, user) and not
                self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError

    def _can_write(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError

    def _allowed_accounts(self, user):
        allow = set()
        for path in self.permissions.access_list_paths(user):
            allow.add(path.split('/', 1)[0])
        return sorted(allow)

    def _allowed_containers(self, user, account):
        allow = set()
        for path in self.permissions.access_list_paths(user, account):
            allow.add(path.split('/', 2)[1])
        return sorted(allow)

    # Domain functions

    @debug_method
    def get_domain_objects(self, domain, user=None):
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]

    # util functions

    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        meta = {'bytes': props[self.SIZE],
                'type': props[self.TYPE],
                'hash': props[self.HASH],
                'version': props[self.SERIAL],
                'version_timestamp': props[self.MTIME],
                'modified_by': props[self.MUSER],
                'uuid': props[self.UUID],
                'checksum': props[self.CHECKSUM]}
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta

    def _has_read_access(self, user, path):
        try:
            account, container, object = path.split('/', 2)
        except ValueError:
            raise ValueError('Invalid object path')

        assert isinstance(user, basestring), "Invalid user"

        try:
            self._can_read(user, account, container, object)
        except NotAllowedError:
            return False
        else:
            return True

    def _exists(self, node):
        try:
            self._get_version(node)
        except ItemNotExists:
            return False
        else:
            return True