
root / snf-pithos-backend / pithos / backends / modular.py @ 9e3a38bb


# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import sys
import uuid as uuidlib
import logging
import hashlib
import binascii

from functools import wraps, partial
from traceback import format_exc

try:
    from astakosclient import AstakosClient
except ImportError:
    AstakosClient = None

from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)


class DisabledAstakosClient(object):
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        m = ("AstakosClient has been disabled, "
             "yet an attempt to access it was made")
        raise AssertionError(m)


# Stripped-down version of the HashMap class found in tools.

class HashMap(list):

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        if len(self) == 0:
            return self._hash_raw('')
        if len(self) == 1:
            return self.__getitem__(0)

        h = list(self)
        s = 2
        while s < len(h):
            s = s * 2
        h += [('\x00' * len(h[0]))] * (s - len(h))
        while len(h) > 1:
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
        return h[0]
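
# Illustrative sketch (not part of the backend): HashMap folds the per-block
# digests into a single Merkle-style root; the list is padded with zero blocks
# up to a power of two and hashed pairwise until one digest remains. Using the
# defaults defined below ('sha256', 4 MB blocks), with made-up block contents:
#
#     hm = HashMap(DEFAULT_BLOCK_SIZE, DEFAULT_HASH_ALGORITHM)
#     hm.append(hashlib.sha256('first block').digest())
#     hm.append(hashlib.sha256('second block').digest())
#     root_hex = binascii.hexlify(hm.hash())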

# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16

QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

inf = float('inf')

ULTIMATE_ANSWER = 42

DEFAULT_SOURCE = 'system'

logger = logging.getLogger(__name__)


def debug_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            result = format_exc()
            raise
        finally:
            all_args = [str(i) for i in args]
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper
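
# Illustrative note (not part of the module): the wrapper logs each call with
# its arguments and result, or the formatted traceback if it raised. A call
# like backend.list_accounts('user@example.com', limit=5), where the account
# name is a made-up example, would produce a DEBUG record roughly of the form
#
#     >>> list_accounts(user@example.com, limit=5) <<< [...]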


class ModularBackend(BaseBackend):
    """A modular backend.

    Uses modules for SQL functions and storage.
    """

    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        def load_module(m):
            __import__(m)
            return sys.modules[m]

        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME',
                  'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX',
                  'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_url = astakos_url
        self.service_token = service_token

        if not astakos_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        self.serials = []
        self.messages = []

        self._move_object = partial(self._copy_object, is_move=True)
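
    # Illustrative sketch (not part of the class): a typical instantiation
    # relies on the module defaults above; every value shown is an assumption
    # rather than a required setting.
    #
    #     backend = ModularBackend(
    #         db_connection='sqlite:///backend.db',
    #         block_path='data/',
    #         block_size=4 * 1024 * 1024,
    #         hash_algorithm='sha256')
    #
    # Without an astakos_url the backend falls back to DisabledAstakosClient,
    # and without queue settings to the no-op NoQueue defined above.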

    def pre_exec(self, lock_container_path=False):
        self.lock_container_path = lock_container_path
        self.wrapper.execute()

    def post_exec(self, success_status=True):
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    token=self.service_token,
                    accept_serials=self.serials,
                    reject_serials=[])
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            if self.serials:
                self.astakosclient.resolve_commissions(
                    token=self.service_token,
                    accept_serials=[],
                    reject_serials=self.serials)
            self.wrapper.rollback()
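
    # Hedged usage note (inferred from this class alone, not from its callers):
    # a front end is apparently expected to bracket each request roughly as
    #
    #     backend.pre_exec()
    #     try:
    #         ...  # one or more backend calls
    #         backend.post_exec(success_status=True)
    #     except Exception:
    #         backend.post_exec(success_status=False)
    #
    # so that messages are sent and commission serials are accepted only on
    # success, and rejected and rolled back otherwise.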

    def close(self):
        self.wrapper.close()
        self.queue.close()

    @property
    def using_external_quotaholder(self):
        return not isinstance(self.astakosclient, DisabledAstakosClient)

    @debug_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        allowed = self._allowed_accounts(user)
        start, limit = self._list_limits(allowed, marker, limit)
        return allowed[start:start + limit]

    @debug_method
    def get_account_meta(
            self, user, account, domain, until=None, include_user_defined=True,
            external_quota=None):
        """Return a dictionary with the account metadata for the domain."""

        path, node = self._lookup_account(account, user == account)
        if user != account:
            if until or (node is None) or (account not
                                           in self._allowed_accounts(user)):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                external_quota = external_quota or {}
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta

    @debug_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_account(account, True)
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)

    @debug_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            return {}
        self._lookup_account(account, True)
        return self.permissions.group_dict(account)

    @debug_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        for k, v in groups.iteritems():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, k)
            if v:
                self.permissions.group_addmany(account, k, v)

    @debug_method
    def get_account_policy(self, user, account, external_quota=None):
        """Return a dictionary with the account policy."""

        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            return {}
        path, node = self._lookup_account(account, True)
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            external_quota = external_quota or {}
            policy['quota'] = external_quota.get('limit', 0)
        return policy

    @debug_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_account(account, True)
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)

    @debug_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        policy = policy or {}
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True)

    @debug_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is None:
            return
        if not self.node.node_remove(node,
                                     update_statistics_ancestors_depth=-1):
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

    @debug_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account."""

        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            allowed = self._allowed_containers(user, account)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        if shared or public:
            allowed = set()
            if shared:
                allowed.update([x.split('/', 2)[1] for x in
                               self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in
                               self.permissions.public_list(account)])
            allowed = sorted(allowed)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        node = self.node.node_lookup(account)
        containers = [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
        start, limit = self._list_limits(
            [x[0] for x in containers], marker, limit)
        return containers[start:start + limit]

    @debug_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""

        allowed = []
        if user != account:
            if until:
                raise NotAllowedError
            allowed = self.permissions.access_list_paths(
                user, '/'.join((account, container)))
            if not allowed:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = until if until is not None else inf
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)

    @debug_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain."""

        if user != account:
            if until or container not in self._allowed_containers(user,
                                                                  account):
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta

    @debug_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is not None:
            versioning = self._get_policy(
                node, is_account_policy=False)['versioning']
            if versioning != 'auto':
                self.node.version_remove(src_version_id,
                                         update_statistics_ancestors_depth=0)

    @debug_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        if user != account:
            if container not in self._allowed_containers(user, account):
                raise NotAllowedError
            return {}
        path, node = self._lookup_container(account, container)
        return self._get_policy(node, is_account_policy=False)

    @debug_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)

    @debug_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        policy = policy or {}
        if user != account:
            raise NotAllowedError
        try:
            path, node = self._lookup_container(account, container)
        except NameError:
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        node = self._put_path(
            user, self._lookup_account(account, True)[1], path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)

    @debug_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        if until is not None:
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        if user != account and until:
            raise NotAllowedError
        if shared and public:
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            objects = set()
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects |= set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            objects.sort(key=lambda x: x[0])
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]

        allowed = self._list_object_permissions(
            user, account, container, prefix, shared, public)
        if shared and not allowed:
            return []
        path, node = self._lookup_container(account, container)
        allowed = self._get_formatted_paths(allowed)
        objects = self._list_object_properties(
            node, path, prefix, delimiter, marker, limit, virtual, domain,
            keys, until, size_range, allowed, all_props)
        start, limit = self._list_limits(
            [x[0] for x in objects], marker, limit)
        return objects[start:start + limit]

    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        public = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        paths, nodes = self._lookup_objects(public)
        path = '/'.join((account, container))
        cont_prefix = path + '/'
        paths = [x[len(cont_prefix):] for x in paths]
        objects = [(p,) + props for p, props in
                   zip(paths, self.node.version_lookup_bulk(
                       nodes, all_props=all_props))]
        return objects

    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        objects = []
        while True:
            marker = objects[-1] if objects else None
            limit = 10000
            l = self._list_objects(
                user, account, container, prefix, delimiter, marker, limit,
                virtual, domain, keys, shared, until, size_range, all_props,
                public)
            objects.extend(l)
            if not l or len(l) < limit:
                break
        return objects

    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        allowed = []
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            allowed = self.permissions.access_list_paths(user, path)
            if not allowed:
                raise NotAllowedError
        else:
            allowed = set()
            if shared:
                allowed.update(self.permissions.access_list_shared(path))
            if public:
                allowed.update(
                    [x[0] for x in self.permissions.public_list(path)])
            allowed = sorted(allowed)
            if not allowed:
                return []
        return allowed

    @debug_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        keys = keys or []
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, False, public)

    @debug_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        keys = keys or []
        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, True, public)
        objects = []
        for p in props:
            if len(p) == 2:
                objects.append({'subdir': p[0]})
            else:
                objects.append({
                    'name': p[0],
                    'bytes': p[self.SIZE + 1],
                    'type': p[self.TYPE + 1],
                    'hash': p[self.HASH + 1],
                    'version': p[self.SERIAL + 1],
                    'version_timestamp': p[self.MTIME + 1],
                    'modified': p[self.MTIME + 1] if until is None else None,
                    'modified_by': p[self.MUSER + 1],
                    'uuid': p[self.UUID + 1],
                    'checksum': p[self.CHECKSUM + 1]})
        return objects

    @debug_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths that enforce permissions under a container."""

        return self._list_object_permissions(user, account, container, prefix,
                                             True, False)

    @debug_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        public = {}
        for path, p in self.permissions.public_list('/'.join((account,
                                                              container,
                                                              prefix))):
            public[path] = p
        return public

    @debug_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta

    @debug_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        self._apply_versioning(account, container, src_version_id,
                               update_statistics_ancestors_depth=1)
        return dest_version_id

    @debug_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions,
        along with a dictionary containing the permissions."""

        allowed = 'write'
        permissions_path = self._get_permissions_path(account, container, name)
        if user != account:
            if self.permissions.access_check(permissions_path, self.WRITE,
                                             user):
                allowed = 'write'
            elif self.permissions.access_check(permissions_path, self.READ,
                                               user):
                allowed = 'read'
            else:
                raise NotAllowedError
        self._lookup_object(account, container, name)
        return (allowed,
                permissions_path,
                self.permissions.access_get(permissions_path))

    @debug_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object."""

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name)[0]
        self._check_permissions(path, permissions)
        self.permissions.access_set(path, permissions)
        self._report_sharing_change(user, account, path, {'members':
                                    self.permissions.access_members(path)})

    @debug_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        p = self.permissions.public_get(path)
        return p

    @debug_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        if not public:
            self.permissions.public_unset(path)
        else:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet
            )

    @debug_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
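
    # Illustrative sketch (not part of the class): reading an object back means
    # fetching its hashmap and then each block by hex hash; the variable names
    # below are assumptions.
    #
    #     size, hashes = backend.get_object_hashmap(user, account, container,
    #                                                name)
    #     data = ''.join(backend.get_block(h) for h in hashes)[:size]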

    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id

    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version."""

        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([binascii.unhexlify(x) for x in hashmap])
        missing = self.store.block_search(map)
        if missing:
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = map.hash()
        hexlified = binascii.hexlify(hash)
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        self.store.map_put(hash, map)
        return dest_version_id, hexlified
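
    # Illustrative upload flow (a sketch, not the canonical client): blocks are
    # stored first, then the hashmap is registered. The 'pithos' domain and the
    # `chunks` iterable of block-sized strings are assumptions. A missing block
    # makes the call raise IndexError with the missing hex hashes in `.data`.
    #
    #     hashes = [backend.put_block(chunk) for chunk in chunks]
    #     try:
    #         version_id, obj_hash = backend.update_object_hashmap(
    #             user, account, container, name, size,
    #             'application/octet-stream', hashes, checksum='',
    #             domain='pithos')
    #     except IndexError as e:
    #         missing = e.data  # hex hashes of blocks still to be uploaded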
1027

    
1028
    @debug_method
1029
    def update_object_checksum(self, user, account, container, name, version,
1030
                               checksum):
1031
        """Update an object's checksum."""
1032

    
1033
        # Update objects with greater version and same hashmap
1034
        # and size (fix metadata updates).
1035
        self._can_write(user, account, container, name)
1036
        path, node = self._lookup_object(account, container, name)
1037
        props = self._get_version(node, version)
1038
        versions = self.node.node_get_versions(node)
1039
        for x in versions:
1040
            if (x[self.SERIAL] >= int(version) and
1041
                x[self.HASH] == props[self.HASH] and
1042
                    x[self.SIZE] == props[self.SIZE]):
1043
                self.node.version_put_property(
1044
                    x[self.SERIAL], 'checksum', checksum)
1045

    
1046
    def _copy_object(self, user, src_account, src_container, src_name,
1047
                     dest_account, dest_container, dest_name, type,
1048
                     dest_domain=None, dest_meta=None, replace_meta=False,
1049
                     permissions=None, src_version=None, is_move=False,
1050
                     delimiter=None):
1051

    
1052
        report_size_change = not is_move
1053
        dest_meta = dest_meta or {}
1054
        dest_version_ids = []
1055
        self._can_read(user, src_account, src_container, src_name)
1056
        path, node = self._lookup_object(src_account, src_container, src_name)
1057
        # TODO: Will do another fetch of the properties in duplicate version...
1058
        props = self._get_version(
1059
            node, src_version)  # Check to see if source exists.
1060
        src_version_id = props[self.SERIAL]
1061
        hash = props[self.HASH]
1062
        size = props[self.SIZE]
1063
        is_copy = not is_move and (src_account, src_container, src_name) != (
1064
            dest_account, dest_container, dest_name)  # New uuid.
1065
        dest_version_ids.append(self._update_object_hash(
1066
            user, dest_account, dest_container, dest_name, size, type, hash,
1067
            None, dest_domain, dest_meta, replace_meta, permissions,
1068
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1069
            report_size_change=report_size_change))
1070
        if is_move and ((src_account, src_container, src_name) !=
1071
                        (dest_account, dest_container, dest_name)):
1072
            self._delete_object(user, src_account, src_container, src_name,
1073
                                report_size_change=report_size_change)
1074

    
1075
        if delimiter:
1076
            prefix = (src_name + delimiter if not
1077
                      src_name.endswith(delimiter) else src_name)
1078
            src_names = self._list_objects_no_limit(
1079
                user, src_account, src_container, prefix, delimiter=None,
1080
                virtual=False, domain=None, keys=[], shared=False, until=None,
1081
                size_range=None, all_props=True, public=False)
1082
            src_names.sort(key=lambda x: x[2])  # order by nodes
1083
            paths = [elem[0] for elem in src_names]
1084
            nodes = [elem[2] for elem in src_names]
1085
            # TODO: Will do another fetch of the properties
1086
            # in duplicate version...
1087
            props = self._get_versions(nodes)  # Check to see if source exists.
1088

    
1089
            for prop, path, node in zip(props, paths, nodes):
1090
                src_version_id = prop[self.SERIAL]
1091
                hash = prop[self.HASH]
1092
                vtype = prop[self.TYPE]
1093
                size = prop[self.SIZE]
1094
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1095
                    delimiter) else dest_name
1096
                vdest_name = path.replace(prefix, dest_prefix, 1)
1097
                dest_version_ids.append(self._update_object_hash(
1098
                    user, dest_account, dest_container, vdest_name, size,
1099
                    vtype, hash, None, dest_domain, meta={},
1100
                    replace_meta=False, permissions=None, src_node=node,
1101
                    src_version_id=src_version_id, is_copy=is_copy))
1102
                if is_move and ((src_account, src_container, src_name) !=
1103
                                (dest_account, dest_container, dest_name)):
1104
                    self._delete_object(user, src_account, src_container, path)
1105
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
1106
                dest_version_ids)
1107

    
1108
    @debug_method
1109
    def copy_object(self, user, src_account, src_container, src_name,
1110
                    dest_account, dest_container, dest_name, type, domain,
1111
                    meta=None, replace_meta=False, permissions=None,
1112
                    src_version=None, delimiter=None):
1113
        """Copy an object's data and metadata."""
1114

    
1115
        meta = meta or {}
1116
        dest_version_id = self._copy_object(
1117
            user, src_account, src_container, src_name, dest_account,
1118
            dest_container, dest_name, type, domain, meta, replace_meta,
1119
            permissions, src_version, False, delimiter)
1120
        return dest_version_id
1121

    
1122
    @debug_method
1123
    def move_object(self, user, src_account, src_container, src_name,
1124
                    dest_account, dest_container, dest_name, type, domain,
1125
                    meta=None, replace_meta=False, permissions=None,
1126
                    delimiter=None):
1127
        """Move an object's data and metadata."""
1128

    
1129
        meta = meta or {}
1130
        if user != src_account:
1131
            raise NotAllowedError
1132
        dest_version_id = self._move_object(
1133
            user, src_account, src_container, src_name, dest_account,
1134
            dest_container, dest_name, type, domain, meta, replace_meta,
1135
            permissions, None, delimiter=delimiter)
1136
        return dest_version_id
1137

    
1138
    def _delete_object(self, user, account, container, name, until=None,
1139
                       delimiter=None, report_size_change=True):
1140
        if user != account:
1141
            raise NotAllowedError
1142

    
1143
        if until is not None:
1144
            path = '/'.join((account, container, name))
1145
            node = self.node.node_lookup(path)
1146
            if node is None:
1147
                return
1148
            hashes = []
1149
            size = 0
1150
            serials = []
1151
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1152
                                           update_statistics_ancestors_depth=1)
1153
            hashes += h
1154
            size += s
1155
            serials += v
1156
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1157
                                           update_statistics_ancestors_depth=1)
1158
            hashes += h
1159
            if not self.free_versioning:
1160
                size += s
1161
            serials += v
1162
            for h in hashes:
1163
                self.store.map_delete(h)
1164
            self.node.node_purge(node, until, CLUSTER_DELETED,
1165
                                 update_statistics_ancestors_depth=1)
1166
            try:
1167
                self._get_version(node)
1168
            except NameError:
1169
                self.permissions.access_clear(path)
1170
            self._report_size_change(
1171
                user, account, -size, {
1172
                    'action': 'object purge',
1173
                    'path': path,
1174
                    'versions': ','.join(str(i) for i in serials)
1175
                }
1176
            )
1177
            return
1178

    
1179
        path, node = self._lookup_object(account, container, name)
1180
        src_version_id, dest_version_id = self._put_version_duplicate(
1181
            user, node, size=0, type='', hash=None, checksum='',
1182
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1183
        del_size = self._apply_versioning(account, container, src_version_id,
1184
                                          update_statistics_ancestors_depth=1)
1185
        if report_size_change:
1186
            self._report_size_change(
1187
                user, account, -del_size,
1188
                {'action': 'object delete',
1189
                 'path': path,
1190
                 'versions': ','.join([str(dest_version_id)])})
1191
        self._report_object_change(
1192
            user, account, path, details={'action': 'object delete'})
1193
        self.permissions.access_clear(path)
1194

    
1195
        if delimiter:
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    @debug_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        self._delete_object(user, account, container, name, until, delimiter)

    @debug_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
                x[self.CLUSTER] != CLUSTER_DELETED]

    @debug_method
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    @debug_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    @debug_method
    def get_block(self, hash):
        """Return a block's data."""

        block = self.store.block_get(binascii.unhexlify(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block

    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        return binascii.hexlify(self.store.block_put(data))

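    # The block store is addressed by content hash, so updating part of a
    # block yields a new block (and a new hash) rather than modifying the
    # stored one in place.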
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
        return binascii.hexlify(h)

    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

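    # Return the node for `path/name`, creating it under `parent` if it
    # does not exist yet.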
    def _put_object_node(self, path, parent, name):
        path = '/'.join((path, name))
        node = self.node.node_lookup(path)
        if node is None:
            node = self.node.node_create(parent, path)
        return path, node

    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node

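    # Lookup helpers: map account/container/object paths to their nodes.
    # Accounts may be created on demand; containers and objects must
    # already exist, otherwise ItemNotExists is raised.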
    def _lookup_account(self, account, create=True):
        for_update = True if create else False
        node = self.node.node_lookup(account, for_update=for_update)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node

    def _lookup_container(self, account, container):
        for_update = True if self.lock_container_path else False
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name):
        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes

    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats

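    # Return the properties row of a version: the latest normal version
    # when `version` is None, otherwise the explicitly requested serial
    # (deleted versions are treated as missing).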
    def _get_version(self, node, version=None):
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

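    # Copy-on-write step used by updates: duplicate the latest version of
    # `node` (or of `src_node` when copying), move the previous normal
    # version to the history cluster and return (old serial, new serial).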
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node."""

        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id

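    # Carry the attributes of the source version over to the new one and
    # apply `meta`: with replace=False empty values delete keys and the
    # rest are merged, with replace=True the domain is rewritten entirely.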
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))

    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=
            update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id

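    # Turn a listing marker into a start index and cap the page size at
    # 10000 entries, e.g. marker 'b' in ['a', 'b', 'c'] yields start=2 so
    # the caller resumes right after the marker.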
    def _list_limits(self, listing, marker, limit):
        start = 0
        if marker:
            try:
                start = listing.index(marker) + 1
            except ValueError:
                pass
        if not limit or limit > 10000:
            limit = 10000
        return start, limit

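    # Build the prefix/marker window relative to the container path and
    # delegate the filtered, ordered listing to the node layer; returned
    # prefixes stand in for virtual directories when `virtual` is set.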
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects

    # Reporting functions.

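    # Size changes are queued as 'diskspace' messages and, when an external
    # quotaholder is in use, also reported as a commission against the
    # account's 'pithos.diskspace' resource.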
    @debug_method
    def _report_size_change(self, user, account, size, details=None):
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                token=self.service_token,
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)

    @debug_method
    def _report_object_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))

    @debug_method
    def _report_sharing_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))

    # Policy functions.

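    # Validate policy values, substituting the defaults for empty entries;
    # only 'quota' (a non-negative integer) and 'versioning' ('auto' or
    # 'none') are accepted keys.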
    def _check_policy(self, policy, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError

    def _put_policy(self, node, policy, replace, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)

    def _get_policy(self, node, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        policy = default_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy

    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0

    # Access control functions.

    def _check_groups(self, groups):
        # raise ValueError('Bad characters in groups')
        pass

    def _check_permissions(self, path, permissions):
        # raise ValueError('Bad characters in permissions')
        pass

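    # Map each existing path to an exact-match entry, adding a prefix-match
    # entry (path + '/') for objects whose content type marks them as
    # directories.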
    def _get_formatted_paths(self, paths):
        formatted = []
        for p in paths:
            node = self.node.node_lookup(p)
            props = None
            if node is not None:
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                if props[self.TYPE].split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
                formatted.append((p, self.MATCH_EXACT))
        return formatted

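    # Walk the inherited permission paths from the most specific downwards
    # and return the object's own path, or the closest directory-type
    # ancestor that carries permissions, or None.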
    def _get_permissions_path(self, account, container, name):
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            else:
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None

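    # Owners always pass the access checks; other users may read public
    # objects or paths shared with read/write grants, and may write only
    # paths shared with a write grant.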
    def _can_read(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if (not self.permissions.access_check(path, self.READ, user) and not
                self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError

    def _can_write(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError

    def _allowed_accounts(self, user):
        allow = set()
        for path in self.permissions.access_list_paths(user):
            allow.add(path.split('/', 1)[0])
        return sorted(allow)

    def _allowed_containers(self, user, account):
        allow = set()
        for path in self.permissions.access_list_paths(user, account):
            allow.add(path.split('/', 2)[1])
        return sorted(allow)

    # Domain functions

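    # List the objects carrying attributes in `domain` that the given user
    # may access, together with their metadata and sharing information.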
    @debug_method
    def get_domain_objects(self, domain, user=None):
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]

    # util functions

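    # Assemble the system metadata dict from a version-properties row,
    # optionally merged with the user-defined attributes.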
    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        meta = {'bytes': props[self.SIZE],
                'type': props[self.TYPE],
                'hash': props[self.HASH],
                'version': props[self.SERIAL],
                'version_timestamp': props[self.MTIME],
                'modified_by': props[self.MUSER],
                'uuid': props[self.UUID],
                'checksum': props[self.CHECKSUM]}
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta

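    # Boolean wrapper around _can_read for full 'account/container/object'
    # paths.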
    def _has_read_access(self, user, path):
        try:
            account, container, object = path.split('/', 2)
        except ValueError:
            raise ValueError('Invalid object path')

        assert isinstance(user, basestring), "Invalid user"

        try:
            self._can_read(user, account, container, object)
        except NotAllowedError:
            return False
        else:
            return True