Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 369a7b41

History | View | Annotate | Download (68.2 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
try:
41
    from astakosclient import AstakosClient
42
except ImportError:
43
    AstakosClient = None
44

    
45
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
46
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
47
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
48
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)
49

    
50

    
51
class DisabledAstakosClient(object):
    """Placeholder used when no real AstakosClient is available.

    The constructor arguments are only recorded. Looking up any attribute
    that is not set on the instance raises ``AssertionError``.
    """

    def __init__(self, *args, **kwargs):
        # Remember how the real client would have been constructed.
        self.args, self.kwargs = args, kwargs

    def __getattr__(self, name):
        # Only reached for attributes missing from the instance, i.e.
        # anything other than ``args`` and ``kwargs``.
        message = "".join(("AstakosClient has been disabled, ",
                           "yet an attempt to access it was made"))
        raise AssertionError(message)
60

    
61

    
62
# Stripped-down version of the HashMap class found in tools.
63

    
64
class HashMap(list):
    """A list of block hashes that can be folded into a single top hash.

    ``blocksize`` is stored but not used by this stripped-down class.
    ``blockhash`` is the algorithm name passed to ``hashlib.new``.
    The items are expected to be raw digests (byte strings).
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        """Return the raw digest of ``v`` using the configured algorithm."""
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        """Return the top hash of the stored block hashes.

        * empty map: the digest of the empty byte string;
        * single item: that item, unchanged;
        * otherwise: the list is padded with all-zero entries up to the next
          power of two and adjacent pairs are hashed repeatedly until one
          value remains.
        """
        if not self:
            # Byte literals are plain ``str`` on Python 2, so behaviour is
            # unchanged there, and they are what hashlib needs on Python 3.
            return self._hash_raw(b'')
        if len(self) == 1:
            return self[0]

        level = list(self)
        width = 2
        while width < len(level):
            width *= 2
        # Pad with zero-filled entries as long as the first hash.
        level += [b'\x00' * len(level[0])] * (width - len(level))
        while len(level) > 1:
            level = [self._hash_raw(level[i] + level[i + 1])
                     for i in range(0, len(level), 2)]
        return level[0]
90

    
91
# Fallback modules and settings.

# Metadata database.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'

# Block storage.
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 ** 2  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
DEFAULT_BLOCK_PARAMS = dict(mappool=None, blockpool=None)

# Queue defaults are currently commented out:
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'

# Public URL settings: digits, lowercase letters, then uppercase letters.
DEFAULT_PUBLIC_URL_ALPHABET = ''.join((
    '0123456789',
    'abcdefghijklmnopqrstuvwxyz',
    'ABCDEFGHIJKLMNOPQRSTUVWXYZ',
))
DEFAULT_PUBLIC_URL_SECURITY = 16

QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version clusters, numbered 0, 1, 2.
CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED = range(3)

# Used as an open-ended upper bound for "until" timestamps.
inf = float('inf')

# NOTE(review): not referenced in the visible part of the file.
ULTIMATE_ANSWER = 42

DEFAULT_SOURCE = 'system'

logger = logging.getLogger(__name__)
121

    
122

    
123
class ModularBackend(BaseBackend):
124
    """A modular backend.
125

126
    Uses modules for SQL functions and storage.
127
    """
128

    
129
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        """Set up the database, block store, queue and Astakos client.

        Options left unset (or passed as a false value) fall back to the
        matching module-level ``DEFAULT_*`` constant, with these exceptions:

        * ``block_umask`` falls back only when it is ``None``, because ``0``
          is a meaningful umask that must not be replaced;
        * the queue has no defaults: it is enabled only when both
          ``queue_module`` and ``queue_hosts`` are given, otherwise a no-op
          queue object is installed;
        * ``DisabledAstakosClient`` is used instead of the real client when
          ``astakos_url`` is empty or ``AstakosClient`` could not be
          imported.
        """
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        # ``or`` would silently turn an explicit umask of 0 into the default.
        if block_umask is None:
            block_umask = DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = (container_quota_policy or
                                  DEFAULT_CONTAINER_QUOTA)
        container_versioning_policy = (container_versioning_policy or
                                       DEFAULT_CONTAINER_VERSIONING)

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        def load_module(m):
            # Import by dotted name and return the leaf module itself.
            __import__(m)
            return sys.modules[m]

        # Database layer: one wrapper shared by all helper objects.
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        # Re-export the db module's constants as instance attributes.
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME',
                  'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX',
                  'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        # Block store; ``block_params`` entries override the base settings.
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                """Queue stand-in whose operations do nothing."""

                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_url = astakos_url
        self.service_token = service_token

        # Both client classes receive the same constructor arguments.
        if not astakos_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        self.serials = []
        self.messages = []
230

    
231
    def close(self):
        """Close the database wrapper and then the queue.

        The queue is closed even if closing the wrapper raises, so a failing
        database shutdown does not leave the queue open. The wrapper's
        exception still propagates unless closing the queue raises as well.
        """
        try:
            self.wrapper.close()
        finally:
            self.queue.close()
234

    
235
    @property
    def using_external_quotaholder(self):
        """True unless the Astakos client is the disabled placeholder."""
        client = self.astakosclient
        if isinstance(client, DisabledAstakosClient):
            return False
        return True
238

    
239
    def list_accounts(self, user, marker=None, limit=10000):
        """List the accounts that ``user`` is allowed to access.

        The slice of the allowed accounts to return is decided by
        ``_list_limits`` from ``marker`` and ``limit``.
        """

        logger.debug("list_accounts: %s %s %s", user, marker, limit)
        accounts = self._allowed_accounts(user)
        first, count = self._list_limits(accounts, marker, limit)
        last = first + count
        return accounts[first:last]
246

    
247
    def get_account_meta(
            self, user, account, domain, until=None, include_user_defined=True,
            external_quota=None):
        """Build the metadata dictionary of an account for ``domain``.

        A user other than the owner gets only ``name`` and ``modified``, and
        is refused (``NotAllowedError``) when ``until`` is given, the account
        node is missing, or the account is not among the user's allowed
        accounts. The owner additionally gets ``count``, ``bytes``,
        optionally the stored attributes for the domain, and
        ``until_timestamp`` when ``until`` is given. When an external
        quotaholder is in use, ``bytes`` is taken from
        ``external_quota['usage']`` (0 if absent).
        """

        logger.debug(
            "get_account_meta: %s %s %s %s", user, account, domain, until)
        is_owner = (user == account)
        path, node = self._lookup_account(account, is_owner)
        if not is_owner and (
                until or node is None or
                account not in self._allowed_accounts(user)):
            raise NotAllowedError

        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # No properties available: fall back to the requested timestamp.
            props, mtime = None, until

        count, size, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        modified = tstamp
        if until is not None:
            # Overall last modification, ignoring the ``until`` bound.
            overall = self._get_statistics(node, compute=True)[2]
            modified = max(overall, mtime)

        if not is_owner:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                stored = self.node.attribute_get(props[self.SERIAL], domain)
                meta.update(dict(stored))
            if until is not None:
                meta['until_timestamp'] = tstamp
            # Assigned after the stored attributes so these keys win.
            meta['name'] = account
            meta['count'] = count
            meta['bytes'] = size
            if self.using_external_quotaholder:
                external_quota = external_quota or {}
                meta['bytes'] = external_quota.get('usage', 0)
        meta['modified'] = modified
        return meta
288

    
289
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Store ``meta`` as the account's metadata for ``domain``.

        Only the account owner may do this; anyone else gets
        ``NotAllowedError``. ``replace`` is forwarded to ``_put_metadata``.
        """

        logger.debug("update_account_meta: %s %s %s %s %s", user,
                     account, domain, meta, replace)
        if user != account:
            raise NotAllowedError
        _, account_node = self._lookup_account(account, True)
        self._put_metadata(
            user, account_node, domain, meta, replace,
            update_statistics_ancestors_depth=-1)
299

    
300
    def get_account_groups(self, user, account):
        """Return the groups defined for the account as a dictionary.

        The owner gets the result of ``permissions.group_dict``. Any other
        user gets an empty dictionary if the account is among their allowed
        accounts, and ``NotAllowedError`` otherwise.
        """

        logger.debug("get_account_groups: %s %s", user, account)
        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
        return {}
310

    
311
    def update_account_groups(self, user, account, groups, replace=False):
        """Apply the ``groups`` mapping (group name -> members) to the account.

        Owner only (``NotAllowedError`` otherwise). With ``replace`` all
        existing groups are destroyed first; without it only the groups named
        in ``groups`` are deleted before being re-added. A group with a false
        member value is left deleted.
        """

        logger.debug("update_account_groups: %s %s %s %s", user,
                     account, groups, replace)
        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        permissions = self.permissions
        if replace:
            permissions.group_destroy(account)
        for group_name, members in groups.iteritems():
            if not replace:
                # Nothing was wiped up front, so clear this group only.
                permissions.group_delete(account, group_name)
            if members:
                permissions.group_addmany(account, group_name, members)
327

    
328
    def get_account_policy(self, user, account, external_quota=None):
        """Return the policy dictionary of the account.

        The owner gets the stored policy; when an external quotaholder is in
        use its ``quota`` entry is replaced by ``external_quota['limit']``
        (0 if absent). Any other user gets an empty dictionary if the account
        is among their allowed accounts, and ``NotAllowedError`` otherwise.
        """

        logger.debug("get_account_policy: %s %s", user, account)
        if user != account:
            if account in self._allowed_accounts(user):
                return {}
            raise NotAllowedError
        _, account_node = self._lookup_account(account, True)
        policy = self._get_policy(account_node, is_account_policy=True)
        if self.using_external_quotaholder:
            external_quota = external_quota or {}
            policy['quota'] = external_quota.get('limit', 0)
        return policy
342

    
343
    def update_account_policy(self, user, account, policy, replace=False):
        """Validate ``policy`` and store it for the account.

        Owner only; any other user gets ``NotAllowedError``. ``replace`` is
        forwarded to ``_put_policy``.
        """

        logger.debug("update_account_policy: %s %s %s %s", user,
                     account, policy, replace)
        if user != account:
            raise NotAllowedError
        _, account_node = self._lookup_account(account, True)
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(
            account_node, policy, replace, is_account_policy=True)
353

    
354
    def put_account(self, user, account, policy=None):
        """Create the account ``account`` with an optional initial policy.

        Raises ``NotAllowedError`` unless ``user`` is the account itself and
        ``AccountExists`` if a node for the account is already present. A
        non-empty policy is validated before anything is created.
        """

        logger.debug("put_account: %s %s %s", user, account, policy)
        if not policy:
            policy = {}
        if user != account:
            raise NotAllowedError
        existing = self.node.node_lookup(account)
        if existing is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        account_node = self._put_path(
            user, self.ROOTNODE, account,
            update_statistics_ancestors_depth=-1)
        self._put_policy(account_node, policy, True, is_account_policy=True)
369

    
370
    def delete_account(self, user, account):
        """Remove the account and destroy its groups.

        Owner only (``NotAllowedError`` otherwise). A missing account is
        silently ignored. ``AccountNotEmpty`` is raised when ``node_remove``
        reports failure.
        """

        logger.debug("delete_account: %s %s", user, account)
        if user != account:
            raise NotAllowedError
        account_node = self.node.node_lookup(account)
        if account_node is None:
            return
        removed = self.node.node_remove(
            account_node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)
383

    
384
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of container names existing under an account.

        * A user other than the owner gets the containers they are allowed
          to see; ``NotAllowedError`` is raised if ``until`` is given or the
          account is not among their allowed accounts.
        * With ``shared`` and/or ``public`` the owner gets the sorted names
          of containers holding shared and/or public paths.
        * Otherwise the owner gets the containers listed under the account
          node, optionally as of ``until``.

        In every case the returned slice is decided by ``_list_limits``.
        """

        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
                     account, marker, limit, shared, until, public)
        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            allowed = self._allowed_containers(user, account)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        if shared or public:
            allowed = set()
            if shared:
                # Paths look like "account/container/..."; keep the
                # container component.
                allowed.update(
                    [x.split('/', 2)[1]
                     for x in self.permissions.access_list_shared(account)])
            if public:
                allowed.update(
                    [x[0].split('/', 2)[1]
                     for x in self.permissions.public_list(account)])
            allowed = sorted(allowed)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        node = self.node.node_lookup(account)
        containers = [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
        # ``containers`` already holds names. Indexing each name again would
        # hand ``_list_limits`` only first characters, so a one-character
        # marker could match and wrongly skip entries.
        start, limit = self._list_limits(containers, marker, limit)
        return containers[start:start + limit]
410

    
411
    def list_container_meta(self, user, account, container, domain, until=None):
        """List the meta keys used by the container's objects in ``domain``.

        A user other than the owner is refused (``NotAllowedError``) when
        ``until`` is given or when they have no accessible paths under the
        container; otherwise the lookup is restricted to those paths.
        """

        logger.debug("list_container_meta: %s %s %s %s %s", user,
                     account, container, domain, until)
        granted = []
        if user != account:
            if until:
                raise NotAllowedError
            container_path = '/'.join((account, container))
            granted = self.permissions.access_list_paths(user, container_path)
            if not granted:
                raise NotAllowedError
        _, container_node = self._lookup_container(account, container)
        # No ``until`` means no upper time bound.
        if until is None:
            before = inf
        else:
            before = until
        formatted = self._get_formatted_paths(granted)
        return self.node.latest_attribute_keys(
            container_node, domain, before, CLUSTER_DELETED, formatted)
428

    
429
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Build the metadata dictionary of a container for ``domain``.

        A user other than the owner gets only ``name`` and ``modified``, and
        is refused (``NotAllowedError``) when ``until`` is given or the
        container is not among their allowed containers. The owner
        additionally gets ``count``, ``bytes``, optionally the stored
        attributes for the domain, and ``until_timestamp`` when ``until`` is
        given.
        """

        logger.debug("get_container_meta: %s %s %s %s %s", user,
                     account, container, domain, until)
        is_owner = (user == account)
        if not is_owner and (
                until or
                container not in self._allowed_containers(user, account)):
            raise NotAllowedError
        _, container_node = self._lookup_container(account, container)
        props = self._get_properties(container_node, until)
        mtime = props[self.MTIME]
        count, size, tstamp = self._get_statistics(container_node, until)
        tstamp = max(tstamp, mtime)
        modified = tstamp
        if until is not None:
            # Overall last modification, ignoring the ``until`` bound.
            overall = self._get_statistics(container_node)[2]
            modified = max(overall, mtime)

        if not is_owner:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                stored = self.node.attribute_get(props[self.SERIAL], domain)
                meta.update(dict(stored))
            if until is not None:
                meta['until_timestamp'] = tstamp
            # Assigned after the stored attributes so these keys win.
            meta['name'] = container
            meta['count'] = count
            meta['bytes'] = size
        meta['modified'] = modified
        return meta
461

    
462
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Store ``meta`` as the container's metadata for ``domain``.

        Owner only (``NotAllowedError`` otherwise). If ``_put_metadata``
        reports a source version and the container's versioning policy is
        not ``'auto'``, that source version is removed.
        """

        logger.debug("update_container_meta: %s %s %s %s %s %s",
                     user, account, container, domain, meta, replace)
        if user != account:
            raise NotAllowedError
        _, container_node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, container_node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is None:
            return
        policy = self._get_policy(container_node, is_account_policy=False)
        if policy['versioning'] != 'auto':
            self.node.version_remove(
                src_version_id, update_statistics_ancestors_depth=0)
479

    
480
    def get_container_policy(self, user, account, container):
        """Return the policy dictionary of the container.

        The owner gets the stored policy. Any other user gets an empty
        dictionary if the container is among their allowed containers, and
        ``NotAllowedError`` otherwise.
        """

        logger.debug(
            "get_container_policy: %s %s %s", user, account, container)
        if user != account:
            if container in self._allowed_containers(user, account):
                return {}
            raise NotAllowedError
        _, container_node = self._lookup_container(account, container)
        return self._get_policy(container_node, is_account_policy=False)
491

    
492
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Validate ``policy`` and store it for the container.

        Owner only; any other user gets ``NotAllowedError``. ``replace`` is
        forwarded to ``_put_policy``.
        """

        logger.debug("update_container_policy: %s %s %s %s %s",
                     user, account, container, policy, replace)
        if user != account:
            raise NotAllowedError
        _, container_node = self._lookup_container(account, container)
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(
            container_node, policy, replace, is_account_policy=False)
502

    
503
    def put_container(self, user, account, container, policy=None):
        """Create the container ``container`` under ``account``.

        Raises ``NotAllowedError`` unless ``user`` is the account owner and
        ``ContainerExists`` if the container lookup succeeds. A non-empty
        policy is validated before anything is created.
        """

        logger.debug(
            "put_container: %s %s %s %s", user, account, container, policy)
        if not policy:
            policy = {}
        if user != account:
            raise NotAllowedError
        # A NameError from the lookup means the container does not exist yet.
        already_there = True
        try:
            path, node = self._lookup_container(account, container)
        except NameError:
            already_there = False
        if already_there:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        container_path = '/'.join((account, container))
        account_node = self._lookup_account(account, True)[1]
        container_node = self._put_path(
            user, account_node, container_path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(
            container_node, policy, True, is_account_policy=False)
524

    
525
    def delete_container(self, user, account, container, until=None,
                         prefix='', delimiter=None):
        """Delete the container, purge its history, or empty it.

        Owner only (``NotAllowedError`` otherwise). Three modes:

        * ``until`` given: purge history and deleted versions up to
          ``until``; the container itself is kept.
        * no ``until`` and no ``delimiter``: the container must hold no
          objects (``ContainerNotEmpty`` otherwise); everything is purged and
          the container node is removed.
        * no ``until`` but a ``delimiter``: every object in the container is
          marked deleted and its permissions are cleared; the container
          itself is kept.

        NOTE(review): ``prefix`` is accepted but not forwarded; the contents
        listing always uses an empty prefix. Confirm whether that is intended.
        """

        logger.debug("delete_container: %s %s %s %s %s %s", user,
                     account, container, until, prefix, delimiter)
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        if until is None and delimiter:
            # Remove only the contents.
            listing = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            removed_paths = []
            for entry in listing:
                obj_path = '/'.join((account, container, entry[0]))
                obj_node = entry[2]
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, obj_node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                delete_details = {
                    'action': 'object delete',
                    'path': obj_path,
                    'versions': str(dest_version_id),
                }
                self._report_size_change(
                    user, account, -del_size, delete_details)
                self._report_object_change(
                    user, account, obj_path,
                    details={'action': 'object delete'})
                removed_paths.append(obj_path)
            self.permissions.access_clear_bulk(removed_paths)
            return

        # Either a bounded purge (``until`` given) or a full delete.
        full_delete = until is None
        if full_delete and self._get_statistics(node)[0] > 0:
            raise ContainerNotEmpty('Container is not empty')
        before = inf if full_delete else until

        hashes, size, serials = self.node.node_purge_children(
            node, before, CLUSTER_HISTORY,
            update_statistics_ancestors_depth=0)
        for block_map in hashes:
            self.store.map_delete(block_map)
        self.node.node_purge_children(
            node, before, CLUSTER_DELETED,
            update_statistics_ancestors_depth=0)
        if full_delete:
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
        if not self.free_versioning:
            purge_details = {
                'action': 'container purge',
                'path': path,
                'versions': ','.join(map(str, serials)),
            }
            self._report_size_change(user, account, -size, purge_details)
596

    
597
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
598
        if user != account and until:
599
            raise NotAllowedError
600
        if shared and public:
601
            # get shared first
602
            shared_paths = self._list_object_permissions(
603
                user, account, container, prefix, shared=True, public=False)
604
            objects = set()
605
            if shared_paths:
606
                path, node = self._lookup_container(account, container)
607
                shared_paths = self._get_formatted_paths(shared_paths)
608
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared_paths, all_props))
609

    
610
            # get public
611
            objects |= set(self._list_public_object_properties(
612
                user, account, container, prefix, all_props))
613
            objects = list(objects)
614

    
615
            objects.sort(key=lambda x: x[0])
616
            start, limit = self._list_limits(
617
                [x[0] for x in objects], marker, limit)
618
            return objects[start:start + limit]
619
        elif public:
620
            objects = self._list_public_object_properties(
621
                user, account, container, prefix, all_props)
622
            start, limit = self._list_limits(
623
                [x[0] for x in objects], marker, limit)
624
            return objects[start:start + limit]
625

    
626
        allowed = self._list_object_permissions(
627
            user, account, container, prefix, shared, public)
628
        if shared and not allowed:
629
            return []
630
        path, node = self._lookup_container(account, container)
631
        allowed = self._get_formatted_paths(allowed)
632
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
633
        start, limit = self._list_limits(
634
            [x[0] for x in objects], marker, limit)
635
        return objects[start:start + limit]
636

    
637
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
638
        public = self._list_object_permissions(
639
            user, account, container, prefix, shared=False, public=True)
640
        paths, nodes = self._lookup_objects(public)
641
        path = '/'.join((account, container))
642
        cont_prefix = path + '/'
643
        paths = [x[len(cont_prefix):] for x in paths]
644
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
645
        objects = [(path,) + props for path, props in zip(paths, props)]
646
        return objects
647

    
648
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
649
        objects = []
650
        while True:
651
            marker = objects[-1] if objects else None
652
            limit = 10000
653
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
654
            objects.extend(l)
655
            if not l or len(l) < limit:
656
                break
657
        return objects
658

    
659
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
660
        allowed = []
661
        path = '/'.join((account, container, prefix)).rstrip('/')
662
        if user != account:
663
            allowed = self.permissions.access_list_paths(user, path)
664
            if not allowed:
665
                raise NotAllowedError
666
        else:
667
            allowed = set()
668
            if shared:
669
                allowed.update(self.permissions.access_list_shared(path))
670
            if public:
671
                allowed.update(
672
                    [x[0] for x in self.permissions.public_list(path)])
673
            allowed = sorted(allowed)
674
            if not allowed:
675
                return []
676
        return allowed
677

    
678
    def list_objects(self, user, account, container, prefix='',
                     delimiter=None, marker=None, limit=10000, virtual=True,
                     domain=None, keys=None, shared=False, until=None,
                     size_range=None, public=False):
        """Return a list of object (name, version_id) tuples existing under a container.

        Thin wrapper around ``_list_objects`` that asks for the reduced
        property set (``all_props`` false) and treats a missing ``keys`` as
        an empty list.
        """

        logger.debug(
            "list_objects: %s %s %s %s %s %s %s "
            "%s %s %s %s %s %s %s",
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, public)
        if not keys:
            keys = []
        all_props = False
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, all_props,
            public)
684

    
685
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts for the objects of a container.

        The listing is fetched from _list_objects with all properties
        requested. Two-element entries become {'subdir': name}; every
        other entry becomes a dict of the object's system metadata.
        'modified' is only filled in when no until timestamp was given.
        """

        logger.debug(
            "list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s",
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, public)
        if not keys:
            keys = []
        rows = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, True, public)

        listing = []
        for row in rows:
            if len(row) == 2:
                listing.append({'subdir': row[0]})
                continue
            # The name sits in front of the property tuple, so every
            # property index is shifted by one.
            entry = {
                'name': row[0],
                'bytes': row[self.SIZE + 1],
                'type': row[self.TYPE + 1],
                'hash': row[self.HASH + 1],
                'version': row[self.SERIAL + 1],
                'version_timestamp': row[self.MTIME + 1],
                'modified': row[self.MTIME + 1] if until is None else None,
                'modified_by': row[self.MUSER + 1],
                'uuid': row[self.UUID + 1],
                'checksum': row[self.CHECKSUM + 1],
            }
            listing.append(entry)
        return listing
707

    
708
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return the paths that enforce permissions under a container.

        Delegates to _list_object_permissions asking for shared paths
        only (public paths are not included).
        """

        logger.debug("list_object_permissions: %s %s %s %s",
                     user, account, container, prefix)
        return self._list_object_permissions(
            user, account, container, prefix, shared=True, public=False)
714

    
715
    def list_object_public(self, user, account, container, prefix=''):
        """Return a dict mapping public object paths to their public ids.

        The paths considered are those reported by permissions.public_list
        for account/container/prefix.
        """

        logger.debug("list_object_public: %s %s %s %s",
                     user, account, container, prefix)
        base = '/'.join((account, container, prefix))
        return dict(
            (path, public_id)
            for path, public_id in self.permissions.public_list(base))
724

    
725
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dict with the object's metadata for the given domain.

        System metadata always comes from the requested version (the
        latest one when version is None). 'modified' is the timestamp of
        the latest version overall, falling back to the latest deleted
        version. User-defined attributes of the domain are included
        unless include_user_defined is false.

        Raises ItemNotExists when a specific version was requested but
        the object has neither a current nor a deleted version.
        """

        logger.debug("get_object_meta: %s %s %s %s %s %s",
                     user, account, container, name, domain, version)
        self._can_read(user, account, container, name)
        _path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)

        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                # Overall last modification.
                latest = self._get_version(node)
                modified = latest[self.MTIME]
            except NameError:
                # Object may be deleted. NOTE(review): this relies on the
                # "not found" error of _get_version being a NameError
                # subclass -- confirm against the exception definitions.
                deleted = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if deleted is None:
                    raise ItemNotExists('Object does not exist')
                modified = deleted[self.MTIME]

        meta = {}
        if include_user_defined:
            user_defined = self.node.attribute_get(props[self.SERIAL], domain)
            meta.update(dict(user_defined))
        # System keys are written last so they win over user attributes.
        system = {
            'name': name,
            'bytes': props[self.SIZE],
            'type': props[self.TYPE],
            'hash': props[self.HASH],
            'version': props[self.SERIAL],
            'version_timestamp': props[self.MTIME],
            'modified': modified,
            'modified_by': props[self.MUSER],
            'uuid': props[self.UUID],
            'checksum': props[self.CHECKSUM],
        }
        meta.update(system)
        return meta
761

    
762
    def update_object_meta(self, user, account, container, name, domain,
                           meta, replace=False):
        """Update the object's metadata for the domain.

        A new version carrying the metadata is created through
        _put_metadata, versioning is applied to the previous version,
        and the id of the new version is returned.
        """

        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
                     user, account, container, name, domain, meta, replace)
        self._can_write(user, account, container, name)
        _path, node = self._lookup_object(account, container, name)
        previous_id, new_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        self._apply_versioning(
            account, container, previous_id,
            update_statistics_ancestors_depth=1)
        return new_id
775

    
776
    def get_object_permissions(self, user, account, container, name):
        """Return (allowed action, permissions path, permissions dict).

        The account owner is always allowed to 'write'. Any other user
        gets 'write' or 'read' according to access_check on the path the
        object takes its permissions from, and NotAllowedError when
        neither is granted. The object itself is looked up afterwards,
        so a missing object is reported by _lookup_object.
        """

        logger.debug("get_object_permissions: %s %s %s %s",
                     user, account, container, name)
        permissions_path = self._get_permissions_path(account, container, name)

        if user == account:
            allowed = 'write'
        elif self.permissions.access_check(
                permissions_path, self.WRITE, user):
            allowed = 'write'
        elif self.permissions.access_check(
                permissions_path, self.READ, user):
            allowed = 'read'
        else:
            raise NotAllowedError

        # Existence check only; the result is not needed.
        self._lookup_object(account, container, name)
        granted = self.permissions.access_get(permissions_path)
        return (allowed, permissions_path, granted)
794

    
795
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Replace the permissions of an object.

        Only the account owner may do this; anyone else gets
        NotAllowedError. The permissions are checked with
        _check_permissions, stored, and the resulting member list is
        reported as a sharing change.
        """

        logger.debug("update_object_permissions: %s %s %s %s %s",
                     user, account, container, name, permissions)
        if user != account:
            raise NotAllowedError
        object_path = self._lookup_object(account, container, name)[0]
        self._check_permissions(object_path, permissions)
        self.permissions.access_set(object_path, permissions)
        members = self.permissions.access_members(object_path)
        self._report_sharing_change(
            user, account, object_path, {'members': members})
807

    
808
    def get_object_public(self, user, account, container, name):
        """Return whatever permissions.public_get reports for the object.

        Read access is checked and the object is looked up first.
        """

        logger.debug(
            "get_object_public: %s %s %s %s", user, account, container, name)
        self._can_read(user, account, container, name)
        object_path, _node = self._lookup_object(account, container, name)
        return self.permissions.public_get(object_path)
817

    
818
    def update_object_public(self, user, account, container, name, public):
        """Publish the object when public is true, unpublish it otherwise.

        Write access is checked and the object is looked up first.
        """

        logger.debug("update_object_public: %s %s %s %s %s",
                     user, account, container, name, public)
        self._can_write(user, account, container, name)
        object_path, _node = self._lookup_object(account, container, name)
        if public:
            self.permissions.public_set(
                object_path, self.public_url_security,
                self.public_url_alphabet)
        else:
            self.permissions.public_unset(object_path)
831

    
832
    def get_object_hashmap(self, user, account, container, name,
                           version=None):
        """Return the object's size and the list of its hex block hashes.

        The map is fetched from the store under the version's hash; each
        of its entries is hex-encoded for the caller.
        """

        logger.debug("get_object_hashmap: %s %s %s %s %s",
                     user, account, container, name, version)
        self._can_read(user, account, container, name)
        _path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        map_key = binascii.unhexlify(props[self.HASH])
        stored_map = self.store.map_get(map_key)
        return props[self.SIZE], [
            binascii.hexlify(entry) for entry in stored_map]
842

    
843
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
844
        if permissions is not None and user != account:
845
            raise NotAllowedError
846
        self._can_write(user, account, container, name)
847
        if permissions is not None:
848
            path = '/'.join((account, container, name))
849
            self._check_permissions(path, permissions)
850

    
851
        account_path, account_node = self._lookup_account(account, True)
852
        container_path, container_node = self._lookup_container(
853
            account, container)
854

    
855
        path, node = self._put_object_node(
856
            container_path, container_node, name)
857
        pre_version_id, dest_version_id = self._put_version_duplicate(
858
            user, node, src_node=src_node, size=size, type=type, hash=hash,
859
            checksum=checksum, is_copy=is_copy,
860
            update_statistics_ancestors_depth=1)
861

    
862
        # Handle meta.
863
        if src_version_id is None:
864
            src_version_id = pre_version_id
865
        self._put_metadata_duplicate(
866
            src_version_id, dest_version_id, domain, meta, replace_meta)
867

    
868
        del_size = self._apply_versioning(account, container, pre_version_id,
869
                                          update_statistics_ancestors_depth=1)
870
        size_delta = size - del_size
871
        if size_delta > 0:
872
            # Check account quota.
873
            if not self.using_external_quotaholder:
874
                account_quota = long(
875
                    self._get_policy(account_node, is_account_policy=True
876
                    )['quota']
877
                )
878
                account_usage = self._get_statistics(account_node, compute=True)[1]
879
                if (account_quota > 0 and account_usage > account_quota):
880
                    raise QuotaError(
881
                        'Account quota exceeded: limit: %s, usage: %s' % (
882
                            account_quota, account_usage
883
                        )
884
                    )
885

    
886
            # Check container quota.
887
            container_quota = long(
888
                self._get_policy(container_node, is_account_policy=False
889
                )['quota']
890
            )
891
            container_usage = self._get_statistics(container_node)[1]
892
            if (container_quota > 0 and container_usage > container_quota):
893
                # This must be executed in a transaction, so the version is
894
                # never created if it fails.
895
                raise QuotaError(
896
                    'Container quota exceeded: limit: %s, usage: %s' % (
897
                        container_quota, container_usage
898
                    )
899
                )
900

    
901
        self._report_size_change(user, account, size_delta,
902
                                 {'action': 'object update', 'path': path,
903
                                  'versions': ','.join([str(dest_version_id)])})
904
        if permissions is not None:
905
            self.permissions.access_set(path, permissions)
906
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
907

    
908
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
909
        return dest_version_id
910

    
911
    def update_object_hashmap(self, user, account, container, name, size,
                              type, hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object from its size and hex block hashes.

        Raises IndexError, with the hex hashes of the missing blocks in
        its data attribute, when the store does not hold every block.
        Returns the id of the new version.
        """

        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s",
                     user, account, container, name, size, type, hashmap,
                     checksum)
        if not meta:
            meta = {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]

        block_map = HashMap(self.block_size, self.hash_algorithm)
        block_map.extend([binascii.unhexlify(item) for item in hashmap])

        missing = self.store.block_search(block_map)
        if missing:
            error = IndexError()
            error.data = [binascii.hexlify(item) for item in missing]
            raise error

        digest = block_map.hash()
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type,
            binascii.hexlify(digest), checksum, domain, meta, replace_meta,
            permissions)
        # The map is stored only after the version was created.
        self.store.map_put(digest, block_map)
        return dest_version_id
931

    
932
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Set the checksum on a version and on its later look-alikes.

        Every version of the object whose serial is not smaller than the
        given version and whose hash and size equal those of the given
        version gets the checksum (this covers versions created by
        metadata-only updates).
        """

        logger.debug("update_object_checksum: %s %s %s %s %s %s",
                     user, account, container, name, version, checksum)
        self._can_write(user, account, container, name)
        _path, node = self._lookup_object(account, container, name)
        reference = self._get_version(node, version)
        for candidate in self.node.node_get_versions(node):
            serial = candidate[self.SERIAL]
            same_content = (
                serial >= int(version) and
                candidate[self.HASH] == reference[self.HASH] and
                candidate[self.SIZE] == reference[self.SIZE])
            if same_content:
                self.node.version_put_property(serial, 'checksum', checksum)
946

    
947
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta=None, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
948
        dest_meta = dest_meta or {}
949
        dest_version_ids = []
950
        self._can_read(user, src_account, src_container, src_name)
951
        path, node = self._lookup_object(src_account, src_container, src_name)
952
        # TODO: Will do another fetch of the properties in duplicate version...
953
        props = self._get_version(
954
            node, src_version)  # Check to see if source exists.
955
        src_version_id = props[self.SERIAL]
956
        hash = props[self.HASH]
957
        size = props[self.SIZE]
958
        is_copy = not is_move and (src_account, src_container, src_name) != (
959
            dest_account, dest_container, dest_name)  # New uuid.
960
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
961
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
962
            self._delete_object(user, src_account, src_container, src_name)
963

    
964
        if delimiter:
965
            prefix = src_name + \
966
                delimiter if not src_name.endswith(delimiter) else src_name
967
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
968
            src_names.sort(key=lambda x: x[2])  # order by nodes
969
            paths = [elem[0] for elem in src_names]
970
            nodes = [elem[2] for elem in src_names]
971
            # TODO: Will do another fetch of the properties in duplicate version...
972
            props = self._get_versions(nodes)  # Check to see if source exists.
973

    
974
            for prop, path, node in zip(props, paths, nodes):
975
                src_version_id = prop[self.SERIAL]
976
                hash = prop[self.HASH]
977
                vtype = prop[self.TYPE]
978
                size = prop[self.SIZE]
979
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
980
                    delimiter) else dest_name
981
                vdest_name = path.replace(prefix, dest_prefix, 1)
982
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
983
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
984
                    self._delete_object(user, src_account, src_container, path)
985
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
986

    
987
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata.

        Logs the request and delegates to _copy_object with is_move off;
        the value returned by _copy_object is passed back unchanged.
        """

        logger.debug(
            "copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s",
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, src_version, delimiter)
        if not meta:
            meta = {}
        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, dest_domain=domain,
            dest_meta=meta, replace_meta=replace_meta,
            permissions=permissions, src_version=src_version, is_move=False,
            delimiter=delimiter)
994

    
995
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata.

        Only the owner of the source account may move; anyone else gets
        NotAllowedError. The work is delegated to _copy_object with
        is_move on and no explicit source version.
        """

        logger.debug(
            "move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s",
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, delimiter)
        if not meta:
            meta = {}
        if user != src_account:
            raise NotAllowedError
        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, dest_domain=domain,
            dest_meta=meta, replace_meta=replace_meta,
            permissions=permissions, src_version=None, is_move=True,
            delimiter=delimiter)
1004

    
1005
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
1006
        if user != account:
1007
            raise NotAllowedError
1008

    
1009
        if until is not None:
1010
            path = '/'.join((account, container, name))
1011
            node = self.node.node_lookup(path)
1012
            if node is None:
1013
                return
1014
            hashes = []
1015
            size = 0
1016
            serials = []
1017
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1018
                                           update_statistics_ancestors_depth=1)
1019
            hashes += h
1020
            size += s
1021
            serials += v
1022
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1023
                                           update_statistics_ancestors_depth=1)
1024
            hashes += h
1025
            if not self.free_versioning:
1026
                size += s
1027
            serials += v
1028
            for h in hashes:
1029
                self.store.map_delete(h)
1030
            self.node.node_purge(node, until, CLUSTER_DELETED,
1031
                                 update_statistics_ancestors_depth=1)
1032
            try:
1033
                props = self._get_version(node)
1034
            except NameError:
1035
                self.permissions.access_clear(path)
1036
            self._report_size_change(
1037
                user, account, -size, {
1038
                    'action': 'object purge',
1039
                    'path': path,
1040
                    'versions': ','.join(str(i) for i in serials)
1041
                }
1042
            )
1043
            return
1044

    
1045
        path, node = self._lookup_object(account, container, name)
1046
        src_version_id, dest_version_id = self._put_version_duplicate(
1047
            user, node, size=0, type='', hash=None, checksum='',
1048
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1049
        del_size = self._apply_versioning(account, container, src_version_id,
1050
                                          update_statistics_ancestors_depth=1)
1051
        self._report_size_change(user, account, -del_size,
1052
                                 {'action': 'object delete', 'path': path,
1053
                                  'versions': ','.join([str(dest_version_id)])})
1054
        self._report_object_change(
1055
            user, account, path, details={'action': 'object delete'})
1056
        self.permissions.access_clear(path)
1057

    
1058
        if delimiter:
1059
            prefix = name + delimiter if not name.endswith(delimiter) else name
1060
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1061
            paths = []
1062
            for t in src_names:
1063
                path = '/'.join((account, container, t[0]))
1064
                node = t[2]
1065
                src_version_id, dest_version_id = self._put_version_duplicate(
1066
                    user, node, size=0, type='', hash=None, checksum='',
1067
                    cluster=CLUSTER_DELETED,
1068
                    update_statistics_ancestors_depth=1)
1069
                del_size = self._apply_versioning(
1070
                    account, container, src_version_id,
1071
                    update_statistics_ancestors_depth=1)
1072
                self._report_size_change(user, account, -del_size,
1073
                                         {'action': 'object delete',
1074
                                          'path': path,
1075
                                          'versions': ','.join([str(dest_version_id)])})
1076
                self._report_object_change(
1077
                    user, account, path, details={'action': 'object delete'})
1078
                paths.append(path)
1079
            self.permissions.access_clear_bulk(paths)
1080

    
1081
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object.

        Logs the request and delegates to _delete_object. The prefix
        argument is only logged; it is not forwarded.
        """

        logger.debug("delete_object: %s %s %s %s %s %s %s",
                     user, account, container, name, until, prefix, delimiter)
        self._delete_object(
            user, account, container, name, until=until, delimiter=delimiter)
1087

    
1088
    def list_versions(self, user, account, container, name):
        """Return [version, version_timestamp] pairs for an object.

        Versions that sit in the deleted cluster are left out.
        """

        logger.debug(
            "list_versions: %s %s %s %s", user, account, container, name)
        self._can_read(user, account, container, name)
        _path, node = self._lookup_object(account, container, name)
        listing = []
        for props in self.node.node_get_versions(node):
            if props[self.CLUSTER] != CLUSTER_DELETED:
                listing.append([props[self.SERIAL], props[self.MTIME]])
        return listing
1097

    
1098
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given.

        Raises NameError when latest_uuid finds nothing for the UUID in
        the normal cluster. Read access on the resolved object is checked
        before returning.
        """

        logger.debug("get_uuid: %s %s", user, uuid)
        found = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if found is None:
            raise NameError
        object_path, _serial = found
        account, container, name = object_path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)
1109

    
1110
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given.

        Raises NameError when the public id maps to no path. Read access
        on the resolved object is checked before returning.
        """

        logger.debug("get_public: %s %s", user, public)
        object_path = self.permissions.public_path(public)
        if object_path is None:
            raise NameError
        account, container, name = object_path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)
1120

    
1121
    def get_block(self, hash):
        """Return the data of the block stored under the hex hash.

        Raises ItemNotExists when the store returns nothing (or an empty
        value) for the hash.
        """

        logger.debug("get_block: %s", hash)
        data = self.store.block_get(binascii.unhexlify(hash))
        if data:
            return data
        raise ItemNotExists('Block does not exist')
1129

    
1130
    def put_block(self, data):
        """Store a block and return its hash, hex-encoded."""

        logger.debug("put_block: %s", len(data))
        raw_hash = self.store.block_put(data)
        return binascii.hexlify(raw_hash)
1135

    
1136
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the resulting hex hash.

        Data that starts at offset 0 and is exactly one block long is
        stored as a brand new block; anything else is applied to the
        existing block at the given offset.
        """

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        replaces_whole_block = offset == 0 and len(data) == self.block_size
        if replaces_whole_block:
            return self.put_block(data)
        raw_hash = self.store.block_update(
            binascii.unhexlify(hash), offset, data)
        return binascii.hexlify(raw_hash)
1144

    
1145
    # Path functions.
1146

    
1147
    def _generate_uuid(self):
1148
        return str(uuidlib.uuid4())
1149

    
1150
    def _put_object_node(self, path, parent, name):
1151
        path = '/'.join((path, name))
1152
        node = self.node.node_lookup(path)
1153
        if node is None:
1154
            node = self.node.node_create(parent, path)
1155
        return path, node
1156

    
1157
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        """Create a node for path under parent and give it a first version.

        The initial version lives in the normal cluster, has size 0, no
        hash, empty type and checksum, a fresh uuid and user as its
        modifier. The new node is returned.
        """
        created = self.node.node_create(parent, path)
        initial_uuid = self._generate_uuid()
        self.node.version_create(
            created, None, 0, '', None, user, initial_uuid, '',
            CLUSTER_NORMAL, update_statistics_ancestors_depth)
        return created
1164

    
1165
    def _lookup_account(self, account, create=True):
1166
        node = self.node.node_lookup(account)
1167
        if node is None and create:
1168
            node = self._put_path(
1169
                account, self.ROOTNODE, account,
1170
                update_statistics_ancestors_depth=-1)  # User is account.
1171
        return account, node
1172

    
1173
    def _lookup_container(self, account, container):
1174
        path = '/'.join((account, container))
1175
        node = self.node.node_lookup(path, for_update=True)
1176
        if node is None:
1177
            raise ItemNotExists('Container does not exist')
1178
        return path, node
1179

    
1180
    def _lookup_object(self, account, container, name):
1181
        path = '/'.join((account, container, name))
1182
        node = self.node.node_lookup(path)
1183
        if node is None:
1184
            raise ItemNotExists('Object does not exist')
1185
        return path, node
1186

    
1187
    def _lookup_objects(self, paths):
1188
        nodes = self.node.node_lookup_bulk(paths)
1189
        return paths, nodes
1190

    
1191
    def _get_properties(self, node, until=None):
        """Return the node's properties as of the timestamp given.

        Without until, only the normal cluster is consulted (with no
        upper time bound). With until, the history cluster is tried when
        the normal one has nothing. Raises ItemNotExists when no
        properties are found.
        """

        if until is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
        else:
            props = self.node.version_lookup(node, until, CLUSTER_NORMAL)
            if props is None:
                props = self.node.version_lookup(
                    node, until, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props
1201

    
1202
    def _get_statistics(self, node, until=None, compute=False):
        """Return count, sum of size and latest timestamp of everything under node.

        With until, statistics_latest is asked with that timestamp and
        the deleted cluster; otherwise, with compute set,
        statistics_latest is asked with the deleted cluster as
        except_cluster; otherwise statistics_get is read for the normal
        cluster. A missing result is reported as (0, 0, 0).
        """

        node_api = self.node
        if until is not None:
            found = node_api.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            found = node_api.statistics_latest(
                node, except_cluster=CLUSTER_DELETED)
        else:
            found = node_api.statistics_get(node, CLUSTER_NORMAL)
        return (0, 0, 0) if found is None else found
1214

    
1215
    def _get_version(self, node, version=None):
        """Return the properties of a version of the node.

        Without version, the latest version in the normal cluster is
        returned, and ItemNotExists is raised when there is none. With
        version, it is converted with int() and looked up by serial;
        VersionNotExists is raised when the conversion fails with
        ValueError, when no such version is found, or when it sits in the
        deleted cluster.
        """
        if version is None:
            latest = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if latest is None:
                raise ItemNotExists('Object does not exist')
            return latest

        try:
            serial = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(serial)
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props
1229

    
1230
    def _get_versions(self, nodes):
        """Return the bulk normal-cluster version lookup for the nodes."""
        latest = self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
        return latest
1232

    
1233
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node.

        Properties not supplied are inherited from the latest normal
        version of src_node (or of node when src_node is None). When size
        is None both size and hash are inherited. A fresh uuid is
        generated for copies and when there is no source version;
        otherwise the source uuid is kept. The node's current normal
        version, if any, is moved to the history cluster first.

        Returns (id of the version that was replaced or None, id of the
        new version).
        """

        lookup_node = node if src_node is None else src_node
        src_props = self.node.version_lookup(lookup_node, inf, CLUSTER_NORMAL)
        if src_props is None:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        else:
            src_version_id = src_props[self.SERIAL]
            src_hash = src_props[self.HASH]
            src_size = src_props[self.SIZE]
            src_type = src_props[self.TYPE]
            src_checksum = src_props[self.CHECKSUM]

        if size is None:  # Set metadata.
            # This way hash can be set to None (account or container).
            hash, size = src_hash, src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum

        if is_copy or src_version_id is None:
            uuid = self._generate_uuid()
        else:
            uuid = src_props[self.UUID]

        # Find the version of the destination node that is being replaced.
        if src_node is None:
            pre_version_id = src_version_id
        else:
            current = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            pre_version_id = None if current is None else current[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, _mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)
        return pre_version_id, dest_version_id
1278

    
1279
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1280
        if src_version_id is not None:
1281
            self.node.attribute_copy(src_version_id, dest_version_id)
1282
        if not replace:
1283
            self.node.attribute_del(dest_version_id, domain, (
1284
                k for k, v in meta.iteritems() if v == ''))
1285
            self.node.attribute_set(dest_version_id, domain, (
1286
                (k, v) for k, v in meta.iteritems() if v != ''))
1287
        else:
1288
            self.node.attribute_del(dest_version_id, domain)
1289
            self.node.attribute_set(dest_version_id, domain, ((
1290
                k, v) for k, v in meta.iteritems()))
1291

    
1292
    def _put_metadata(self, user, node, domain, meta, replace=False,
1293
                      update_statistics_ancestors_depth=None):
1294
        """Create a new version and store metadata."""
1295

    
1296
        src_version_id, dest_version_id = self._put_version_duplicate(
1297
            user, node,
1298
            update_statistics_ancestors_depth=update_statistics_ancestors_depth)
1299
        self._put_metadata_duplicate(
1300
            src_version_id, dest_version_id, domain, meta, replace)
1301
        return src_version_id, dest_version_id
1302

    
1303
    def _list_limits(self, listing, marker, limit):
1304
        start = 0
1305
        if marker:
1306
            try:
1307
                start = listing.index(marker) + 1
1308
            except ValueError:
1309
                pass
1310
        if not limit or limit > 10000:
1311
            limit = 10000
1312
        return start, limit
1313

    
1314
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """List object entries under the container at `path`.

        Queries the node backend's `latest_version_list` with `prefix` and
        `marker` made relative to `path + '/'`, excluding CLUSTER_DELETED.
        `keys` are only used as a filter when `domain` is given, and
        `until` defaults to `inf`. When `virtual` is true the returned
        prefixes are added as `(prefix, None)` entries.

        Returns the entries sorted by name, with the container prefix
        stripped from each name.
        """
        container_root = path + '/'
        strip = len(container_root)

        if not keys:
            keys = []
        if not allowed:
            allowed = []

        if marker:
            start = container_root + marker
        else:
            start = None
        before = inf if until is None else until
        filterq = keys if domain else []

        objects, prefixes = self.node.latest_version_list(
            parent, container_root + prefix, delimiter, start, limit, before,
            CLUSTER_DELETED, allowed, domain, filterq, size_range, all_props)

        if virtual:
            objects.extend((p, None) for p in prefixes)
        objects.sort(key=lambda entry: entry[0])
        return [(entry[0][strip:],) + entry[1:] for entry in objects]
1329

    
1330
    # Reporting functions.
1331

    
1332
    def _report_size_change(self, user, account, size, details=None):
1333
        details = details or {}
1334

    
1335
        if size == 0:
1336
            return
1337

    
1338
        account_node = self._lookup_account(account, True)[1]
1339
        total = self._get_statistics(account_node, compute=True)[1]
1340
        details.update({'user': user, 'total': total})
1341
        logger.debug(
1342
            "_report_size_change: %s %s %s %s", user, account, size, details)
1343
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1344
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1345
                              float(size), details))
1346

    
1347
        if not self.using_external_quotaholder:
1348
            return
1349

    
1350
        try:
1351
            name = details['path'] if 'path' in details else ''
1352
            serial = self.astakosclient.issue_one_commission(
1353
                token=self.service_token,
1354
                holder=account,
1355
                source=DEFAULT_SOURCE,
1356
                provisions={'pithos.diskspace': size},
1357
                name=name
1358
                )
1359
        except BaseException, e:
1360
            raise QuotaError(e)
1361
        else:
1362
            self.serials.append(serial)
1363

    
1364
    def _report_object_change(self, user, account, path, details=None):
        """Queue an 'object' message for `path` in `account`.

        `details` (a new dict when none is given) is updated in place with
        the acting `user` before being logged and queued.
        """
        if not details:
            details = {}
        details['user'] = user
        logger.debug("_report_object_change: %s %s %s %s", user,
                     account, path, details)
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                   account, QUEUE_INSTANCE_ID, 'object', path, details)
        self.messages.append(message)
1371

    
1372
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a 'sharing' message for `path` in `account`.

        `details` (a new dict when none is given) is updated in place with
        the acting `user` before being logged and queued.
        """
        details = details or {}
        details.update({'user': user})
        # The label matches this method's name so the debug line can be
        # traced back to it; the normalized details are what gets logged.
        logger.debug("_report_sharing_change: %s %s %s %s",
                     user, account, path, details)
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1379

    
1380
    # Policy functions.
1381

    
1382
    def _check_policy(self, policy, is_account_policy=True):
1383
        default_policy = self.default_account_policy \
1384
            if is_account_policy else self.default_container_policy
1385
        for k in policy.keys():
1386
            if policy[k] == '':
1387
                policy[k] = default_policy.get(k)
1388
        for k, v in policy.iteritems():
1389
            if k == 'quota':
1390
                q = int(v)  # May raise ValueError.
1391
                if q < 0:
1392
                    raise ValueError
1393
            elif k == 'versioning':
1394
                if v not in ['auto', 'none']:
1395
                    raise ValueError
1396
            else:
1397
                raise ValueError
1398

    
1399
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1400
        default_policy = self.default_account_policy \
1401
            if is_account_policy else self.default_container_policy
1402
        if replace:
1403
            for k, v in default_policy.iteritems():
1404
                if k not in policy:
1405
                    policy[k] = v
1406
        self.node.policy_set(node, policy)
1407

    
1408
    def _get_policy(self, node, is_account_policy=True):
1409
        default_policy = self.default_account_policy \
1410
            if is_account_policy else self.default_container_policy
1411
        policy = default_policy.copy()
1412
        policy.update(self.node.policy_get(node))
1413
        return policy
1414

    
1415
    def _apply_versioning(self, account, container, version_id,
1416
                          update_statistics_ancestors_depth=None):
1417
        """Delete the provided version if such is the policy.
1418
           Return size of object removed.
1419
        """
1420

    
1421
        if version_id is None:
1422
            return 0
1423
        path, node = self._lookup_container(account, container)
1424
        versioning = self._get_policy(
1425
            node, is_account_policy=False)['versioning']
1426
        if versioning != 'auto':
1427
            hash, size = self.node.version_remove(
1428
                version_id, update_statistics_ancestors_depth)
1429
            self.store.map_delete(hash)
1430
            return size
1431
        elif self.free_versioning:
1432
            return self.node.version_get_properties(
1433
                version_id, keys=('size',))[0]
1434
        return 0
1435

    
1436
    # Access control functions.
1437

    
1438
    def _check_groups(self, groups):
1439
        # raise ValueError('Bad characters in groups')
1440
        pass
1441

    
1442
    def _check_permissions(self, path, permissions):
1443
        # raise ValueError('Bad characters in permissions')
1444
        pass
1445

    
1446
    def _get_formatted_paths(self, paths):
1447
        formatted = []
1448
        for p in paths:
1449
            node = self.node.node_lookup(p)
1450
            props = None
1451
            if node is not None:
1452
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1453
            if props is not None:
1454
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1455
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1456
                formatted.append((p, self.MATCH_EXACT))
1457
        return formatted
1458

    
1459
    def _get_permissions_path(self, account, container, name):
1460
        path = '/'.join((account, container, name))
1461
        permission_paths = self.permissions.access_inherit(path)
1462
        permission_paths.sort()
1463
        permission_paths.reverse()
1464
        for p in permission_paths:
1465
            if p == path:
1466
                return p
1467
            else:
1468
                if p.count('/') < 2:
1469
                    continue
1470
                node = self.node.node_lookup(p)
1471
                props = None
1472
                if node is not None:
1473
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1474
                if props is not None:
1475
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1476
                        return p
1477
        return None
1478

    
1479
    def _can_read(self, user, account, container, name):
1480
        if user == account:
1481
            return True
1482
        path = '/'.join((account, container, name))
1483
        if self.permissions.public_get(path) is not None:
1484
            return True
1485
        path = self._get_permissions_path(account, container, name)
1486
        if not path:
1487
            raise NotAllowedError
1488
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1489
            raise NotAllowedError
1490

    
1491
    def _can_write(self, user, account, container, name):
1492
        if user == account:
1493
            return True
1494
        path = '/'.join((account, container, name))
1495
        path = self._get_permissions_path(account, container, name)
1496
        if not path:
1497
            raise NotAllowedError
1498
        if not self.permissions.access_check(path, self.WRITE, user):
1499
            raise NotAllowedError
1500

    
1501
    def _allowed_accounts(self, user):
1502
        allow = set()
1503
        for path in self.permissions.access_list_paths(user):
1504
            allow.add(path.split('/', 1)[0])
1505
        return sorted(allow)
1506

    
1507
    def _allowed_containers(self, user, account):
1508
        allow = set()
1509
        for path in self.permissions.access_list_paths(user, account):
1510
            allow.add(path.split('/', 2)[1])
1511
        return sorted(allow)
1512

    
1513
    # Domain functions
1514

    
1515
    def get_domain_objects(self, domain, user=None):
        """List the objects of `domain` under the paths listed for `user`.

        Returns a list of `(path, metadata, permissions)` tuples, where
        the metadata is built by `_build_metadata` and the permissions
        come from `permissions.access_get(path)`. Returns an empty list
        when no paths are listed for `user`.
        """
        allowed_paths = self.permissions.access_list_paths(user)
        if not allowed_paths:
            return []

        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)

        result = []
        for path, props, user_defined_meta in obj_list:
            meta = self._build_metadata(props, user_defined_meta)
            result.append((path, meta, self.permissions.access_get(path)))
        return result
1525

    
1526
    # util functions
1527

    
1528
    def _build_metadata(self, props, user_defined=None,
1529
                        include_user_defined=True):
1530
        meta = {'bytes': props[self.SIZE],
1531
                'type': props[self.TYPE],
1532
                'hash': props[self.HASH],
1533
                'version': props[self.SERIAL],
1534
                'version_timestamp': props[self.MTIME],
1535
                'modified_by': props[self.MUSER],
1536
                'uuid': props[self.UUID],
1537
                'checksum': props[self.CHECKSUM]}
1538
        if include_user_defined and user_defined != None:
1539
            meta.update(user_defined)
1540
        return meta
1541

    
1542
    def _has_read_access(self, user, path):
1543
        try:
1544
            account, container, object = path.split('/', 2)
1545
        except ValueError:
1546
            raise ValueError('Invalid object path')
1547

    
1548
        assert isinstance(user, basestring), "Invalid user"
1549

    
1550
        try:
1551
            self._can_read(user, account, container, object)
1552
        except NotAllowedError:
1553
            return False
1554
        else:
1555
            return True