Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ af395b9c

History | View | Annotate | Download (69.4 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42

    
43
try:
44
    from astakosclient import AstakosClient
45
except ImportError:
46
    AstakosClient = None
47

    
48
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
49
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
50
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
51
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
52
                  InvalidHash)
53

    
54

    
55
class DisabledAstakosClient(object):
56
    def __init__(self, *args, **kwargs):
57
        self.args = args
58
        self.kwargs = kwargs
59

    
60
    def __getattr__(self, name):
61
        m = ("AstakosClient has been disabled, "
62
             "yet an attempt to access it was made")
63
        raise AssertionError(m)
64

    
65

    
66
# Stripped-down version of the HashMap class found in tools.
67

    
68
class HashMap(list):
69

    
70
    def __init__(self, blocksize, blockhash):
71
        super(HashMap, self).__init__()
72
        self.blocksize = blocksize
73
        self.blockhash = blockhash
74

    
75
    def _hash_raw(self, v):
76
        h = hashlib.new(self.blockhash)
77
        h.update(v)
78
        return h.digest()
79

    
80
    def hash(self):
81
        if len(self) == 0:
82
            return self._hash_raw('')
83
        if len(self) == 1:
84
            return self.__getitem__(0)
85

    
86
        h = list(self)
87
        s = 2
88
        while s < len(h):
89
            s = s * 2
90
        h += [('\x00' * len(h[0]))] * (s - len(h))
91
        while len(h) > 1:
92
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
93
        return h[0]
94

    
95
# Default modules and settings.
96
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
97
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
98
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
99
DEFAULT_BLOCK_PATH = 'data/'
100
DEFAULT_BLOCK_UMASK = 0o022
101
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
102
DEFAULT_HASH_ALGORITHM = 'sha256'
103
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
104
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
105
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
106
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
107
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
108
                               'abcdefghijklmnopqrstuvwxyz'
109
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
110
DEFAULT_PUBLIC_URL_SECURITY = 16
111

    
112
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
113
QUEUE_CLIENT_ID = 'pithos'
114
QUEUE_INSTANCE_ID = '1'
115

    
116
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
117

    
118
inf = float('inf')
119

    
120
ULTIMATE_ANSWER = 42
121

    
122
DEFAULT_SOURCE = 'system'
123

    
124
logger = logging.getLogger(__name__)
125

    
126

    
127
def debug_method(func):
128
    @wraps(func)
129
    def wrapper(self, *args, **kw):
130
        try:
131
            result = func(self, *args, **kw)
132
            return result
133
        except:
134
            result = format_exc()
135
            raise
136
        finally:
137
            all_args = map(repr, args)
138
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
139
            logger.debug(">>> %s(%s) <<< %s" % (
140
                func.__name__, ', '.join(all_args).rstrip(', '), result))
141
    return wrapper
142

    
143

    
144
class ModularBackend(BaseBackend):
145
    """A modular backend.
146

147
    Uses modules for SQL functions and storage.
148
    """
149

    
150
    def __init__(self, db_module=None, db_connection=None,
151
                 block_module=None, block_path=None, block_umask=None,
152
                 block_size=None, hash_algorithm=None,
153
                 queue_module=None, queue_hosts=None, queue_exchange=None,
154
                 astakos_url=None, service_token=None,
155
                 astakosclient_poolsize=None,
156
                 free_versioning=True, block_params=None,
157
                 public_url_security=None,
158
                 public_url_alphabet=None,
159
                 account_quota_policy=None,
160
                 container_quota_policy=None,
161
                 container_versioning_policy=None):
162
        db_module = db_module or DEFAULT_DB_MODULE
163
        db_connection = db_connection or DEFAULT_DB_CONNECTION
164
        block_module = block_module or DEFAULT_BLOCK_MODULE
165
        block_path = block_path or DEFAULT_BLOCK_PATH
166
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
167
        block_params = block_params or DEFAULT_BLOCK_PARAMS
168
        block_size = block_size or DEFAULT_BLOCK_SIZE
169
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
170
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
171
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
172
        container_quota_policy = container_quota_policy \
173
            or DEFAULT_CONTAINER_QUOTA
174
        container_versioning_policy = container_versioning_policy \
175
            or DEFAULT_CONTAINER_VERSIONING
176

    
177
        self.default_account_policy = {'quota': account_quota_policy}
178
        self.default_container_policy = {
179
            'quota': container_quota_policy,
180
            'versioning': container_versioning_policy
181
        }
182
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
183
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
184

    
185
        self.public_url_security = (public_url_security or
186
                                    DEFAULT_PUBLIC_URL_SECURITY)
187
        self.public_url_alphabet = (public_url_alphabet or
188
                                    DEFAULT_PUBLIC_URL_ALPHABET)
189

    
190
        self.hash_algorithm = hash_algorithm
191
        self.block_size = block_size
192
        self.free_versioning = free_versioning
193

    
194
        def load_module(m):
195
            __import__(m)
196
            return sys.modules[m]
197

    
198
        self.db_module = load_module(db_module)
199
        self.wrapper = self.db_module.DBWrapper(db_connection)
200
        params = {'wrapper': self.wrapper}
201
        self.permissions = self.db_module.Permissions(**params)
202
        self.config = self.db_module.Config(**params)
203
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
204
        for x in ['READ', 'WRITE']:
205
            setattr(self, x, getattr(self.db_module, x))
206
        self.node = self.db_module.Node(**params)
207
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
208
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
209
                  'MATCH_PREFIX', 'MATCH_EXACT']:
210
            setattr(self, x, getattr(self.db_module, x))
211

    
212
        self.block_module = load_module(block_module)
213
        self.block_params = block_params
214
        params = {'path': block_path,
215
                  'block_size': self.block_size,
216
                  'hash_algorithm': self.hash_algorithm,
217
                  'umask': block_umask}
218
        params.update(self.block_params)
219
        self.store = self.block_module.Store(**params)
220

    
221
        if queue_module and queue_hosts:
222
            self.queue_module = load_module(queue_module)
223
            params = {'hosts': queue_hosts,
224
                      'exchange': queue_exchange,
225
                      'client_id': QUEUE_CLIENT_ID}
226
            self.queue = self.queue_module.Queue(**params)
227
        else:
228
            class NoQueue:
229
                def send(self, *args):
230
                    pass
231

    
232
                def close(self):
233
                    pass
234

    
235
            self.queue = NoQueue()
236

    
237
        self.astakos_url = astakos_url
238
        self.service_token = service_token
239

    
240
        if not astakos_url or not AstakosClient:
241
            self.astakosclient = DisabledAstakosClient(
242
                astakos_url,
243
                use_pool=True,
244
                pool_size=astakosclient_poolsize)
245
        else:
246
            self.astakosclient = AstakosClient(
247
                astakos_url,
248
                use_pool=True,
249
                pool_size=astakosclient_poolsize)
250

    
251
        self.serials = []
252
        self.messages = []
253

    
254
        self._move_object = partial(self._copy_object, is_move=True)
255

    
256
        self.lock_container_path = False
257

    
258
    def pre_exec(self, lock_container_path=False):
259
        self.lock_container_path = lock_container_path
260
        self.wrapper.execute()
261

    
262
    def post_exec(self, success_status=True):
263
        if success_status:
264
            # send messages produced
265
            for m in self.messages:
266
                self.queue.send(*m)
267

    
268
            # register serials
269
            if self.serials:
270
                self.commission_serials.insert_many(
271
                    self.serials)
272

    
273
                # commit to ensure that the serials are registered
274
                # even if resolve commission fails
275
                self.wrapper.commit()
276

    
277
                # start new transaction
278
                self.wrapper.execute()
279

    
280
                r = self.astakosclient.resolve_commissions(
281
                    token=self.service_token,
282
                    accept_serials=self.serials,
283
                    reject_serials=[])
284
                self.commission_serials.delete_many(
285
                    r['accepted'])
286

    
287
            self.wrapper.commit()
288
        else:
289
            if self.serials:
290
                self.astakosclient.resolve_commissions(
291
                    token=self.service_token,
292
                    accept_serials=[],
293
                    reject_serials=self.serials)
294
            self.wrapper.rollback()
295

    
296
    def close(self):
297
        self.wrapper.close()
298
        self.queue.close()
299

    
300
    @property
301
    def using_external_quotaholder(self):
302
        return not isinstance(self.astakosclient, DisabledAstakosClient)
303

    
304
    @debug_method
305
    def list_accounts(self, user, marker=None, limit=10000):
306
        """Return a list of accounts the user can access."""
307

    
308
        allowed = self._allowed_accounts(user)
309
        start, limit = self._list_limits(allowed, marker, limit)
310
        return allowed[start:start + limit]
311

    
312
    @debug_method
313
    def get_account_meta(
314
            self, user, account, domain, until=None, include_user_defined=True,
315
            external_quota=None):
316
        """Return a dictionary with the account metadata for the domain."""
317

    
318
        path, node = self._lookup_account(account, user == account)
319
        if user != account:
320
            if until or (node is None) or (account not
321
                                           in self._allowed_accounts(user)):
322
                raise NotAllowedError
323
        try:
324
            props = self._get_properties(node, until)
325
            mtime = props[self.MTIME]
326
        except NameError:
327
            props = None
328
            mtime = until
329
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
330
        tstamp = max(tstamp, mtime)
331
        if until is None:
332
            modified = tstamp
333
        else:
334
            modified = self._get_statistics(
335
                node, compute=True)[2]  # Overall last modification.
336
            modified = max(modified, mtime)
337

    
338
        if user != account:
339
            meta = {'name': account}
340
        else:
341
            meta = {}
342
            if props is not None and include_user_defined:
343
                meta.update(
344
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
345
            if until is not None:
346
                meta.update({'until_timestamp': tstamp})
347
            meta.update({'name': account, 'count': count, 'bytes': bytes})
348
            if self.using_external_quotaholder:
349
                external_quota = external_quota or {}
350
                meta['bytes'] = external_quota.get('usage', 0)
351
        meta.update({'modified': modified})
352
        return meta
353

    
354
    @debug_method
355
    def update_account_meta(self, user, account, domain, meta, replace=False):
356
        """Update the metadata associated with the account for the domain."""
357

    
358
        if user != account:
359
            raise NotAllowedError
360
        path, node = self._lookup_account(account, True)
361
        self._put_metadata(user, node, domain, meta, replace,
362
                           update_statistics_ancestors_depth=-1)
363

    
364
    @debug_method
365
    def get_account_groups(self, user, account):
366
        """Return a dictionary with the user groups defined for the account."""
367

    
368
        if user != account:
369
            if account not in self._allowed_accounts(user):
370
                raise NotAllowedError
371
            return {}
372
        self._lookup_account(account, True)
373
        return self.permissions.group_dict(account)
374

    
375
    @debug_method
376
    def update_account_groups(self, user, account, groups, replace=False):
377
        """Update the groups associated with the account."""
378

    
379
        if user != account:
380
            raise NotAllowedError
381
        self._lookup_account(account, True)
382
        self._check_groups(groups)
383
        if replace:
384
            self.permissions.group_destroy(account)
385
        for k, v in groups.iteritems():
386
            if not replace:  # If not already deleted.
387
                self.permissions.group_delete(account, k)
388
            if v:
389
                self.permissions.group_addmany(account, k, v)
390

    
391
    @debug_method
392
    def get_account_policy(self, user, account, external_quota=None):
393
        """Return a dictionary with the account policy."""
394

    
395
        if user != account:
396
            if account not in self._allowed_accounts(user):
397
                raise NotAllowedError
398
            return {}
399
        path, node = self._lookup_account(account, True)
400
        policy = self._get_policy(node, is_account_policy=True)
401
        if self.using_external_quotaholder:
402
            external_quota = external_quota or {}
403
            policy['quota'] = external_quota.get('limit', 0)
404
        return policy
405

    
406
    @debug_method
407
    def update_account_policy(self, user, account, policy, replace=False):
408
        """Update the policy associated with the account."""
409

    
410
        if user != account:
411
            raise NotAllowedError
412
        path, node = self._lookup_account(account, True)
413
        self._check_policy(policy, is_account_policy=True)
414
        self._put_policy(node, policy, replace, is_account_policy=True)
415

    
416
    @debug_method
417
    def put_account(self, user, account, policy=None):
418
        """Create a new account with the given name."""
419

    
420
        policy = policy or {}
421
        if user != account:
422
            raise NotAllowedError
423
        node = self.node.node_lookup(account)
424
        if node is not None:
425
            raise AccountExists('Account already exists')
426
        if policy:
427
            self._check_policy(policy, is_account_policy=True)
428
        node = self._put_path(user, self.ROOTNODE, account,
429
                              update_statistics_ancestors_depth=-1)
430
        self._put_policy(node, policy, True, is_account_policy=True)
431

    
432
    @debug_method
433
    def delete_account(self, user, account):
434
        """Delete the account with the given name."""
435

    
436
        if user != account:
437
            raise NotAllowedError
438
        node = self.node.node_lookup(account)
439
        if node is None:
440
            return
441
        if not self.node.node_remove(node,
442
                                     update_statistics_ancestors_depth=-1):
443
            raise AccountNotEmpty('Account is not empty')
444
        self.permissions.group_destroy(account)
445

    
446
    @debug_method
447
    def list_containers(self, user, account, marker=None, limit=10000,
448
                        shared=False, until=None, public=False):
449
        """Return a list of containers existing under an account."""
450

    
451
        if user != account:
452
            if until or account not in self._allowed_accounts(user):
453
                raise NotAllowedError
454
            allowed = self._allowed_containers(user, account)
455
            start, limit = self._list_limits(allowed, marker, limit)
456
            return allowed[start:start + limit]
457
        if shared or public:
458
            allowed = set()
459
            if shared:
460
                allowed.update([x.split('/', 2)[1] for x in
461
                               self.permissions.access_list_shared(account)])
462
            if public:
463
                allowed.update([x[0].split('/', 2)[1] for x in
464
                               self.permissions.public_list(account)])
465
            allowed = sorted(allowed)
466
            start, limit = self._list_limits(allowed, marker, limit)
467
            return allowed[start:start + limit]
468
        node = self.node.node_lookup(account)
469
        containers = [x[0] for x in self._list_object_properties(
470
            node, account, '', '/', marker, limit, False, None, [], until)]
471
        start, limit = self._list_limits(
472
            [x[0] for x in containers], marker, limit)
473
        return containers[start:start + limit]
474

    
475
    @debug_method
476
    def list_container_meta(self, user, account, container, domain,
477
                            until=None):
478
        """Return a list of the container's object meta keys for a domain."""
479

    
480
        allowed = []
481
        if user != account:
482
            if until:
483
                raise NotAllowedError
484
            allowed = self.permissions.access_list_paths(
485
                user, '/'.join((account, container)))
486
            if not allowed:
487
                raise NotAllowedError
488
        path, node = self._lookup_container(account, container)
489
        before = until if until is not None else inf
490
        allowed = self._get_formatted_paths(allowed)
491
        return self.node.latest_attribute_keys(node, domain, before,
492
                                               CLUSTER_DELETED, allowed)
493

    
494
    @debug_method
495
    def get_container_meta(self, user, account, container, domain, until=None,
496
                           include_user_defined=True):
497
        """Return a dictionary with the container metadata for the domain."""
498

    
499
        if user != account:
500
            if until or container not in self._allowed_containers(user,
501
                                                                  account):
502
                raise NotAllowedError
503
        path, node = self._lookup_container(account, container)
504
        props = self._get_properties(node, until)
505
        mtime = props[self.MTIME]
506
        count, bytes, tstamp = self._get_statistics(node, until)
507
        tstamp = max(tstamp, mtime)
508
        if until is None:
509
            modified = tstamp
510
        else:
511
            modified = self._get_statistics(
512
                node)[2]  # Overall last modification.
513
            modified = max(modified, mtime)
514

    
515
        if user != account:
516
            meta = {'name': container}
517
        else:
518
            meta = {}
519
            if include_user_defined:
520
                meta.update(
521
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
522
            if until is not None:
523
                meta.update({'until_timestamp': tstamp})
524
            meta.update({'name': container, 'count': count, 'bytes': bytes})
525
        meta.update({'modified': modified})
526
        return meta
527

    
528
    @debug_method
529
    def update_container_meta(self, user, account, container, domain, meta,
530
                              replace=False):
531
        """Update the metadata associated with the container for the domain."""
532

    
533
        if user != account:
534
            raise NotAllowedError
535
        path, node = self._lookup_container(account, container)
536
        src_version_id, dest_version_id = self._put_metadata(
537
            user, node, domain, meta, replace,
538
            update_statistics_ancestors_depth=0)
539
        if src_version_id is not None:
540
            versioning = self._get_policy(
541
                node, is_account_policy=False)['versioning']
542
            if versioning != 'auto':
543
                self.node.version_remove(src_version_id,
544
                                         update_statistics_ancestors_depth=0)
545

    
546
    @debug_method
547
    def get_container_policy(self, user, account, container):
548
        """Return a dictionary with the container policy."""
549

    
550
        if user != account:
551
            if container not in self._allowed_containers(user, account):
552
                raise NotAllowedError
553
            return {}
554
        path, node = self._lookup_container(account, container)
555
        return self._get_policy(node, is_account_policy=False)
556

    
557
    @debug_method
558
    def update_container_policy(self, user, account, container, policy,
559
                                replace=False):
560
        """Update the policy associated with the container."""
561

    
562
        if user != account:
563
            raise NotAllowedError
564
        path, node = self._lookup_container(account, container)
565
        self._check_policy(policy, is_account_policy=False)
566
        self._put_policy(node, policy, replace, is_account_policy=False)
567

    
568
    @debug_method
569
    def put_container(self, user, account, container, policy=None):
570
        """Create a new container with the given name."""
571

    
572
        policy = policy or {}
573
        if user != account:
574
            raise NotAllowedError
575
        try:
576
            path, node = self._lookup_container(account, container)
577
        except NameError:
578
            pass
579
        else:
580
            raise ContainerExists('Container already exists')
581
        if policy:
582
            self._check_policy(policy, is_account_policy=False)
583
        path = '/'.join((account, container))
584
        node = self._put_path(
585
            user, self._lookup_account(account, True)[1], path,
586
            update_statistics_ancestors_depth=-1)
587
        self._put_policy(node, policy, True, is_account_policy=False)
588

    
589
    @debug_method
590
    def delete_container(self, user, account, container, until=None, prefix='',
591
                         delimiter=None):
592
        """Delete/purge the container with the given name."""
593

    
594
        if user != account:
595
            raise NotAllowedError
596
        path, node = self._lookup_container(account, container)
597

    
598
        if until is not None:
599
            hashes, size, serials = self.node.node_purge_children(
600
                node, until, CLUSTER_HISTORY,
601
                update_statistics_ancestors_depth=0)
602
            for h in hashes:
603
                self.store.map_delete(h)
604
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
605
                                          update_statistics_ancestors_depth=0)
606
            if not self.free_versioning:
607
                self._report_size_change(
608
                    user, account, -size, {
609
                        'action': 'container purge',
610
                        'path': path,
611
                        'versions': ','.join(str(i) for i in serials)
612
                    }
613
                )
614
            return
615

    
616
        if not delimiter:
617
            if self._get_statistics(node)[0] > 0:
618
                raise ContainerNotEmpty('Container is not empty')
619
            hashes, size, serials = self.node.node_purge_children(
620
                node, inf, CLUSTER_HISTORY,
621
                update_statistics_ancestors_depth=0)
622
            for h in hashes:
623
                self.store.map_delete(h)
624
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
625
                                          update_statistics_ancestors_depth=0)
626
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
627
            if not self.free_versioning:
628
                self._report_size_change(
629
                    user, account, -size, {
630
                        'action': 'container purge',
631
                        'path': path,
632
                        'versions': ','.join(str(i) for i in serials)
633
                    }
634
                )
635
        else:
636
            # remove only contents
637
            src_names = self._list_objects_no_limit(
638
                user, account, container, prefix='', delimiter=None,
639
                virtual=False, domain=None, keys=[], shared=False, until=None,
640
                size_range=None, all_props=True, public=False)
641
            paths = []
642
            for t in src_names:
643
                path = '/'.join((account, container, t[0]))
644
                node = t[2]
645
                if not self._exists(node):
646
                    continue
647
                src_version_id, dest_version_id = self._put_version_duplicate(
648
                    user, node, size=0, type='', hash=None, checksum='',
649
                    cluster=CLUSTER_DELETED,
650
                    update_statistics_ancestors_depth=1)
651
                del_size = self._apply_versioning(
652
                    account, container, src_version_id,
653
                    update_statistics_ancestors_depth=1)
654
                self._report_size_change(
655
                    user, account, -del_size, {
656
                        'action': 'object delete',
657
                        'path': path,
658
                        'versions': ','.join([str(dest_version_id)])})
659
                self._report_object_change(
660
                    user, account, path, details={'action': 'object delete'})
661
                paths.append(path)
662
            self.permissions.access_clear_bulk(paths)
663

    
664
    def _list_objects(self, user, account, container, prefix, delimiter,
665
                      marker, limit, virtual, domain, keys, shared, until,
666
                      size_range, all_props, public):
667
        if user != account and until:
668
            raise NotAllowedError
669
        if shared and public:
670
            # get shared first
671
            shared_paths = self._list_object_permissions(
672
                user, account, container, prefix, shared=True, public=False)
673
            objects = set()
674
            if shared_paths:
675
                path, node = self._lookup_container(account, container)
676
                shared_paths = self._get_formatted_paths(shared_paths)
677
                objects |= set(self._list_object_properties(
678
                    node, path, prefix, delimiter, marker, limit, virtual,
679
                    domain, keys, until, size_range, shared_paths, all_props))
680

    
681
            # get public
682
            objects |= set(self._list_public_object_properties(
683
                user, account, container, prefix, all_props))
684
            objects = list(objects)
685

    
686
            objects.sort(key=lambda x: x[0])
687
            start, limit = self._list_limits(
688
                [x[0] for x in objects], marker, limit)
689
            return objects[start:start + limit]
690
        elif public:
691
            objects = self._list_public_object_properties(
692
                user, account, container, prefix, all_props)
693
            start, limit = self._list_limits(
694
                [x[0] for x in objects], marker, limit)
695
            return objects[start:start + limit]
696

    
697
        allowed = self._list_object_permissions(
698
            user, account, container, prefix, shared, public)
699
        if shared and not allowed:
700
            return []
701
        path, node = self._lookup_container(account, container)
702
        allowed = self._get_formatted_paths(allowed)
703
        objects = self._list_object_properties(
704
            node, path, prefix, delimiter, marker, limit, virtual, domain,
705
            keys, until, size_range, allowed, all_props)
706
        start, limit = self._list_limits(
707
            [x[0] for x in objects], marker, limit)
708
        return objects[start:start + limit]
709

    
710
    def _list_public_object_properties(self, user, account, container, prefix,
711
                                       all_props):
712
        public = self._list_object_permissions(
713
            user, account, container, prefix, shared=False, public=True)
714
        paths, nodes = self._lookup_objects(public)
715
        path = '/'.join((account, container))
716
        cont_prefix = path + '/'
717
        paths = [x[len(cont_prefix):] for x in paths]
718
        objects = [(p,) + props for p, props in
719
                   zip(paths, self.node.version_lookup_bulk(
720
                       nodes, all_props=all_props))]
721
        return objects
722

    
723
    def _list_objects_no_limit(self, user, account, container, prefix,
724
                               delimiter, virtual, domain, keys, shared, until,
725
                               size_range, all_props, public):
726
        objects = []
727
        while True:
728
            marker = objects[-1] if objects else None
729
            limit = 10000
730
            l = self._list_objects(
731
                user, account, container, prefix, delimiter, marker, limit,
732
                virtual, domain, keys, shared, until, size_range, all_props,
733
                public)
734
            objects.extend(l)
735
            if not l or len(l) < limit:
736
                break
737
        return objects
738

    
739
    def _list_object_permissions(self, user, account, container, prefix,
740
                                 shared, public):
741
        allowed = []
742
        path = '/'.join((account, container, prefix)).rstrip('/')
743
        if user != account:
744
            allowed = self.permissions.access_list_paths(user, path)
745
            if not allowed:
746
                raise NotAllowedError
747
        else:
748
            allowed = set()
749
            if shared:
750
                allowed.update(self.permissions.access_list_shared(path))
751
            if public:
752
                allowed.update(
753
                    [x[0] for x in self.permissions.public_list(path)])
754
            allowed = sorted(allowed)
755
            if not allowed:
756
                return []
757
        return allowed
758

    
759
    @debug_method
760
    def list_objects(self, user, account, container, prefix='', delimiter=None,
761
                     marker=None, limit=10000, virtual=True, domain=None,
762
                     keys=None, shared=False, until=None, size_range=None,
763
                     public=False):
764
        """List (object name, object version_id) under a container."""
765

    
766
        keys = keys or []
767
        return self._list_objects(
768
            user, account, container, prefix, delimiter, marker, limit,
769
            virtual, domain, keys, shared, until, size_range, False, public)
770

    
771
    @debug_method
772
    def list_object_meta(self, user, account, container, prefix='',
773
                         delimiter=None, marker=None, limit=10000,
774
                         virtual=True, domain=None, keys=None, shared=False,
775
                         until=None, size_range=None, public=False):
776
        """Return a list of metadata dicts of objects under a container."""
777

    
778
        keys = keys or []
779
        props = self._list_objects(
780
            user, account, container, prefix, delimiter, marker, limit,
781
            virtual, domain, keys, shared, until, size_range, True, public)
782
        objects = []
783
        for p in props:
784
            if len(p) == 2:
785
                objects.append({'subdir': p[0]})
786
            else:
787
                objects.append({
788
                    'name': p[0],
789
                    'bytes': p[self.SIZE + 1],
790
                    'type': p[self.TYPE + 1],
791
                    'hash': p[self.HASH + 1],
792
                    'version': p[self.SERIAL + 1],
793
                    'version_timestamp': p[self.MTIME + 1],
794
                    'modified': p[self.MTIME + 1] if until is None else None,
795
                    'modified_by': p[self.MUSER + 1],
796
                    'uuid': p[self.UUID + 1],
797
                    'checksum': p[self.CHECKSUM + 1]})
798
        return objects
799

    
800
    @debug_method
801
    def list_object_permissions(self, user, account, container, prefix=''):
802
        """Return a list of paths enforce permissions under a container."""
803

    
804
        return self._list_object_permissions(user, account, container, prefix,
805
                                             True, False)
806

    
807
    @debug_method
808
    def list_object_public(self, user, account, container, prefix=''):
809
        """Return a mapping of object paths to public ids under a container."""
810

    
811
        public = {}
812
        for path, p in self.permissions.public_list('/'.join((account,
813
                                                              container,
814
                                                              prefix))):
815
            public[path] = p
816
        return public
817

    
818
    @debug_method
819
    def get_object_meta(self, user, account, container, name, domain,
820
                        version=None, include_user_defined=True):
821
        """Return a dictionary with the object metadata for the domain."""
822

    
823
        self._can_read(user, account, container, name)
824
        path, node = self._lookup_object(account, container, name)
825
        props = self._get_version(node, version)
826
        if version is None:
827
            modified = props[self.MTIME]
828
        else:
829
            try:
830
                modified = self._get_version(
831
                    node)[self.MTIME]  # Overall last modification.
832
            except NameError:  # Object may be deleted.
833
                del_props = self.node.version_lookup(
834
                    node, inf, CLUSTER_DELETED)
835
                if del_props is None:
836
                    raise ItemNotExists('Object does not exist')
837
                modified = del_props[self.MTIME]
838

    
839
        meta = {}
840
        if include_user_defined:
841
            meta.update(
842
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
843
        meta.update({'name': name,
844
                     'bytes': props[self.SIZE],
845
                     'type': props[self.TYPE],
846
                     'hash': props[self.HASH],
847
                     'version': props[self.SERIAL],
848
                     'version_timestamp': props[self.MTIME],
849
                     'modified': modified,
850
                     'modified_by': props[self.MUSER],
851
                     'uuid': props[self.UUID],
852
                     'checksum': props[self.CHECKSUM]})
853
        return meta
854

    
855
    @debug_method
856
    def update_object_meta(self, user, account, container, name, domain, meta,
857
                           replace=False):
858
        """Update object metadata for a domain and return the new version."""
859

    
860
        self._can_write(user, account, container, name)
861
        path, node = self._lookup_object(account, container, name)
862
        src_version_id, dest_version_id = self._put_metadata(
863
            user, node, domain, meta, replace,
864
            update_statistics_ancestors_depth=1)
865
        self._apply_versioning(account, container, src_version_id,
866
                               update_statistics_ancestors_depth=1)
867
        return dest_version_id
868

    
869
    @debug_method
870
    def get_object_permissions(self, user, account, container, name):
871
        """Return the action allowed on the object, the path
872
        from which the object gets its permissions from,
873
        along with a dictionary containing the permissions."""
874

    
875
        allowed = 'write'
876
        permissions_path = self._get_permissions_path(account, container, name)
877
        if user != account:
878
            if self.permissions.access_check(permissions_path, self.WRITE,
879
                                             user):
880
                allowed = 'write'
881
            elif self.permissions.access_check(permissions_path, self.READ,
882
                                               user):
883
                allowed = 'read'
884
            else:
885
                raise NotAllowedError
886
        self._lookup_object(account, container, name)
887
        return (allowed,
888
                permissions_path,
889
                self.permissions.access_get(permissions_path))
890

    
891
    @debug_method
892
    def update_object_permissions(self, user, account, container, name,
893
                                  permissions):
894
        """Update the permissions associated with the object."""
895

    
896
        if user != account:
897
            raise NotAllowedError
898
        path = self._lookup_object(account, container, name)[0]
899
        self._check_permissions(path, permissions)
900
        self.permissions.access_set(path, permissions)
901
        self._report_sharing_change(user, account, path, {'members':
902
                                    self.permissions.access_members(path)})
903

    
904
    @debug_method
905
    def get_object_public(self, user, account, container, name):
906
        """Return the public id of the object if applicable."""
907

    
908
        self._can_read(user, account, container, name)
909
        path = self._lookup_object(account, container, name)[0]
910
        p = self.permissions.public_get(path)
911
        return p
912

    
913
    @debug_method
914
    def update_object_public(self, user, account, container, name, public):
915
        """Update the public status of the object."""
916

    
917
        self._can_write(user, account, container, name)
918
        path = self._lookup_object(account, container, name)[0]
919
        if not public:
920
            self.permissions.public_unset(path)
921
        else:
922
            self.permissions.public_set(
923
                path, self.public_url_security, self.public_url_alphabet)
924

    
925
    @debug_method
926
    def get_object_hashmap(self, user, account, container, name, version=None):
927
        """Return the object's size and a list with partial hashes."""
928

    
929
        self._can_read(user, account, container, name)
930
        path, node = self._lookup_object(account, container, name)
931
        props = self._get_version(node, version)
932
        if props[self.HASH] is None:
933
            return 0, ()
934
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
935
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
936

    
937
    def _update_object_hash(self, user, account, container, name, size, type,
938
                            hash, checksum, domain, meta, replace_meta,
939
                            permissions, src_node=None, src_version_id=None,
940
                            is_copy=False, report_size_change=True):
941
        if permissions is not None and user != account:
942
            raise NotAllowedError
943
        self._can_write(user, account, container, name)
944
        if permissions is not None:
945
            path = '/'.join((account, container, name))
946
            self._check_permissions(path, permissions)
947

    
948
        account_path, account_node = self._lookup_account(account, True)
949
        container_path, container_node = self._lookup_container(
950
            account, container)
951

    
952
        path, node = self._put_object_node(
953
            container_path, container_node, name)
954
        pre_version_id, dest_version_id = self._put_version_duplicate(
955
            user, node, src_node=src_node, size=size, type=type, hash=hash,
956
            checksum=checksum, is_copy=is_copy,
957
            update_statistics_ancestors_depth=1)
958

    
959
        # Handle meta.
960
        if src_version_id is None:
961
            src_version_id = pre_version_id
962
        self._put_metadata_duplicate(
963
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
964

    
965
        del_size = self._apply_versioning(account, container, pre_version_id,
966
                                          update_statistics_ancestors_depth=1)
967
        size_delta = size - del_size
968
        if size_delta > 0:
969
            # Check account quota.
970
            if not self.using_external_quotaholder:
971
                account_quota = long(self._get_policy(
972
                    account_node, is_account_policy=True)['quota'])
973
                account_usage = self._get_statistics(account_node,
974
                                                     compute=True)[1]
975
                if (account_quota > 0 and account_usage > account_quota):
976
                    raise QuotaError(
977
                        'Account quota exceeded: limit: %s, usage: %s' % (
978
                            account_quota, account_usage))
979

    
980
            # Check container quota.
981
            container_quota = long(self._get_policy(
982
                container_node, is_account_policy=False)['quota'])
983
            container_usage = self._get_statistics(container_node)[1]
984
            if (container_quota > 0 and container_usage > container_quota):
985
                # This must be executed in a transaction, so the version is
986
                # never created if it fails.
987
                raise QuotaError(
988
                    'Container quota exceeded: limit: %s, usage: %s' % (
989
                        container_quota, container_usage
990
                    )
991
                )
992

    
993
        if report_size_change:
994
            self._report_size_change(
995
                user, account, size_delta,
996
                {'action': 'object update', 'path': path,
997
                 'versions': ','.join([str(dest_version_id)])})
998
        if permissions is not None:
999
            self.permissions.access_set(path, permissions)
1000
            self._report_sharing_change(
1001
                user, account, path,
1002
                {'members': self.permissions.access_members(path)})
1003

    
1004
        self._report_object_change(
1005
            user, account, path,
1006
            details={'version': dest_version_id, 'action': 'object update'})
1007
        return dest_version_id
1008

    
1009
    @debug_method
1010
    def update_object_hashmap(self, user, account, container, name, size, type,
1011
                              hashmap, checksum, domain, meta=None,
1012
                              replace_meta=False, permissions=None):
1013
        """Create/update an object's hashmap and return the new version."""
1014

    
1015
        meta = meta or {}
1016
        if size == 0:  # No such thing as an empty hashmap.
1017
            hashmap = [self.put_block('')]
1018
        map = HashMap(self.block_size, self.hash_algorithm)
1019
        map.extend([self._unhexlify_hash(x) for x in hashmap])
1020
        missing = self.store.block_search(map)
1021
        if missing:
1022
            ie = IndexError()
1023
            ie.data = [binascii.hexlify(x) for x in missing]
1024
            raise ie
1025

    
1026
        hash = map.hash()
1027
        hexlified = binascii.hexlify(hash)
1028
        dest_version_id = self._update_object_hash(
1029
            user, account, container, name, size, type, hexlified, checksum,
1030
            domain, meta, replace_meta, permissions)
1031
        self.store.map_put(hash, map)
1032
        return dest_version_id, hexlified
1033

    
1034
    @debug_method
1035
    def update_object_checksum(self, user, account, container, name, version,
1036
                               checksum):
1037
        """Update an object's checksum."""
1038

    
1039
        # Update objects with greater version and same hashmap
1040
        # and size (fix metadata updates).
1041
        self._can_write(user, account, container, name)
1042
        path, node = self._lookup_object(account, container, name)
1043
        props = self._get_version(node, version)
1044
        versions = self.node.node_get_versions(node)
1045
        for x in versions:
1046
            if (x[self.SERIAL] >= int(version) and
1047
                x[self.HASH] == props[self.HASH] and
1048
                    x[self.SIZE] == props[self.SIZE]):
1049
                self.node.version_put_property(
1050
                    x[self.SERIAL], 'checksum', checksum)
1051

    
1052
    def _copy_object(self, user, src_account, src_container, src_name,
1053
                     dest_account, dest_container, dest_name, type,
1054
                     dest_domain=None, dest_meta=None, replace_meta=False,
1055
                     permissions=None, src_version=None, is_move=False,
1056
                     delimiter=None):
1057

    
1058
        report_size_change = not is_move
1059
        dest_meta = dest_meta or {}
1060
        dest_version_ids = []
1061
        self._can_read(user, src_account, src_container, src_name)
1062
        path, node = self._lookup_object(src_account, src_container, src_name)
1063
        # TODO: Will do another fetch of the properties in duplicate version...
1064
        props = self._get_version(
1065
            node, src_version)  # Check to see if source exists.
1066
        src_version_id = props[self.SERIAL]
1067
        hash = props[self.HASH]
1068
        size = props[self.SIZE]
1069
        is_copy = not is_move and (src_account, src_container, src_name) != (
1070
            dest_account, dest_container, dest_name)  # New uuid.
1071
        dest_version_ids.append(self._update_object_hash(
1072
            user, dest_account, dest_container, dest_name, size, type, hash,
1073
            None, dest_domain, dest_meta, replace_meta, permissions,
1074
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1075
            report_size_change=report_size_change))
1076
        if is_move and ((src_account, src_container, src_name) !=
1077
                        (dest_account, dest_container, dest_name)):
1078
            self._delete_object(user, src_account, src_container, src_name,
1079
                                report_size_change=report_size_change)
1080

    
1081
        if delimiter:
1082
            prefix = (src_name + delimiter if not
1083
                      src_name.endswith(delimiter) else src_name)
1084
            src_names = self._list_objects_no_limit(
1085
                user, src_account, src_container, prefix, delimiter=None,
1086
                virtual=False, domain=None, keys=[], shared=False, until=None,
1087
                size_range=None, all_props=True, public=False)
1088
            src_names.sort(key=lambda x: x[2])  # order by nodes
1089
            paths = [elem[0] for elem in src_names]
1090
            nodes = [elem[2] for elem in src_names]
1091
            # TODO: Will do another fetch of the properties
1092
            # in duplicate version...
1093
            props = self._get_versions(nodes)  # Check to see if source exists.
1094

    
1095
            for prop, path, node in zip(props, paths, nodes):
1096
                src_version_id = prop[self.SERIAL]
1097
                hash = prop[self.HASH]
1098
                vtype = prop[self.TYPE]
1099
                size = prop[self.SIZE]
1100
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1101
                    delimiter) else dest_name
1102
                vdest_name = path.replace(prefix, dest_prefix, 1)
1103
                dest_version_ids.append(self._update_object_hash(
1104
                    user, dest_account, dest_container, vdest_name, size,
1105
                    vtype, hash, None, dest_domain, meta={},
1106
                    replace_meta=False, permissions=None, src_node=node,
1107
                    src_version_id=src_version_id, is_copy=is_copy))
1108
                if is_move and ((src_account, src_container, src_name) !=
1109
                                (dest_account, dest_container, dest_name)):
1110
                    self._delete_object(user, src_account, src_container, path)
1111
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
1112
                dest_version_ids)
1113

    
1114
    @debug_method
1115
    def copy_object(self, user, src_account, src_container, src_name,
1116
                    dest_account, dest_container, dest_name, type, domain,
1117
                    meta=None, replace_meta=False, permissions=None,
1118
                    src_version=None, delimiter=None):
1119
        """Copy an object's data and metadata."""
1120

    
1121
        meta = meta or {}
1122
        dest_version_id = self._copy_object(
1123
            user, src_account, src_container, src_name, dest_account,
1124
            dest_container, dest_name, type, domain, meta, replace_meta,
1125
            permissions, src_version, False, delimiter)
1126
        return dest_version_id
1127

    
1128
    @debug_method
1129
    def move_object(self, user, src_account, src_container, src_name,
1130
                    dest_account, dest_container, dest_name, type, domain,
1131
                    meta=None, replace_meta=False, permissions=None,
1132
                    delimiter=None):
1133
        """Move an object's data and metadata."""
1134

    
1135
        meta = meta or {}
1136
        if user != src_account:
1137
            raise NotAllowedError
1138
        dest_version_id = self._move_object(
1139
            user, src_account, src_container, src_name, dest_account,
1140
            dest_container, dest_name, type, domain, meta, replace_meta,
1141
            permissions, None, delimiter=delimiter)
1142
        return dest_version_id
1143

    
1144
    def _delete_object(self, user, account, container, name, until=None,
1145
                       delimiter=None, report_size_change=True):
1146
        if user != account:
1147
            raise NotAllowedError
1148

    
1149
        if until is not None:
1150
            path = '/'.join((account, container, name))
1151
            node = self.node.node_lookup(path)
1152
            if node is None:
1153
                return
1154
            hashes = []
1155
            size = 0
1156
            serials = []
1157
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1158
                                           update_statistics_ancestors_depth=1)
1159
            hashes += h
1160
            size += s
1161
            serials += v
1162
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1163
                                           update_statistics_ancestors_depth=1)
1164
            hashes += h
1165
            if not self.free_versioning:
1166
                size += s
1167
            serials += v
1168
            for h in hashes:
1169
                self.store.map_delete(h)
1170
            self.node.node_purge(node, until, CLUSTER_DELETED,
1171
                                 update_statistics_ancestors_depth=1)
1172
            try:
1173
                self._get_version(node)
1174
            except NameError:
1175
                self.permissions.access_clear(path)
1176
            self._report_size_change(
1177
                user, account, -size, {
1178
                    'action': 'object purge',
1179
                    'path': path,
1180
                    'versions': ','.join(str(i) for i in serials)
1181
                }
1182
            )
1183
            return
1184

    
1185
        path, node = self._lookup_object(account, container, name)
1186
        if not self._exists(node):
1187
            raise ItemNotExists('Object is deleted.')
1188
        src_version_id, dest_version_id = self._put_version_duplicate(
1189
            user, node, size=0, type='', hash=None, checksum='',
1190
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1191
        del_size = self._apply_versioning(account, container, src_version_id,
1192
                                          update_statistics_ancestors_depth=1)
1193
        if report_size_change:
1194
            self._report_size_change(
1195
                user, account, -del_size,
1196
                {'action': 'object delete',
1197
                 'path': path,
1198
                 'versions': ','.join([str(dest_version_id)])})
1199
        self._report_object_change(
1200
            user, account, path, details={'action': 'object delete'})
1201
        self.permissions.access_clear(path)
1202

    
1203
        if delimiter:
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    @debug_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        self._delete_object(user, account, container, name, until, delimiter)

    @debug_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
                x[self.CLUSTER] != CLUSTER_DELETED]

    @debug_method
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    @debug_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        block = self.store.block_get(self._unhexlify_hash(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block

    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        return binascii.hexlify(self.store.block_put(data))

    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
        return binascii.hexlify(h)

    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

    def _put_object_node(self, path, parent, name):
        path = '/'.join((path, name))
        node = self.node.node_lookup(path)
        if node is None:
            node = self.node.node_create(parent, path)
        return path, node

    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node

    def _lookup_account(self, account, create=True):
        node = self.node.node_lookup(account)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node

    def _lookup_container(self, account, container):
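        # If lock_container_path is configured, look the path up 'for update'
        # (presumably a row-level lock in the SQL node backends) so that
        # concurrent container modifications are serialized.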
        for_update = True if self.lock_container_path else False
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name):
        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes

    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats

    def _get_version(self, node, version=None):
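        # Without an explicit version, return the latest normal-cluster
        # version; otherwise validate the requested serial and treat deleted
        # versions as non-existent.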
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version, node=node)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node."""

        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
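            # Demote the node's previous latest version to the HISTORY
            # cluster; the version created below becomes the new latest.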
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id

    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))

    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=
            update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id

    def _list_limits(self, listing, marker, limit):
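        # Paging helper: start right after the marker if it appears in the
        # listing, and cap the page size at 10000 entries.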
        start = 0
        if marker:
            try:
                start = listing.index(marker) + 1
            except ValueError:
                pass
        if not limit or limit > 10000:
            limit = 10000
        return start, limit

    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
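        # Delegate to latest_version_list(), which returns the matching
        # object versions plus any common prefixes when a delimiter is used;
        # prefixes are folded back into the listing if 'virtual' is set, and
        # the container prefix is stripped from the returned names.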
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects

    # Reporting functions.

    @debug_method
    def _report_size_change(self, user, account, size, details=None):
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
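            # Issue a commission for the 'pithos.diskspace' resource against
            # the external quotaholder (Astakos); the returned serial is kept
            # in self.serials for later resolution of the commission.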
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                token=self.service_token,
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)

    @debug_method
    def _report_object_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))

    @debug_method
    def _report_sharing_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))

    # Policy functions.

    def _check_policy(self, policy, is_account_policy=True):
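        # Empty values fall back to the defaults; only 'quota' (a
        # non-negative integer) and 'versioning' ('auto' or 'none') are
        # accepted policy keys, anything else raises ValueError.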
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError

    def _put_policy(self, node, policy, replace, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)

    def _get_policy(self, node, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        policy = default_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy

    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
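            # Versioning disabled: physically remove the replaced version and
            # its hashmap and return its size so the usage change can be
            # reported.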
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0

    # Access control functions.

    def _check_groups(self, groups):
        # raise ValueError('Bad characters in groups')
        pass

    def _check_permissions(self, path, permissions):
        # raise ValueError('Bad characters in permissions')
        pass

    def _get_formatted_paths(self, paths):
        formatted = []
        for p in paths:
            node = self.node.node_lookup(p)
            props = None
            if node is not None:
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                if props[self.TYPE].split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
                formatted.append((p, self.MATCH_EXACT))
        return formatted

    def _get_permissions_path(self, account, container, name):
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
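            # Walk candidate paths from the most to the least specific: an
            # exact match wins, otherwise the closest ancestor that is a
            # directory/folder object provides the inherited permissions.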
            if p == path:
                return p
            else:
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None

    def _can_read(self, user, account, container, name):
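        # The owner can always read. Otherwise the object must either be
        # public or carry (possibly inherited) read or write permissions for
        # the requesting user.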
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if (not self.permissions.access_check(path, self.READ, user) and not
                self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError

    def _can_write(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError

    def _allowed_accounts(self, user):
        allow = set()
        for path in self.permissions.access_list_paths(user):
            allow.add(path.split('/', 1)[0])
        return sorted(allow)

    def _allowed_containers(self, user, account):
        allow = set()
        for path in self.permissions.access_list_paths(user, account):
            allow.add(path.split('/', 2)[1])
        return sorted(allow)

    # Domain functions

    @debug_method
    def get_domain_objects(self, domain, user=None):
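        # List the objects carrying attributes in the given metadata domain,
        # restricted to the paths the user may access; each entry bundles the
        # path, its metadata and its sharing information.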
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]

    # util functions

    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
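        # Translate raw version properties into the API-level metadata dict;
        # user-defined (per-domain) attributes are merged on top when
        # requested.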
        meta = {'bytes': props[self.SIZE],
                'type': props[self.TYPE],
                'hash': props[self.HASH],
                'version': props[self.SERIAL],
                'version_timestamp': props[self.MTIME],
                'modified_by': props[self.MUSER],
                'uuid': props[self.UUID],
                'checksum': props[self.CHECKSUM]}
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta

    def _exists(self, node):
        try:
            self._get_version(node)
        except ItemNotExists:
            return False
        else:
            return True

    def _unhexlify_hash(self, hash):
        try:
            return binascii.unhexlify(hash)
        except TypeError:
            raise InvalidHash(hash)