Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ d47565d8

History | View | Annotate | Download (69.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42

    
43
try:
44
    from astakosclient import AstakosClient
45
except ImportError:
46
    AstakosClient = None
47

    
48
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
49
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
50
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
51
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
52
                  InvalidHash)
53

    
54

    
55
class DisabledAstakosClient(object):
56
    def __init__(self, *args, **kwargs):
57
        self.args = args
58
        self.kwargs = kwargs
59

    
60
    def __getattr__(self, name):
61
        m = ("AstakosClient has been disabled, "
62
             "yet an attempt to access it was made")
63
        raise AssertionError(m)
64

    
65

    
66
# Stripped-down version of the HashMap class found in tools.
67

    
68
class HashMap(list):
69

    
70
    def __init__(self, blocksize, blockhash):
71
        super(HashMap, self).__init__()
72
        self.blocksize = blocksize
73
        self.blockhash = blockhash
74

    
75
    def _hash_raw(self, v):
76
        h = hashlib.new(self.blockhash)
77
        h.update(v)
78
        return h.digest()
79

    
80
    def hash(self):
81
        if len(self) == 0:
82
            return self._hash_raw('')
83
        if len(self) == 1:
84
            return self.__getitem__(0)
85

    
86
        h = list(self)
87
        s = 2
88
        while s < len(h):
89
            s = s * 2
90
        h += [('\x00' * len(h[0]))] * (s - len(h))
91
        while len(h) > 1:
92
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
93
        return h[0]
94

    
95
# Default modules and settings.
96
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
97
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
98
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
99
DEFAULT_BLOCK_PATH = 'data/'
100
DEFAULT_BLOCK_UMASK = 0o022
101
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
102
DEFAULT_HASH_ALGORITHM = 'sha256'
103
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
104
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
105
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
106
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
107
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
108
                               'abcdefghijklmnopqrstuvwxyz'
109
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
110
DEFAULT_PUBLIC_URL_SECURITY = 16
111

    
112
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
113
QUEUE_CLIENT_ID = 'pithos'
114
QUEUE_INSTANCE_ID = '1'
115

    
116
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
117

    
118
inf = float('inf')
119

    
120
ULTIMATE_ANSWER = 42
121

    
122
DEFAULT_SOURCE = 'system'
123

    
124
logger = logging.getLogger(__name__)
125

    
126

    
127
def debug_method(func):
128
    @wraps(func)
129
    def wrapper(self, *args, **kw):
130
        try:
131
            result = func(self, *args, **kw)
132
            return result
133
        except:
134
            result = format_exc()
135
            raise
136
        finally:
137
            all_args = map(repr, args)
138
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
139
            logger.debug(">>> %s(%s) <<< %s" % (
140
                func.__name__, ', '.join(all_args).rstrip(', '), result))
141
    return wrapper
142

    
143

    
144
class ModularBackend(BaseBackend):
145
    """A modular backend.
146

147
    Uses modules for SQL functions and storage.
148
    """
149

    
150
    def __init__(self, db_module=None, db_connection=None,
151
                 block_module=None, block_path=None, block_umask=None,
152
                 block_size=None, hash_algorithm=None,
153
                 queue_module=None, queue_hosts=None, queue_exchange=None,
154
                 astakos_url=None, service_token=None,
155
                 astakosclient_poolsize=None,
156
                 free_versioning=True, block_params=None,
157
                 public_url_security=None,
158
                 public_url_alphabet=None,
159
                 account_quota_policy=None,
160
                 container_quota_policy=None,
161
                 container_versioning_policy=None):
162
        db_module = db_module or DEFAULT_DB_MODULE
163
        db_connection = db_connection or DEFAULT_DB_CONNECTION
164
        block_module = block_module or DEFAULT_BLOCK_MODULE
165
        block_path = block_path or DEFAULT_BLOCK_PATH
166
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
167
        block_params = block_params or DEFAULT_BLOCK_PARAMS
168
        block_size = block_size or DEFAULT_BLOCK_SIZE
169
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
170
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
171
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
172
        container_quota_policy = container_quota_policy \
173
            or DEFAULT_CONTAINER_QUOTA
174
        container_versioning_policy = container_versioning_policy \
175
            or DEFAULT_CONTAINER_VERSIONING
176

    
177
        self.default_account_policy = {'quota': account_quota_policy}
178
        self.default_container_policy = {
179
            'quota': container_quota_policy,
180
            'versioning': container_versioning_policy
181
        }
182
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
183
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
184

    
185
        self.public_url_security = (public_url_security or
186
                                    DEFAULT_PUBLIC_URL_SECURITY)
187
        self.public_url_alphabet = (public_url_alphabet or
188
                                    DEFAULT_PUBLIC_URL_ALPHABET)
189

    
190
        self.hash_algorithm = hash_algorithm
191
        self.block_size = block_size
192
        self.free_versioning = free_versioning
193

    
194
        def load_module(m):
195
            __import__(m)
196
            return sys.modules[m]
197

    
198
        self.db_module = load_module(db_module)
199
        self.wrapper = self.db_module.DBWrapper(db_connection)
200
        params = {'wrapper': self.wrapper}
201
        self.permissions = self.db_module.Permissions(**params)
202
        self.config = self.db_module.Config(**params)
203
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
204
        for x in ['READ', 'WRITE']:
205
            setattr(self, x, getattr(self.db_module, x))
206
        self.node = self.db_module.Node(**params)
207
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
208
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
209
                  'MATCH_PREFIX', 'MATCH_EXACT']:
210
            setattr(self, x, getattr(self.db_module, x))
211

    
212
        self.block_module = load_module(block_module)
213
        self.block_params = block_params
214
        params = {'path': block_path,
215
                  'block_size': self.block_size,
216
                  'hash_algorithm': self.hash_algorithm,
217
                  'umask': block_umask}
218
        params.update(self.block_params)
219
        self.store = self.block_module.Store(**params)
220

    
221
        if queue_module and queue_hosts:
222
            self.queue_module = load_module(queue_module)
223
            params = {'hosts': queue_hosts,
224
                      'exchange': queue_exchange,
225
                      'client_id': QUEUE_CLIENT_ID}
226
            self.queue = self.queue_module.Queue(**params)
227
        else:
228
            class NoQueue:
229
                def send(self, *args):
230
                    pass
231

    
232
                def close(self):
233
                    pass
234

    
235
            self.queue = NoQueue()
236

    
237
        self.astakos_url = astakos_url
238
        self.service_token = service_token
239

    
240
        if not astakos_url or not AstakosClient:
241
            self.astakosclient = DisabledAstakosClient(
242
                astakos_url,
243
                use_pool=True,
244
                pool_size=astakosclient_poolsize)
245
        else:
246
            self.astakosclient = AstakosClient(
247
                astakos_url,
248
                use_pool=True,
249
                pool_size=astakosclient_poolsize)
250

    
251
        self.serials = []
252
        self.messages = []
253

    
254
        self._move_object = partial(self._copy_object, is_move=True)
255

    
256
        self.lock_container_path = False
257

    
258
    def pre_exec(self, lock_container_path=False):
259
        self.lock_container_path = lock_container_path
260
        self.wrapper.execute()
261

    
262
    def post_exec(self, success_status=True):
263
        if success_status:
264
            # send messages produced
265
            for m in self.messages:
266
                self.queue.send(*m)
267

    
268
            # register serials
269
            if self.serials:
270
                self.commission_serials.insert_many(
271
                    self.serials)
272

    
273
                # commit to ensure that the serials are registered
274
                # even if resolve commission fails
275
                self.wrapper.commit()
276

    
277
                # start new transaction
278
                self.wrapper.execute()
279

    
280
                r = self.astakosclient.resolve_commissions(
281
                    token=self.service_token,
282
                    accept_serials=self.serials,
283
                    reject_serials=[])
284
                self.commission_serials.delete_many(
285
                    r['accepted'])
286

    
287
            self.wrapper.commit()
288
        else:
289
            if self.serials:
290
                self.astakosclient.resolve_commissions(
291
                    token=self.service_token,
292
                    accept_serials=[],
293
                    reject_serials=self.serials)
294
            self.wrapper.rollback()
295

    
296
    def close(self):
297
        self.wrapper.close()
298
        self.queue.close()
299

    
300
    @property
301
    def using_external_quotaholder(self):
302
        return not isinstance(self.astakosclient, DisabledAstakosClient)
303

    
304
    @debug_method
305
    def list_accounts(self, user, marker=None, limit=10000):
306
        """Return a list of accounts the user can access."""
307

    
308
        allowed = self._allowed_accounts(user)
309
        start, limit = self._list_limits(allowed, marker, limit)
310
        return allowed[start:start + limit]
311

    
312
    @debug_method
313
    def get_account_meta(
314
            self, user, account, domain, until=None, include_user_defined=True,
315
            external_quota=None):
316
        """Return a dictionary with the account metadata for the domain."""
317

    
318
        path, node = self._lookup_account(account, user == account)
319
        if user != account:
320
            if until or (node is None) or (account not
321
                                           in self._allowed_accounts(user)):
322
                raise NotAllowedError
323
        try:
324
            props = self._get_properties(node, until)
325
            mtime = props[self.MTIME]
326
        except NameError:
327
            props = None
328
            mtime = until
329
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
330
        tstamp = max(tstamp, mtime)
331
        if until is None:
332
            modified = tstamp
333
        else:
334
            modified = self._get_statistics(
335
                node, compute=True)[2]  # Overall last modification.
336
            modified = max(modified, mtime)
337

    
338
        if user != account:
339
            meta = {'name': account}
340
        else:
341
            meta = {}
342
            if props is not None and include_user_defined:
343
                meta.update(
344
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
345
            if until is not None:
346
                meta.update({'until_timestamp': tstamp})
347
            meta.update({'name': account, 'count': count, 'bytes': bytes})
348
            if self.using_external_quotaholder:
349
                external_quota = external_quota or {}
350
                meta['bytes'] = external_quota.get('usage', 0)
351
        meta.update({'modified': modified})
352
        return meta
353

    
354
    @debug_method
355
    def update_account_meta(self, user, account, domain, meta, replace=False):
356
        """Update the metadata associated with the account for the domain."""
357

    
358
        if user != account:
359
            raise NotAllowedError
360
        path, node = self._lookup_account(account, True)
361
        self._put_metadata(user, node, domain, meta, replace,
362
                           update_statistics_ancestors_depth=-1)
363

    
364
    @debug_method
365
    def get_account_groups(self, user, account):
366
        """Return a dictionary with the user groups defined for the account."""
367

    
368
        if user != account:
369
            if account not in self._allowed_accounts(user):
370
                raise NotAllowedError
371
            return {}
372
        self._lookup_account(account, True)
373
        return self.permissions.group_dict(account)
374

    
375
    @debug_method
376
    def update_account_groups(self, user, account, groups, replace=False):
377
        """Update the groups associated with the account."""
378

    
379
        if user != account:
380
            raise NotAllowedError
381
        self._lookup_account(account, True)
382
        self._check_groups(groups)
383
        if replace:
384
            self.permissions.group_destroy(account)
385
        for k, v in groups.iteritems():
386
            if not replace:  # If not already deleted.
387
                self.permissions.group_delete(account, k)
388
            if v:
389
                self.permissions.group_addmany(account, k, v)
390

    
391
    @debug_method
392
    def get_account_policy(self, user, account, external_quota=None):
393
        """Return a dictionary with the account policy."""
394

    
395
        if user != account:
396
            if account not in self._allowed_accounts(user):
397
                raise NotAllowedError
398
            return {}
399
        path, node = self._lookup_account(account, True)
400
        policy = self._get_policy(node, is_account_policy=True)
401
        if self.using_external_quotaholder:
402
            external_quota = external_quota or {}
403
            policy['quota'] = external_quota.get('limit', 0)
404
        return policy
405

    
406
    @debug_method
407
    def update_account_policy(self, user, account, policy, replace=False):
408
        """Update the policy associated with the account."""
409

    
410
        if user != account:
411
            raise NotAllowedError
412
        path, node = self._lookup_account(account, True)
413
        self._check_policy(policy, is_account_policy=True)
414
        self._put_policy(node, policy, replace, is_account_policy=True)
415

    
416
    @debug_method
417
    def put_account(self, user, account, policy=None):
418
        """Create a new account with the given name."""
419

    
420
        policy = policy or {}
421
        if user != account:
422
            raise NotAllowedError
423
        node = self.node.node_lookup(account)
424
        if node is not None:
425
            raise AccountExists('Account already exists')
426
        if policy:
427
            self._check_policy(policy, is_account_policy=True)
428
        node = self._put_path(user, self.ROOTNODE, account,
429
                              update_statistics_ancestors_depth=-1)
430
        self._put_policy(node, policy, True, is_account_policy=True)
431

    
432
    @debug_method
433
    def delete_account(self, user, account):
434
        """Delete the account with the given name."""
435

    
436
        if user != account:
437
            raise NotAllowedError
438
        node = self.node.node_lookup(account)
439
        if node is None:
440
            return
441
        if not self.node.node_remove(node,
442
                                     update_statistics_ancestors_depth=-1):
443
            raise AccountNotEmpty('Account is not empty')
444
        self.permissions.group_destroy(account)
445

    
446
    @debug_method
447
    def list_containers(self, user, account, marker=None, limit=10000,
448
                        shared=False, until=None, public=False):
449
        """Return a list of containers existing under an account."""
450

    
451
        if user != account:
452
            if until or account not in self._allowed_accounts(user):
453
                raise NotAllowedError
454
            allowed = self._allowed_containers(user, account)
455
            start, limit = self._list_limits(allowed, marker, limit)
456
            return allowed[start:start + limit]
457
        if shared or public:
458
            allowed = set()
459
            if shared:
460
                allowed.update([x.split('/', 2)[1] for x in
461
                               self.permissions.access_list_shared(account)])
462
            if public:
463
                allowed.update([x[0].split('/', 2)[1] for x in
464
                               self.permissions.public_list(account)])
465
            allowed = sorted(allowed)
466
            start, limit = self._list_limits(allowed, marker, limit)
467
            return allowed[start:start + limit]
468
        node = self.node.node_lookup(account)
469
        containers = [x[0] for x in self._list_object_properties(
470
            node, account, '', '/', marker, limit, False, None, [], until)]
471
        start, limit = self._list_limits(
472
            [x[0] for x in containers], marker, limit)
473
        return containers[start:start + limit]
474

    
475
    @debug_method
476
    def list_container_meta(self, user, account, container, domain,
477
                            until=None):
478
        """Return a list of the container's object meta keys for a domain."""
479

    
480
        allowed = []
481
        if user != account:
482
            if until:
483
                raise NotAllowedError
484
            allowed = self.permissions.access_list_paths(
485
                user, '/'.join((account, container)))
486
            if not allowed:
487
                raise NotAllowedError
488
        path, node = self._lookup_container(account, container)
489
        before = until if until is not None else inf
490
        allowed = self._get_formatted_paths(allowed)
491
        return self.node.latest_attribute_keys(node, domain, before,
492
                                               CLUSTER_DELETED, allowed)
493

    
494
    @debug_method
495
    def get_container_meta(self, user, account, container, domain, until=None,
496
                           include_user_defined=True):
497
        """Return a dictionary with the container metadata for the domain."""
498

    
499
        if user != account:
500
            if until or container not in self._allowed_containers(user,
501
                                                                  account):
502
                raise NotAllowedError
503
        path, node = self._lookup_container(account, container)
504
        props = self._get_properties(node, until)
505
        mtime = props[self.MTIME]
506
        count, bytes, tstamp = self._get_statistics(node, until)
507
        tstamp = max(tstamp, mtime)
508
        if until is None:
509
            modified = tstamp
510
        else:
511
            modified = self._get_statistics(
512
                node)[2]  # Overall last modification.
513
            modified = max(modified, mtime)
514

    
515
        if user != account:
516
            meta = {'name': container}
517
        else:
518
            meta = {}
519
            if include_user_defined:
520
                meta.update(
521
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
522
            if until is not None:
523
                meta.update({'until_timestamp': tstamp})
524
            meta.update({'name': container, 'count': count, 'bytes': bytes})
525
        meta.update({'modified': modified})
526
        return meta
527

    
528
    @debug_method
529
    def update_container_meta(self, user, account, container, domain, meta,
530
                              replace=False):
531
        """Update the metadata associated with the container for the domain."""
532

    
533
        if user != account:
534
            raise NotAllowedError
535
        path, node = self._lookup_container(account, container)
536
        src_version_id, dest_version_id = self._put_metadata(
537
            user, node, domain, meta, replace,
538
            update_statistics_ancestors_depth=0)
539
        if src_version_id is not None:
540
            versioning = self._get_policy(
541
                node, is_account_policy=False)['versioning']
542
            if versioning != 'auto':
543
                self.node.version_remove(src_version_id,
544
                                         update_statistics_ancestors_depth=0)
545

    
546
    @debug_method
547
    def get_container_policy(self, user, account, container):
548
        """Return a dictionary with the container policy."""
549

    
550
        if user != account:
551
            if container not in self._allowed_containers(user, account):
552
                raise NotAllowedError
553
            return {}
554
        path, node = self._lookup_container(account, container)
555
        return self._get_policy(node, is_account_policy=False)
556

    
557
    @debug_method
558
    def update_container_policy(self, user, account, container, policy,
559
                                replace=False):
560
        """Update the policy associated with the container."""
561

    
562
        if user != account:
563
            raise NotAllowedError
564
        path, node = self._lookup_container(account, container)
565
        self._check_policy(policy, is_account_policy=False)
566
        self._put_policy(node, policy, replace, is_account_policy=False)
567

    
568
    @debug_method
569
    def put_container(self, user, account, container, policy=None):
570
        """Create a new container with the given name."""
571

    
572
        policy = policy or {}
573
        if user != account:
574
            raise NotAllowedError
575
        try:
576
            path, node = self._lookup_container(account, container)
577
        except NameError:
578
            pass
579
        else:
580
            raise ContainerExists('Container already exists')
581
        if policy:
582
            self._check_policy(policy, is_account_policy=False)
583
        path = '/'.join((account, container))
584
        node = self._put_path(
585
            user, self._lookup_account(account, True)[1], path,
586
            update_statistics_ancestors_depth=-1)
587
        self._put_policy(node, policy, True, is_account_policy=False)
588

    
589
    @debug_method
590
    def delete_container(self, user, account, container, until=None, prefix='',
591
                         delimiter=None):
592
        """Delete/purge the container with the given name."""
593

    
594
        if user != account:
595
            raise NotAllowedError
596
        path, node = self._lookup_container(account, container)
597

    
598
        if until is not None:
599
            hashes, size, serials = self.node.node_purge_children(
600
                node, until, CLUSTER_HISTORY,
601
                update_statistics_ancestors_depth=0)
602
            for h in hashes:
603
                self.store.map_delete(h)
604
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
605
                                          update_statistics_ancestors_depth=0)
606
            if not self.free_versioning:
607
                self._report_size_change(
608
                    user, account, -size, {
609
                        'action': 'container purge',
610
                        'path': path,
611
                        'versions': ','.join(str(i) for i in serials)
612
                    }
613
                )
614
            return
615

    
616
        if not delimiter:
617
            if self._get_statistics(node)[0] > 0:
618
                raise ContainerNotEmpty('Container is not empty')
619
            hashes, size, serials = self.node.node_purge_children(
620
                node, inf, CLUSTER_HISTORY,
621
                update_statistics_ancestors_depth=0)
622
            for h in hashes:
623
                self.store.map_delete(h)
624
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
625
                                          update_statistics_ancestors_depth=0)
626
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
627
            if not self.free_versioning:
628
                self._report_size_change(
629
                    user, account, -size, {
630
                        'action': 'container purge',
631
                        'path': path,
632
                        'versions': ','.join(str(i) for i in serials)
633
                    }
634
                )
635
        else:
636
            # remove only contents
637
            src_names = self._list_objects_no_limit(
638
                user, account, container, prefix='', delimiter=None,
639
                virtual=False, domain=None, keys=[], shared=False, until=None,
640
                size_range=None, all_props=True, public=False)
641
            paths = []
642
            for t in src_names:
643
                path = '/'.join((account, container, t[0]))
644
                node = t[2]
645
                if not self._exists(node):
646
                    continue
647
                src_version_id, dest_version_id = self._put_version_duplicate(
648
                    user, node, size=0, type='', hash=None, checksum='',
649
                    cluster=CLUSTER_DELETED,
650
                    update_statistics_ancestors_depth=1)
651
                del_size = self._apply_versioning(
652
                    account, container, src_version_id,
653
                    update_statistics_ancestors_depth=1)
654
                self._report_size_change(
655
                    user, account, -del_size, {
656
                        'action': 'object delete',
657
                        'path': path,
658
                        'versions': ','.join([str(dest_version_id)])})
659
                self._report_object_change(
660
                    user, account, path, details={'action': 'object delete'})
661
                paths.append(path)
662
            self.permissions.access_clear_bulk(paths)
663

    
664
    def _list_objects(self, user, account, container, prefix, delimiter,
665
                      marker, limit, virtual, domain, keys, shared, until,
666
                      size_range, all_props, public):
667
        if user != account and until:
668
            raise NotAllowedError
669
        if shared and public:
670
            # get shared first
671
            shared_paths = self._list_object_permissions(
672
                user, account, container, prefix, shared=True, public=False)
673
            objects = set()
674
            if shared_paths:
675
                path, node = self._lookup_container(account, container)
676
                shared_paths = self._get_formatted_paths(shared_paths)
677
                objects |= set(self._list_object_properties(
678
                    node, path, prefix, delimiter, marker, limit, virtual,
679
                    domain, keys, until, size_range, shared_paths, all_props))
680

    
681
            # get public
682
            objects |= set(self._list_public_object_properties(
683
                user, account, container, prefix, all_props))
684
            objects = list(objects)
685

    
686
            objects.sort(key=lambda x: x[0])
687
            start, limit = self._list_limits(
688
                [x[0] for x in objects], marker, limit)
689
            return objects[start:start + limit]
690
        elif public:
691
            objects = self._list_public_object_properties(
692
                user, account, container, prefix, all_props)
693
            start, limit = self._list_limits(
694
                [x[0] for x in objects], marker, limit)
695
            return objects[start:start + limit]
696

    
697
        allowed = self._list_object_permissions(
698
            user, account, container, prefix, shared, public)
699
        if shared and not allowed:
700
            return []
701
        path, node = self._lookup_container(account, container)
702
        allowed = self._get_formatted_paths(allowed)
703
        objects = self._list_object_properties(
704
            node, path, prefix, delimiter, marker, limit, virtual, domain,
705
            keys, until, size_range, allowed, all_props)
706
        start, limit = self._list_limits(
707
            [x[0] for x in objects], marker, limit)
708
        return objects[start:start + limit]
709

    
710
    def _list_public_object_properties(self, user, account, container, prefix,
711
                                       all_props):
712
        public = self._list_object_permissions(
713
            user, account, container, prefix, shared=False, public=True)
714
        paths, nodes = self._lookup_objects(public)
715
        path = '/'.join((account, container))
716
        cont_prefix = path + '/'
717
        paths = [x[len(cont_prefix):] for x in paths]
718
        objects = [(p,) + props for p, props in
719
                   zip(paths, self.node.version_lookup_bulk(
720
                       nodes, all_props=all_props))]
721
        return objects
722

    
723
    def _list_objects_no_limit(self, user, account, container, prefix,
724
                               delimiter, virtual, domain, keys, shared, until,
725
                               size_range, all_props, public):
726
        objects = []
727
        while True:
728
            marker = objects[-1] if objects else None
729
            limit = 10000
730
            l = self._list_objects(
731
                user, account, container, prefix, delimiter, marker, limit,
732
                virtual, domain, keys, shared, until, size_range, all_props,
733
                public)
734
            objects.extend(l)
735
            if not l or len(l) < limit:
736
                break
737
        return objects
738

    
739
    def _list_object_permissions(self, user, account, container, prefix,
740
                                 shared, public):
741
        allowed = []
742
        path = '/'.join((account, container, prefix)).rstrip('/')
743
        if user != account:
744
            allowed = self.permissions.access_list_paths(user, path)
745
            if not allowed:
746
                raise NotAllowedError
747
        else:
748
            allowed = set()
749
            if shared:
750
                allowed.update(self.permissions.access_list_shared(path))
751
            if public:
752
                allowed.update(
753
                    [x[0] for x in self.permissions.public_list(path)])
754
            allowed = sorted(allowed)
755
            if not allowed:
756
                return []
757
        return allowed
758

    
759
    @debug_method
760
    def list_objects(self, user, account, container, prefix='', delimiter=None,
761
                     marker=None, limit=10000, virtual=True, domain=None,
762
                     keys=None, shared=False, until=None, size_range=None,
763
                     public=False):
764
        """List (object name, object version_id) under a container."""
765

    
766
        keys = keys or []
767
        return self._list_objects(
768
            user, account, container, prefix, delimiter, marker, limit,
769
            virtual, domain, keys, shared, until, size_range, False, public)
770

    
771
    @debug_method
772
    def list_object_meta(self, user, account, container, prefix='',
773
                         delimiter=None, marker=None, limit=10000,
774
                         virtual=True, domain=None, keys=None, shared=False,
775
                         until=None, size_range=None, public=False):
776
        """Return a list of metadata dicts of objects under a container."""
777

    
778
        keys = keys or []
779
        props = self._list_objects(
780
            user, account, container, prefix, delimiter, marker, limit,
781
            virtual, domain, keys, shared, until, size_range, True, public)
782
        objects = []
783
        for p in props:
784
            if len(p) == 2:
785
                objects.append({'subdir': p[0]})
786
            else:
787
                objects.append({
788
                    'name': p[0],
789
                    'bytes': p[self.SIZE + 1],
790
                    'type': p[self.TYPE + 1],
791
                    'hash': p[self.HASH + 1],
792
                    'version': p[self.SERIAL + 1],
793
                    'version_timestamp': p[self.MTIME + 1],
794
                    'modified': p[self.MTIME + 1] if until is None else None,
795
                    'modified_by': p[self.MUSER + 1],
796
                    'uuid': p[self.UUID + 1],
797
                    'checksum': p[self.CHECKSUM + 1]})
798
        return objects
799

    
800
    @debug_method
801
    def list_object_permissions(self, user, account, container, prefix=''):
802
        """Return a list of paths enforce permissions under a container."""
803

    
804
        return self._list_object_permissions(user, account, container, prefix,
805
                                             True, False)
806

    
807
    @debug_method
808
    def list_object_public(self, user, account, container, prefix=''):
809
        """Return a mapping of object paths to public ids under a container."""
810

    
811
        public = {}
812
        for path, p in self.permissions.public_list('/'.join((account,
813
                                                              container,
814
                                                              prefix))):
815
            public[path] = p
816
        return public
817

    
818
    @debug_method
819
    def get_object_meta(self, user, account, container, name, domain,
820
                        version=None, include_user_defined=True):
821
        """Return a dictionary with the object metadata for the domain."""
822

    
823
        self._can_read(user, account, container, name)
824
        path, node = self._lookup_object(account, container, name)
825
        props = self._get_version(node, version)
826
        if version is None:
827
            modified = props[self.MTIME]
828
        else:
829
            try:
830
                modified = self._get_version(
831
                    node)[self.MTIME]  # Overall last modification.
832
            except NameError:  # Object may be deleted.
833
                del_props = self.node.version_lookup(
834
                    node, inf, CLUSTER_DELETED)
835
                if del_props is None:
836
                    raise ItemNotExists('Object does not exist')
837
                modified = del_props[self.MTIME]
838

    
839
        meta = {}
840
        if include_user_defined:
841
            meta.update(
842
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
843
        meta.update({'name': name,
844
                     'bytes': props[self.SIZE],
845
                     'type': props[self.TYPE],
846
                     'hash': props[self.HASH],
847
                     'version': props[self.SERIAL],
848
                     'version_timestamp': props[self.MTIME],
849
                     'modified': modified,
850
                     'modified_by': props[self.MUSER],
851
                     'uuid': props[self.UUID],
852
                     'checksum': props[self.CHECKSUM]})
853
        return meta
854

    
855
    @debug_method
856
    def update_object_meta(self, user, account, container, name, domain, meta,
857
                           replace=False):
858
        """Update object metadata for a domain and return the new version."""
859

    
860
        self._can_write(user, account, container, name)
861

    
862
        path, node = self._lookup_object(account, container, name,
863
                                         lock_container=True)
864
        src_version_id, dest_version_id = self._put_metadata(
865
            user, node, domain, meta, replace,
866
            update_statistics_ancestors_depth=1)
867
        self._apply_versioning(account, container, src_version_id,
868
                               update_statistics_ancestors_depth=1)
869
        return dest_version_id
870

    
871
    @debug_method
872
    def get_object_permissions(self, user, account, container, name):
873
        """Return the action allowed on the object, the path
874
        from which the object gets its permissions from,
875
        along with a dictionary containing the permissions."""
876

    
877
        allowed = 'write'
878
        permissions_path = self._get_permissions_path(account, container, name)
879
        if user != account:
880
            if self.permissions.access_check(permissions_path, self.WRITE,
881
                                             user):
882
                allowed = 'write'
883
            elif self.permissions.access_check(permissions_path, self.READ,
884
                                               user):
885
                allowed = 'read'
886
            else:
887
                raise NotAllowedError
888
        self._lookup_object(account, container, name)
889
        return (allowed,
890
                permissions_path,
891
                self.permissions.access_get(permissions_path))
892

    
893
    @debug_method
894
    def update_object_permissions(self, user, account, container, name,
895
                                  permissions):
896
        """Update the permissions associated with the object."""
897

    
898
        if user != account:
899
            raise NotAllowedError
900
        path = self._lookup_object(account, container, name,
901
                                   lock_container=True)[0]
902
        self._check_permissions(path, permissions)
903
        self.permissions.access_set(path, permissions)
904
        self._report_sharing_change(user, account, path, {'members':
905
                                    self.permissions.access_members(path)})
906

    
907
    @debug_method
908
    def get_object_public(self, user, account, container, name):
909
        """Return the public id of the object if applicable."""
910

    
911
        self._can_read(user, account, container, name)
912
        path = self._lookup_object(account, container, name)[0]
913
        p = self.permissions.public_get(path)
914
        return p
915

    
916
    @debug_method
917
    def update_object_public(self, user, account, container, name, public):
918
        """Update the public status of the object."""
919

    
920
        self._can_write(user, account, container, name)
921
        path = self._lookup_object(account, container, name,
922
                                   lock_container=True)[0]
923
        if not public:
924
            self.permissions.public_unset(path)
925
        else:
926
            self.permissions.public_set(
927
                path, self.public_url_security, self.public_url_alphabet)
928

    
929
    @debug_method
930
    def get_object_hashmap(self, user, account, container, name, version=None):
931
        """Return the object's size and a list with partial hashes."""
932

    
933
        self._can_read(user, account, container, name)
934
        path, node = self._lookup_object(account, container, name)
935
        props = self._get_version(node, version)
936
        if props[self.HASH] is None:
937
            return 0, ()
938
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
939
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
940

    
941
    def _update_object_hash(self, user, account, container, name, size, type,
942
                            hash, checksum, domain, meta, replace_meta,
943
                            permissions, src_node=None, src_version_id=None,
944
                            is_copy=False, report_size_change=True):
945
        if permissions is not None and user != account:
946
            raise NotAllowedError
947
        self._can_write(user, account, container, name)
948
        if permissions is not None:
949
            path = '/'.join((account, container, name))
950
            self._check_permissions(path, permissions)
951

    
952
        account_path, account_node = self._lookup_account(account, True)
953
        container_path, container_node = self._lookup_container(
954
            account, container)
955

    
956
        path, node = self._put_object_node(
957
            container_path, container_node, name)
958
        pre_version_id, dest_version_id = self._put_version_duplicate(
959
            user, node, src_node=src_node, size=size, type=type, hash=hash,
960
            checksum=checksum, is_copy=is_copy,
961
            update_statistics_ancestors_depth=1)
962

    
963
        # Handle meta.
964
        if src_version_id is None:
965
            src_version_id = pre_version_id
966
        self._put_metadata_duplicate(
967
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
968

    
969
        del_size = self._apply_versioning(account, container, pre_version_id,
970
                                          update_statistics_ancestors_depth=1)
971
        size_delta = size - del_size
972
        if size_delta > 0:
973
            # Check account quota.
974
            if not self.using_external_quotaholder:
975
                account_quota = long(self._get_policy(
976
                    account_node, is_account_policy=True)['quota'])
977
                account_usage = self._get_statistics(account_node,
978
                                                     compute=True)[1]
979
                if (account_quota > 0 and account_usage > account_quota):
980
                    raise QuotaError(
981
                        'Account quota exceeded: limit: %s, usage: %s' % (
982
                            account_quota, account_usage))
983

    
984
            # Check container quota.
985
            container_quota = long(self._get_policy(
986
                container_node, is_account_policy=False)['quota'])
987
            container_usage = self._get_statistics(container_node)[1]
988
            if (container_quota > 0 and container_usage > container_quota):
989
                # This must be executed in a transaction, so the version is
990
                # never created if it fails.
991
                raise QuotaError(
992
                    'Container quota exceeded: limit: %s, usage: %s' % (
993
                        container_quota, container_usage
994
                    )
995
                )
996

    
997
        if report_size_change:
998
            self._report_size_change(
999
                user, account, size_delta,
1000
                {'action': 'object update', 'path': path,
1001
                 'versions': ','.join([str(dest_version_id)])})
1002
        if permissions is not None:
1003
            self.permissions.access_set(path, permissions)
1004
            self._report_sharing_change(
1005
                user, account, path,
1006
                {'members': self.permissions.access_members(path)})
1007

    
1008
        self._report_object_change(
1009
            user, account, path,
1010
            details={'version': dest_version_id, 'action': 'object update'})
1011
        return dest_version_id
1012

    
1013
    @debug_method
1014
    def update_object_hashmap(self, user, account, container, name, size, type,
1015
                              hashmap, checksum, domain, meta=None,
1016
                              replace_meta=False, permissions=None):
1017
        """Create/update an object's hashmap and return the new version."""
1018

    
1019
        meta = meta or {}
1020
        if size == 0:  # No such thing as an empty hashmap.
1021
            hashmap = [self.put_block('')]
1022
        map = HashMap(self.block_size, self.hash_algorithm)
1023
        map.extend([self._unhexlify_hash(x) for x in hashmap])
1024
        missing = self.store.block_search(map)
1025
        if missing:
1026
            ie = IndexError()
1027
            ie.data = [binascii.hexlify(x) for x in missing]
1028
            raise ie
1029

    
1030
        hash = map.hash()
1031
        hexlified = binascii.hexlify(hash)
1032
        dest_version_id = self._update_object_hash(
1033
            user, account, container, name, size, type, hexlified, checksum,
1034
            domain, meta, replace_meta, permissions)
1035
        self.store.map_put(hash, map)
1036
        return dest_version_id, hexlified
1037

    
1038
    @debug_method
1039
    def update_object_checksum(self, user, account, container, name, version,
1040
                               checksum):
1041
        """Update an object's checksum."""
1042

    
1043
        # Update objects with greater version and same hashmap
1044
        # and size (fix metadata updates).
1045
        self._can_write(user, account, container, name)
1046
        path, node = self._lookup_object(account, container, name,
1047
                                         lock_container=True)
1048
        props = self._get_version(node, version)
1049
        versions = self.node.node_get_versions(node)
1050
        for x in versions:
1051
            if (x[self.SERIAL] >= int(version) and
1052
                x[self.HASH] == props[self.HASH] and
1053
                    x[self.SIZE] == props[self.SIZE]):
1054
                self.node.version_put_property(
1055
                    x[self.SERIAL], 'checksum', checksum)
1056

    
1057
    def _copy_object(self, user, src_account, src_container, src_name,
1058
                     dest_account, dest_container, dest_name, type,
1059
                     dest_domain=None, dest_meta=None, replace_meta=False,
1060
                     permissions=None, src_version=None, is_move=False,
1061
                     delimiter=None):
1062

    
1063
        report_size_change = not is_move
1064
        dest_meta = dest_meta or {}
1065
        dest_version_ids = []
1066
        self._can_read(user, src_account, src_container, src_name)
1067
        path, node = self._lookup_object(src_account, src_container, src_name,
1068
                                         lock_container=True)
1069
        # TODO: Will do another fetch of the properties in duplicate version...
1070
        props = self._get_version(
1071
            node, src_version)  # Check to see if source exists.
1072
        src_version_id = props[self.SERIAL]
1073
        hash = props[self.HASH]
1074
        size = props[self.SIZE]
1075
        is_copy = not is_move and (src_account, src_container, src_name) != (
1076
            dest_account, dest_container, dest_name)  # New uuid.
1077
        dest_version_ids.append(self._update_object_hash(
1078
            user, dest_account, dest_container, dest_name, size, type, hash,
1079
            None, dest_domain, dest_meta, replace_meta, permissions,
1080
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1081
            report_size_change=report_size_change))
1082
        if is_move and ((src_account, src_container, src_name) !=
1083
                        (dest_account, dest_container, dest_name)):
1084
            self._delete_object(user, src_account, src_container, src_name,
1085
                                report_size_change=report_size_change)
1086

    
1087
        if delimiter:
1088
            prefix = (src_name + delimiter if not
1089
                      src_name.endswith(delimiter) else src_name)
1090
            src_names = self._list_objects_no_limit(
1091
                user, src_account, src_container, prefix, delimiter=None,
1092
                virtual=False, domain=None, keys=[], shared=False, until=None,
1093
                size_range=None, all_props=True, public=False)
1094
            src_names.sort(key=lambda x: x[2])  # order by nodes
1095
            paths = [elem[0] for elem in src_names]
1096
            nodes = [elem[2] for elem in src_names]
1097
            # TODO: Will do another fetch of the properties
1098
            # in duplicate version...
1099
            props = self._get_versions(nodes)  # Check to see if source exists.
1100

    
1101
            for prop, path, node in zip(props, paths, nodes):
1102
                src_version_id = prop[self.SERIAL]
1103
                hash = prop[self.HASH]
1104
                vtype = prop[self.TYPE]
1105
                size = prop[self.SIZE]
1106
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1107
                    delimiter) else dest_name
1108
                vdest_name = path.replace(prefix, dest_prefix, 1)
1109
                dest_version_ids.append(self._update_object_hash(
1110
                    user, dest_account, dest_container, vdest_name, size,
1111
                    vtype, hash, None, dest_domain, meta={},
1112
                    replace_meta=False, permissions=None, src_node=node,
1113
                    src_version_id=src_version_id, is_copy=is_copy))
1114
                if is_move and ((src_account, src_container, src_name) !=
1115
                                (dest_account, dest_container, dest_name)):
1116
                    self._delete_object(user, src_account, src_container, path)
1117
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
1118
                dest_version_ids)
1119

    
1120
    @debug_method
1121
    def copy_object(self, user, src_account, src_container, src_name,
1122
                    dest_account, dest_container, dest_name, type, domain,
1123
                    meta=None, replace_meta=False, permissions=None,
1124
                    src_version=None, delimiter=None):
1125
        """Copy an object's data and metadata."""
1126

    
1127
        meta = meta or {}
1128
        dest_version_id = self._copy_object(
1129
            user, src_account, src_container, src_name, dest_account,
1130
            dest_container, dest_name, type, domain, meta, replace_meta,
1131
            permissions, src_version, False, delimiter)
1132
        return dest_version_id
1133

    
1134
    @debug_method
1135
    def move_object(self, user, src_account, src_container, src_name,
1136
                    dest_account, dest_container, dest_name, type, domain,
1137
                    meta=None, replace_meta=False, permissions=None,
1138
                    delimiter=None):
1139
        """Move an object's data and metadata."""
1140

    
1141
        meta = meta or {}
1142
        if user != src_account:
1143
            raise NotAllowedError
1144
        dest_version_id = self._move_object(
1145
            user, src_account, src_container, src_name, dest_account,
1146
            dest_container, dest_name, type, domain, meta, replace_meta,
1147
            permissions, None, delimiter=delimiter)
1148
        return dest_version_id
1149

    
1150
    def _delete_object(self, user, account, container, name, until=None,
1151
                       delimiter=None, report_size_change=True):
1152
        if user != account:
1153
            raise NotAllowedError
1154

    
1155
        # lookup object and lock container path also
1156
        path, node = self._lookup_object(account, container, name,
1157
                                         lock_container=True)
1158

    
1159
        if until is not None:
1160
            if node is None:
1161
                return
1162
            hashes = []
1163
            size = 0
1164
            serials = []
1165
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1166
                                           update_statistics_ancestors_depth=1)
1167
            hashes += h
1168
            size += s
1169
            serials += v
1170
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1171
                                           update_statistics_ancestors_depth=1)
1172
            hashes += h
1173
            if not self.free_versioning:
1174
                size += s
1175
            serials += v
1176
            for h in hashes:
1177
                self.store.map_delete(h)
1178
            self.node.node_purge(node, until, CLUSTER_DELETED,
1179
                                 update_statistics_ancestors_depth=1)
1180
            try:
1181
                self._get_version(node)
1182
            except NameError:
1183
                self.permissions.access_clear(path)
1184
            self._report_size_change(
1185
                user, account, -size, {
1186
                    'action': 'object purge',
1187
                    'path': path,
1188
                    'versions': ','.join(str(i) for i in serials)
1189
                }
1190
            )
1191
            return
1192

    
1193
        if not self._exists(node):
1194
            raise ItemNotExists('Object is deleted.')
1195
        src_version_id, dest_version_id = self._put_version_duplicate(
1196
            user, node, size=0, type='', hash=None, checksum='',
1197
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1198
        del_size = self._apply_versioning(account, container, src_version_id,
1199
                                          update_statistics_ancestors_depth=1)
1200
        if report_size_change:
1201
            self._report_size_change(
1202
                user, account, -del_size,
1203
                {'action': 'object delete',
1204
                 'path': path,
1205
                 'versions': ','.join([str(dest_version_id)])})
1206
        self._report_object_change(
1207
            user, account, path, details={'action': 'object delete'})
1208
        self.permissions.access_clear(path)
1209

    
1210
        if delimiter:
1211
            prefix = name + delimiter if not name.endswith(delimiter) else name
1212
            src_names = self._list_objects_no_limit(
1213
                user, account, container, prefix, delimiter=None,
1214
                virtual=False, domain=None, keys=[], shared=False, until=None,
1215
                size_range=None, all_props=True, public=False)
1216
            paths = []
1217
            for t in src_names:
1218
                path = '/'.join((account, container, t[0]))
1219
                node = t[2]
1220
                if not self._exists(node):
1221
                    continue
1222
                src_version_id, dest_version_id = self._put_version_duplicate(
1223
                    user, node, size=0, type='', hash=None, checksum='',
1224
                    cluster=CLUSTER_DELETED,
1225
                    update_statistics_ancestors_depth=1)
1226
                del_size = self._apply_versioning(
1227
                    account, container, src_version_id,
1228
                    update_statistics_ancestors_depth=1)
1229
                if report_size_change:
1230
                    self._report_size_change(
1231
                        user, account, -del_size,
1232
                        {'action': 'object delete',
1233
                         'path': path,
1234
                         'versions': ','.join([str(dest_version_id)])})
1235
                self._report_object_change(
1236
                    user, account, path, details={'action': 'object delete'})
1237
                paths.append(path)
1238
            self.permissions.access_clear_bulk(paths)
1239

    
1240
    @debug_method
1241
    def delete_object(self, user, account, container, name, until=None,
1242
                      prefix='', delimiter=None):
1243
        """Delete/purge an object."""
1244

    
1245
        self._delete_object(user, account, container, name, until, delimiter)
1246

    
1247
    @debug_method
1248
    def list_versions(self, user, account, container, name):
1249
        """Return a list of all object (version, version_timestamp) tuples."""
1250

    
1251
        self._can_read(user, account, container, name)
1252
        path, node = self._lookup_object(account, container, name)
1253
        versions = self.node.node_get_versions(node)
1254
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
1255
                x[self.CLUSTER] != CLUSTER_DELETED]
1256

    
1257
    @debug_method
1258
    def get_uuid(self, user, uuid):
1259
        """Return the (account, container, name) for the UUID given."""
1260

    
1261
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1262
        if info is None:
1263
            raise NameError
1264
        path, serial = info
1265
        account, container, name = path.split('/', 2)
1266
        self._can_read(user, account, container, name)
1267
        return (account, container, name)
1268

    
1269
    @debug_method
1270
    def get_public(self, user, public):
1271
        """Return the (account, container, name) for the public id given."""
1272

    
1273
        path = self.permissions.public_path(public)
1274
        if path is None:
1275
            raise NameError
1276
        account, container, name = path.split('/', 2)
1277
        self._can_read(user, account, container, name)
1278
        return (account, container, name)
1279

    
1280
    def get_block(self, hash):
1281
        """Return a block's data."""
1282

    
1283
        logger.debug("get_block: %s", hash)
1284
        block = self.store.block_get(self._unhexlify_hash(hash))
1285
        if not block:
1286
            raise ItemNotExists('Block does not exist')
1287
        return block
1288

    
1289
    def put_block(self, data):
1290
        """Store a block and return the hash."""
1291

    
1292
        logger.debug("put_block: %s", len(data))
1293
        return binascii.hexlify(self.store.block_put(data))
1294

    
1295
    def update_block(self, hash, data, offset=0):
1296
        """Update a known block and return the hash."""
1297

    
1298
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1299
        if offset == 0 and len(data) == self.block_size:
1300
            return self.put_block(data)
1301
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
1302
        return binascii.hexlify(h)
1303

    
1304
    # Path functions.
1305

    
1306
    def _generate_uuid(self):
1307
        return str(uuidlib.uuid4())
1308

    
1309
    def _put_object_node(self, path, parent, name):
1310
        path = '/'.join((path, name))
1311
        node = self.node.node_lookup(path)
1312
        if node is None:
1313
            node = self.node.node_create(parent, path)
1314
        return path, node
1315

    
1316
    def _put_path(self, user, parent, path,
1317
                  update_statistics_ancestors_depth=None):
1318
        node = self.node.node_create(parent, path)
1319
        self.node.version_create(node, None, 0, '', None, user,
1320
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
1321
                                 update_statistics_ancestors_depth)
1322
        return node
1323

    
1324
    def _lookup_account(self, account, create=True):
1325
        node = self.node.node_lookup(account)
1326
        if node is None and create:
1327
            node = self._put_path(
1328
                account, self.ROOTNODE, account,
1329
                update_statistics_ancestors_depth=-1)  # User is account.
1330
        return account, node
1331

    
1332
    def _lookup_container(self, account, container):
1333
        for_update = True if self.lock_container_path else False
1334
        path = '/'.join((account, container))
1335
        node = self.node.node_lookup(path, for_update)
1336
        if node is None:
1337
            raise ItemNotExists('Container does not exist')
1338
        return path, node
1339

    
1340
    def _lookup_object(self, account, container, name, lock_container=False):
1341
        if lock_container:
1342
            self._lookup_container(account, container)
1343

    
1344
        path = '/'.join((account, container, name))
1345
        node = self.node.node_lookup(path)
1346
        if node is None:
1347
            raise ItemNotExists('Object does not exist')
1348
        return path, node
1349

    
1350
    def _lookup_objects(self, paths):
1351
        nodes = self.node.node_lookup_bulk(paths)
1352
        return paths, nodes
1353

    
1354
    def _get_properties(self, node, until=None):
1355
        """Return properties until the timestamp given."""
1356

    
1357
        before = until if until is not None else inf
1358
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1359
        if props is None and until is not None:
1360
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1361
        if props is None:
1362
            raise ItemNotExists('Path does not exist')
1363
        return props
1364

    
1365
    def _get_statistics(self, node, until=None, compute=False):
1366
        """Return (count, sum of size, timestamp) of everything under node."""
1367

    
1368
        if until is not None:
1369
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1370
        elif compute:
1371
            stats = self.node.statistics_latest(node,
1372
                                                except_cluster=CLUSTER_DELETED)
1373
        else:
1374
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1375
        if stats is None:
1376
            stats = (0, 0, 0)
1377
        return stats
1378

    
1379
    def _get_version(self, node, version=None):
1380
        if version is None:
1381
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1382
            if props is None:
1383
                raise ItemNotExists('Object does not exist')
1384
        else:
1385
            try:
1386
                version = int(version)
1387
            except ValueError:
1388
                raise VersionNotExists('Version does not exist')
1389
            props = self.node.version_get_properties(version, node=node)
1390
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1391
                raise VersionNotExists('Version does not exist')
1392
        return props
1393

    
1394
    def _get_versions(self, nodes):
1395
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1396

    
1397
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
1398
                               type=None, hash=None, checksum=None,
1399
                               cluster=CLUSTER_NORMAL, is_copy=False,
1400
                               update_statistics_ancestors_depth=None):
1401
        """Create a new version of the node."""
1402

    
1403
        props = self.node.version_lookup(
1404
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1405
        if props is not None:
1406
            src_version_id = props[self.SERIAL]
1407
            src_hash = props[self.HASH]
1408
            src_size = props[self.SIZE]
1409
            src_type = props[self.TYPE]
1410
            src_checksum = props[self.CHECKSUM]
1411
        else:
1412
            src_version_id = None
1413
            src_hash = None
1414
            src_size = 0
1415
            src_type = ''
1416
            src_checksum = ''
1417
        if size is None:  # Set metadata.
1418
            hash = src_hash  # This way hash can be set to None
1419
                             # (account or container).
1420
            size = src_size
1421
        if type is None:
1422
            type = src_type
1423
        if checksum is None:
1424
            checksum = src_checksum
1425
        uuid = self._generate_uuid(
1426
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1427

    
1428
        if src_node is None:
1429
            pre_version_id = src_version_id
1430
        else:
1431
            pre_version_id = None
1432
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1433
            if props is not None:
1434
                pre_version_id = props[self.SERIAL]
1435
        if pre_version_id is not None:
1436
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
1437
                                        update_statistics_ancestors_depth)
1438

    
1439
        dest_version_id, mtime = self.node.version_create(
1440
            node, hash, size, type, src_version_id, user, uuid, checksum,
1441
            cluster, update_statistics_ancestors_depth)
1442

    
1443
        self.node.attribute_unset_is_latest(node, dest_version_id)
1444

    
1445
        return pre_version_id, dest_version_id
1446

    
1447
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
1448
                                node, meta, replace=False):
1449
        if src_version_id is not None:
1450
            self.node.attribute_copy(src_version_id, dest_version_id)
1451
        if not replace:
1452
            self.node.attribute_del(dest_version_id, domain, (
1453
                k for k, v in meta.iteritems() if v == ''))
1454
            self.node.attribute_set(dest_version_id, domain, node, (
1455
                (k, v) for k, v in meta.iteritems() if v != ''))
1456
        else:
1457
            self.node.attribute_del(dest_version_id, domain)
1458
            self.node.attribute_set(dest_version_id, domain, node, ((
1459
                k, v) for k, v in meta.iteritems()))
1460

    
1461
    def _put_metadata(self, user, node, domain, meta, replace=False,
1462
                      update_statistics_ancestors_depth=None):
1463
        """Create a new version and store metadata."""
1464

    
1465
        src_version_id, dest_version_id = self._put_version_duplicate(
1466
            user, node,
1467
            update_statistics_ancestors_depth=
1468
            update_statistics_ancestors_depth)
1469
        self._put_metadata_duplicate(
1470
            src_version_id, dest_version_id, domain, node, meta, replace)
1471
        return src_version_id, dest_version_id
1472

    
1473
    def _list_limits(self, listing, marker, limit):
1474
        start = 0
1475
        if marker:
1476
            try:
1477
                start = listing.index(marker) + 1
1478
            except ValueError:
1479
                pass
1480
        if not limit or limit > 10000:
1481
            limit = 10000
1482
        return start, limit
1483

    
1484
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
1485
                                marker=None, limit=10000, virtual=True,
1486
                                domain=None, keys=None, until=None,
1487
                                size_range=None, allowed=None,
1488
                                all_props=False):
1489
        keys = keys or []
1490
        allowed = allowed or []
1491
        cont_prefix = path + '/'
1492
        prefix = cont_prefix + prefix
1493
        start = cont_prefix + marker if marker else None
1494
        before = until if until is not None else inf
1495
        filterq = keys if domain else []
1496
        sizeq = size_range
1497

    
1498
        objects, prefixes = self.node.latest_version_list(
1499
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
1500
            allowed, domain, filterq, sizeq, all_props)
1501
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1502
        objects.sort(key=lambda x: x[0])
1503
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1504
        return objects
1505

    
1506
    # Reporting functions.
1507

    
1508
    @debug_method
1509
    def _report_size_change(self, user, account, size, details=None):
1510
        details = details or {}
1511

    
1512
        if size == 0:
1513
            return
1514

    
1515
        account_node = self._lookup_account(account, True)[1]
1516
        total = self._get_statistics(account_node, compute=True)[1]
1517
        details.update({'user': user, 'total': total})
1518
        self.messages.append(
1519
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1520
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1521

    
1522
        if not self.using_external_quotaholder:
1523
            return
1524

    
1525
        try:
1526
            name = details['path'] if 'path' in details else ''
1527
            serial = self.astakosclient.issue_one_commission(
1528
                token=self.service_token,
1529
                holder=account,
1530
                source=DEFAULT_SOURCE,
1531
                provisions={'pithos.diskspace': size},
1532
                name=name)
1533
        except BaseException, e:
1534
            raise QuotaError(e)
1535
        else:
1536
            self.serials.append(serial)
1537

    
1538
    @debug_method
1539
    def _report_object_change(self, user, account, path, details=None):
1540
        details = details or {}
1541
        details.update({'user': user})
1542
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1543
                              account, QUEUE_INSTANCE_ID, 'object', path,
1544
                              details))
1545

    
1546
    @debug_method
1547
    def _report_sharing_change(self, user, account, path, details=None):
1548
        details = details or {}
1549
        details.update({'user': user})
1550
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1551
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
1552
                              details))
1553

    
1554
    # Policy functions.
1555

    
1556
    def _check_policy(self, policy, is_account_policy=True):
1557
        default_policy = self.default_account_policy \
1558
            if is_account_policy else self.default_container_policy
1559
        for k in policy.keys():
1560
            if policy[k] == '':
1561
                policy[k] = default_policy.get(k)
1562
        for k, v in policy.iteritems():
1563
            if k == 'quota':
1564
                q = int(v)  # May raise ValueError.
1565
                if q < 0:
1566
                    raise ValueError
1567
            elif k == 'versioning':
1568
                if v not in ['auto', 'none']:
1569
                    raise ValueError
1570
            else:
1571
                raise ValueError
1572

    
1573
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1574
        default_policy = self.default_account_policy \
1575
            if is_account_policy else self.default_container_policy
1576
        if replace:
1577
            for k, v in default_policy.iteritems():
1578
                if k not in policy:
1579
                    policy[k] = v
1580
        self.node.policy_set(node, policy)
1581

    
1582
    def _get_policy(self, node, is_account_policy=True):
1583
        default_policy = self.default_account_policy \
1584
            if is_account_policy else self.default_container_policy
1585
        policy = default_policy.copy()
1586
        policy.update(self.node.policy_get(node))
1587
        return policy
1588

    
1589
    def _apply_versioning(self, account, container, version_id,
1590
                          update_statistics_ancestors_depth=None):
1591
        """Delete the provided version if such is the policy.
1592
           Return size of object removed.
1593
        """
1594

    
1595
        if version_id is None:
1596
            return 0
1597
        path, node = self._lookup_container(account, container)
1598
        versioning = self._get_policy(
1599
            node, is_account_policy=False)['versioning']
1600
        if versioning != 'auto':
1601
            hash, size = self.node.version_remove(
1602
                version_id, update_statistics_ancestors_depth)
1603
            self.store.map_delete(hash)
1604
            return size
1605
        elif self.free_versioning:
1606
            return self.node.version_get_properties(
1607
                version_id, keys=('size',))[0]
1608
        return 0
1609

    
1610
    # Access control functions.
1611

    
1612
    def _check_groups(self, groups):
1613
        # raise ValueError('Bad characters in groups')
1614
        pass
1615

    
1616
    def _check_permissions(self, path, permissions):
1617
        # raise ValueError('Bad characters in permissions')
1618
        pass
1619

    
1620
    def _get_formatted_paths(self, paths):
1621
        formatted = []
1622
        for p in paths:
1623
            node = self.node.node_lookup(p)
1624
            props = None
1625
            if node is not None:
1626
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1627
            if props is not None:
1628
                if props[self.TYPE].split(';', 1)[0].strip() in (
1629
                        'application/directory', 'application/folder'):
1630
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1631
                formatted.append((p, self.MATCH_EXACT))
1632
        return formatted
1633

    
1634
    def _get_permissions_path(self, account, container, name):
1635
        path = '/'.join((account, container, name))
1636
        permission_paths = self.permissions.access_inherit(path)
1637
        permission_paths.sort()
1638
        permission_paths.reverse()
1639
        for p in permission_paths:
1640
            if p == path:
1641
                return p
1642
            else:
1643
                if p.count('/') < 2:
1644
                    continue
1645
                node = self.node.node_lookup(p)
1646
                props = None
1647
                if node is not None:
1648
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1649
                if props is not None:
1650
                    if props[self.TYPE].split(';', 1)[0].strip() in (
1651
                            'application/directory', 'application/folder'):
1652
                        return p
1653
        return None
1654

    
1655
    def _can_read(self, user, account, container, name):
1656
        if user == account:
1657
            return True
1658
        path = '/'.join((account, container, name))
1659
        if self.permissions.public_get(path) is not None:
1660
            return True
1661
        path = self._get_permissions_path(account, container, name)
1662
        if not path:
1663
            raise NotAllowedError
1664
        if (not self.permissions.access_check(path, self.READ, user) and not
1665
                self.permissions.access_check(path, self.WRITE, user)):
1666
            raise NotAllowedError
1667

    
1668
    def _can_write(self, user, account, container, name):
1669
        if user == account:
1670
            return True
1671
        path = '/'.join((account, container, name))
1672
        path = self._get_permissions_path(account, container, name)
1673
        if not path:
1674
            raise NotAllowedError
1675
        if not self.permissions.access_check(path, self.WRITE, user):
1676
            raise NotAllowedError
1677

    
1678
    def _allowed_accounts(self, user):
1679
        allow = set()
1680
        for path in self.permissions.access_list_paths(user):
1681
            allow.add(path.split('/', 1)[0])
1682
        return sorted(allow)
1683

    
1684
    def _allowed_containers(self, user, account):
1685
        allow = set()
1686
        for path in self.permissions.access_list_paths(user, account):
1687
            allow.add(path.split('/', 2)[1])
1688
        return sorted(allow)
1689

    
1690
    # Domain functions
1691

    
1692
    @debug_method
1693
    def get_domain_objects(self, domain, user=None):
1694
        allowed_paths = self.permissions.access_list_paths(
1695
            user, include_owned=user is not None, include_containers=False)
1696
        if not allowed_paths:
1697
            return []
1698
        obj_list = self.node.domain_object_list(
1699
            domain, allowed_paths, CLUSTER_NORMAL)
1700
        return [(path,
1701
                 self._build_metadata(props, user_defined_meta),
1702
                 self.permissions.access_get(path)) for
1703
                path, props, user_defined_meta in obj_list]
1704

    
1705
    # util functions
1706

    
1707
    def _build_metadata(self, props, user_defined=None,
1708
                        include_user_defined=True):
1709
        meta = {'bytes': props[self.SIZE],
1710
                'type': props[self.TYPE],
1711
                'hash': props[self.HASH],
1712
                'version': props[self.SERIAL],
1713
                'version_timestamp': props[self.MTIME],
1714
                'modified_by': props[self.MUSER],
1715
                'uuid': props[self.UUID],
1716
                'checksum': props[self.CHECKSUM]}
1717
        if include_user_defined and user_defined is not None:
1718
            meta.update(user_defined)
1719
        return meta
1720

    
1721
    def _exists(self, node):
1722
        try:
1723
            self._get_version(node)
1724
        except ItemNotExists:
1725
            return False
1726
        else:
1727
            return True
1728

    
1729
    def _unhexlify_hash(self, hash):
1730
        try:
1731
            return binascii.unhexlify(hash)
1732
        except TypeError:
1733
            raise InvalidHash(hash)