Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 133e3fcf

History | View | Annotate | Download (68.3 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
try:
41
    from astakosclient import AstakosClient
42
except ImportError:
43
    AstakosClient = None
44

    
45
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
46
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
47
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
48
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)
49

    
50

    
51
class DisabledAstakosClient(object):
    """Stand-in object used in place of a real AstakosClient.

    It keeps the constructor arguments it was given and raises
    ``AssertionError`` as soon as any other attribute is requested.
    """

    def __init__(self, *args, **kwargs):
        # Stored as-is; nothing in this class reads them back.
        self.args, self.kwargs = args, kwargs

    def __getattr__(self, name):
        # Python calls __getattr__ only when normal lookup fails, so
        # ``args`` and ``kwargs`` stay readable while everything else raises.
        raise AssertionError("AstakosClient has been disabled, "
                             "yet an attempt to access it was made")
60

    
61

    
62
# Stripped-down version of the HashMap class found in tools.
63

    
64
class HashMap(list):
    """A list of block hashes that can be folded into one top-level hash.

    ``blockhash`` is the name of a ``hashlib`` algorithm. ``blocksize`` is
    only stored on the instance; this class never reads it.
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        """Return the raw digest of the byte string ``v``."""
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        """Return the top hash of the stored block hashes.

        * no entries: the digest of the empty byte string;
        * one entry: that entry unchanged;
        * otherwise: the list is padded with all-zero entries (same length
          as the first entry) up to the next power of two, then adjacent
          pairs are concatenated and hashed until one value is left.
        """
        # Byte literals keep this identical on Python 2 (where b'' is str)
        # and make it work on Python 3, where hashlib rejects text strings.
        if not self:
            return self._hash_raw(b'')
        if len(self) == 1:
            return self[0]

        level = list(self)
        width = 2
        while width < len(level):
            width *= 2
        level += [b'\x00' * len(level[0])] * (width - len(level))
        while len(level) > 1:
            level = [self._hash_raw(level[i] + level[i + 1])
                     for i in range(0, len(level), 2)]
        return level[0]
90

    
91
# Fallback values used when the backend is constructed without an
# explicit setting.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'

DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
DEFAULT_HASH_ALGORITHM = 'sha256'

# Queue defaults are intentionally left disabled.
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'

DEFAULT_PUBLIC_URL_ALPHABET = (
    '0123456789'
    'abcdefghijklmnopqrstuvwxyz'
    'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
)
DEFAULT_PUBLIC_URL_SECURITY = 16

QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version cluster identifiers.
CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED = range(3)

inf = float('inf')

ULTIMATE_ANSWER = 42

DEFAULT_SOURCE = 'system'

logger = logging.getLogger(__name__)
121

    
122

    
123
class ModularBackend(BaseBackend):
124
    """A modular backend.
125

126
    Uses modules for SQL functions and storage.
127
    """
128

    
129
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        """Import the configured modules and build the backend components.

        The db/block/hash/policy/public-url arguments fall back to the
        module-level ``DEFAULT_*`` values when falsy. ``block_umask`` falls
        back only when it is ``None``, so an explicit umask of ``0`` is
        passed through to the block store.

        A real queue is created only when both ``queue_module`` and
        ``queue_hosts`` are given; otherwise a no-op queue is installed.
        A real ``AstakosClient`` is created only when ``astakos_url`` is set
        and the ``astakosclient`` import succeeded; otherwise a
        ``DisabledAstakosClient`` receives the same arguments.
        ``service_token`` is only stored on the instance here.
        """
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        # 0 is a valid umask, so only a missing value selects the default.
        if block_umask is None:
            block_umask = DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = (container_quota_policy or
                                  DEFAULT_CONTAINER_QUOTA)
        container_versioning_policy = (container_versioning_policy or
                                       DEFAULT_CONTAINER_VERSIONING)

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        def load_module(m):
            # __import__ returns the top-level package for dotted names,
            # so fetch the actual submodule from sys.modules.
            __import__(m)
            return sys.modules[m]

        # Database layer: one wrapper shared by every component.
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        # Re-export the db module's constants as instance attributes.
        for x in ('READ', 'WRITE'):
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ('ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME',
                  'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX',
                  'MATCH_EXACT'):
            setattr(self, x, getattr(self.db_module, x))

        # Block storage layer; block_params may override the base settings.
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        # Message queue (optional).
        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue(object):
                """Queue replacement whose operations do nothing."""

                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_url = astakos_url
        self.service_token = service_token

        # Both client classes take the same arguments; only the class differs.
        if not astakos_url or not AstakosClient:
            client_class = DisabledAstakosClient
        else:
            client_class = AstakosClient
        self.astakosclient = client_class(
            astakos_url,
            use_pool=True,
            pool_size=astakosclient_poolsize)

        self.serials = []
        self.messages = []
230

    
231
    def close(self):
232
        self.wrapper.close()
233
        self.queue.close()
234

    
235
    @property
    def using_external_quotaholder(self):
        """True unless the Astakos client is the disabled placeholder."""
        if isinstance(self.astakosclient, DisabledAstakosClient):
            return False
        return True
238

    
239
    def list_accounts(self, user, marker=None, limit=10000):
        """Return one page of the accounts the user can access.

        The page window is whatever ``_list_limits`` computes from the
        allowed accounts, ``marker`` and ``limit``.
        """

        logger.debug("list_accounts: %s %s %s", user, marker, limit)
        accounts = self._allowed_accounts(user)
        first, count = self._list_limits(accounts, marker, limit)
        last = first + count
        return accounts[first:last]
246

    
247
    def get_account_meta(
            self, user, account, domain, until=None, include_user_defined=True,
            external_quota=None):
        """Return a dictionary with the account metadata for the domain.

        A user other than the owner gets only ``name`` and ``modified``, and
        only when ``until`` is not set, the account exists and it is among
        the user's allowed accounts; otherwise ``NotAllowedError`` is raised.

        The owner additionally gets ``count`` and ``bytes``, the
        user-defined attributes (when ``include_user_defined`` is true and
        properties were found) and ``until_timestamp`` when ``until`` is
        given. With an external quotaholder, ``bytes`` is replaced by
        ``external_quota['usage']`` (0 when missing).
        """

        logger.debug(
            "get_account_meta: %s %s %s %s", user, account, domain, until)
        is_owner = (user == account)
        _, node = self._lookup_account(account, is_owner)
        if not is_owner:
            if (until or node is None or
                    account not in self._allowed_accounts(user)):
                raise NotAllowedError

        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # No properties available: fall back to the requested timestamp.
            props, mtime = None, until

        count, size, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            # Overall last modification, regardless of ``until``.
            latest = self._get_statistics(node, compute=True)[2]
            modified = max(latest, mtime)

        if not is_owner:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta['until_timestamp'] = tstamp
            # Built-in keys are written last so they win over user attributes.
            meta['name'] = account
            meta['count'] = count
            meta['bytes'] = size
            if self.using_external_quotaholder:
                external_quota = external_quota or {}
                meta['bytes'] = external_quota.get('usage', 0)
        meta['modified'] = modified
        return meta
288

    
289
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain.

        Only the account owner may do this; anyone else gets
        ``NotAllowedError``.
        """

        logger.debug("update_account_meta: %s %s %s %s %s", user,
                     account, domain, meta, replace)
        if user != account:
            raise NotAllowedError
        _, node = self._lookup_account(account, True)
        self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=-1)
299

    
300
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for this account.

        The owner gets the group dictionary from the permissions component.
        Any other user gets an empty dictionary if the account is among
        their allowed accounts, and ``NotAllowedError`` otherwise.
        """

        logger.debug("get_account_groups: %s %s", user, account)
        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
        return {}
310

    
311
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account.

        Only the account owner may do this (``NotAllowedError`` otherwise).
        ``groups`` maps a group name to its members. With ``replace`` all
        existing groups of the account are destroyed first; without it only
        the groups named in ``groups`` are deleted before being re-added.
        A group with a falsy member list is deleted and not re-created.
        """

        logger.debug("update_account_groups: %s %s %s %s", user,
                     account, groups, replace)
        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        # items() rather than the Python 2-only iteritems(): same pairs,
        # and it keeps working on Python 3.
        for name, members in groups.items():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, name)
            if members:
                self.permissions.group_addmany(account, name, members)
327

    
328
    def get_account_policy(self, user, account, external_quota=None):
        """Return a dictionary with the account policy.

        A non-owner gets ``{}`` when the account is among their allowed
        accounts and ``NotAllowedError`` otherwise. For the owner, with an
        external quotaholder the ``quota`` entry is replaced by
        ``external_quota['limit']`` (0 when missing).
        """

        logger.debug("get_account_policy: %s %s", user, account)
        if user != account:
            if account in self._allowed_accounts(user):
                return {}
            raise NotAllowedError

        _, node = self._lookup_account(account, True)
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            quota_info = external_quota or {}
            policy['quota'] = quota_info.get('limit', 0)
        return policy
342

    
343
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account.

        Only the account owner may do this (``NotAllowedError`` otherwise).
        The policy is checked before it is stored.
        """

        logger.debug("update_account_policy: %s %s %s %s", user,
                     account, policy, replace)
        if user != account:
            raise NotAllowedError
        _, node = self._lookup_account(account, True)
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)
353

    
354
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name.

        Raises ``NotAllowedError`` unless ``user`` is the account itself and
        ``AccountExists`` when a node for the account is already present.
        A non-empty ``policy`` is checked before the account is created.
        """

        logger.debug("put_account: %s %s %s", user, account, policy)
        if user != account:
            raise NotAllowedError
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')

        if not policy:
            policy = {}
        else:
            self._check_policy(policy, is_account_policy=True)
        created = self._put_path(
            user, self.ROOTNODE, account,
            update_statistics_ancestors_depth=-1)
        self._put_policy(created, policy, True, is_account_policy=True)
369

    
370
    def delete_account(self, user, account):
        """Delete the account with the given name.

        Does nothing when the account has no node. Raises
        ``NotAllowedError`` for non-owners and ``AccountNotEmpty`` when the
        node cannot be removed. The account's groups are destroyed after a
        successful removal.
        """

        logger.debug("delete_account: %s %s", user, account)
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is None:
            return
        removed = self.node.node_remove(
            node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)
383

    
384
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account.

        * Non-owner: ``NotAllowedError`` when ``until`` is set or the
          account is not among the user's allowed accounts; otherwise a
          page of the containers the user is allowed to see.
        * Owner with ``shared`` and/or ``public``: a sorted page of the
          container names taken from the shared and/or public paths.
        * Owner otherwise: a page of the container names listed under the
          account node.
        """

        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
                     account, marker, limit, shared, until, public)

        def page(names):
            # Apply the marker/limit window computed by _list_limits.
            start, count = self._list_limits(names, marker, limit)
            return names[start:start + count]

        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            return page(self._allowed_containers(user, account))

        if shared or public:
            names = set()
            if shared:
                # Paths look like account/container/...; keep the container.
                names.update(
                    x.split('/', 2)[1]
                    for x in self.permissions.access_list_shared(account))
            if public:
                names.update(
                    x[0].split('/', 2)[1]
                    for x in self.permissions.public_list(account))
            return page(sorted(names))

        node = self.node.node_lookup(account)
        containers = [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
        # ``containers`` already holds plain names. The previous code passed
        # ``[x[0] for x in containers]`` (the first character of each name)
        # to _list_limits, so the marker was compared against characters.
        return page(containers)
410

    
411
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list with all the container's object meta keys for the domain.

        A non-owner may not use ``until`` and must have at least one
        accessible path under the container, else ``NotAllowedError``.
        """

        logger.debug("list_container_meta: %s %s %s %s %s", user,
                     account, container, domain, until)
        accessible = []
        if user != account:
            if until:
                raise NotAllowedError
            container_path = '/'.join((account, container))
            accessible = self.permissions.access_list_paths(
                user, container_path)
            if not accessible:
                raise NotAllowedError

        _, node = self._lookup_container(account, container)
        # Without ``until`` there is no upper time bound.
        if until is None:
            before = inf
        else:
            before = until
        formatted = self._get_formatted_paths(accessible)
        return self.node.latest_attribute_keys(
            node, domain, before, CLUSTER_DELETED, formatted)
428

    
429
    def get_container_meta(self, user, account, container, domain,
                           until=None, include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        A non-owner gets only ``name`` and ``modified``, and only when
        ``until`` is not set and the container is among their allowed
        containers (``NotAllowedError`` otherwise). The owner also gets
        ``count``, ``bytes``, the user-defined attributes (when
        ``include_user_defined`` is true) and ``until_timestamp`` when
        ``until`` is given.
        """

        logger.debug("get_container_meta: %s %s %s %s %s", user,
                     account, container, domain, until)
        is_owner = (user == account)
        if not is_owner:
            if (until or
                    container not in self._allowed_containers(user, account)):
                raise NotAllowedError

        _, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, size, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            # Overall last modification, regardless of ``until``.
            modified = max(self._get_statistics(node)[2], mtime)

        if not is_owner:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta['until_timestamp'] = tstamp
            # Built-in keys are written last so they win over user attributes.
            meta['name'] = container
            meta['count'] = count
            meta['bytes'] = size
        meta['modified'] = modified
        return meta
461

    
462
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain.

        Only the account owner may do this (``NotAllowedError`` otherwise).
        When storing the metadata reports a source version and the
        container's ``versioning`` policy is not ``'auto'``, that source
        version is removed.
        """

        logger.debug("update_container_meta: %s %s %s %s %s %s",
                     user, account, container, domain, meta, replace)
        if user != account:
            raise NotAllowedError
        _, node = self._lookup_container(account, container)
        src_version_id, _dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is None:
            return
        policy = self._get_policy(node, is_account_policy=False)
        if policy['versioning'] != 'auto':
            self.node.version_remove(
                src_version_id, update_statistics_ancestors_depth=0)
479

    
480
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy.

        A non-owner gets ``{}`` when the container is among their allowed
        containers and ``NotAllowedError`` otherwise.
        """

        logger.debug(
            "get_container_policy: %s %s %s", user, account, container)
        if user != account:
            if container in self._allowed_containers(user, account):
                return {}
            raise NotAllowedError
        _, node = self._lookup_container(account, container)
        return self._get_policy(node, is_account_policy=False)
491

    
492
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container.

        Only the account owner may do this (``NotAllowedError`` otherwise).
        The policy is checked before it is stored.
        """

        logger.debug("update_container_policy: %s %s %s %s %s",
                     user, account, container, policy, replace)
        if user != account:
            raise NotAllowedError
        _, node = self._lookup_container(account, container)
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)
502

    
503
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name.

        Raises ``NotAllowedError`` for non-owners and ``ContainerExists``
        when the container lookup succeeds. A non-empty ``policy`` is
        checked before the container is created.
        """

        logger.debug(
            "put_container: %s %s %s %s", user, account, container, policy)
        if user != account:
            raise NotAllowedError

        # The lookup signals a missing container by raising NameError.
        try:
            _path, _node = self._lookup_container(account, container)
            already_there = True
        except NameError:
            already_there = False
        if already_there:
            raise ContainerExists('Container already exists')

        if not policy:
            policy = {}
        else:
            self._check_policy(policy, is_account_policy=False)
        container_path = '/'.join((account, container))
        account_node = self._lookup_account(account, True)[1]
        created = self._put_path(
            user, account_node, container_path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(created, policy, True, is_account_policy=False)
524

    
525
    def delete_container(self, user, account, container, until=None,
                         prefix='', delimiter=None):
        """Delete/purge the container with the given name.

        * ``until`` given: purge history and deleted versions up to
          ``until`` and return; the container itself is kept.
        * no ``delimiter``: the container must be empty
          (``ContainerNotEmpty`` otherwise); all versions are purged and
          the container node is removed.
        * ``delimiter`` given: only the contents are removed; every object
          is marked deleted and its permissions are cleared.

        Only the account owner may do this (``NotAllowedError`` otherwise).
        ``prefix`` is only logged here; the content listing always uses an
        empty prefix.
        """

        logger.debug("delete_container: %s %s %s %s %s %s", user,
                     account, container, until, prefix, delimiter)
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        def purge_versions(before):
            # Drop history versions (and their stored maps), then the
            # deleted-cluster versions, up to ``before``.
            hashes, size, serials = self.node.node_purge_children(
                node, before, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(
                node, before, CLUSTER_DELETED,
                update_statistics_ancestors_depth=0)
            return size, serials

        def report_purge(size, serials):
            # Size changes are reported only when versioning is not free.
            if self.free_versioning:
                return
            self._report_size_change(
                user, account, -size, {
                    'action': 'container purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )

        if until is not None:
            size, serials = purge_versions(until)
            report_purge(size, serials)
            return

        if not delimiter:
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            size, serials = purge_versions(inf)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            report_purge(size, serials)
            return

        # remove only contents
        listing = self._list_objects_no_limit(
            user, account, container, prefix='', delimiter=None,
            virtual=False, domain=None, keys=[], shared=False, until=None,
            size_range=None, all_props=True, public=False)
        cleared = []
        for entry in listing:
            obj_path = '/'.join((account, container, entry[0]))
            obj_node = entry[2]
            src_version_id, dest_version_id = self._put_version_duplicate(
                user, obj_node, size=0, type='', hash=None, checksum='',
                cluster=CLUSTER_DELETED,
                update_statistics_ancestors_depth=1)
            del_size = self._apply_versioning(
                account, container, src_version_id,
                update_statistics_ancestors_depth=1)
            self._report_size_change(
                user, account, -del_size, {
                    'action': 'object delete',
                    'path': obj_path,
                    'versions': str(dest_version_id)
                }
            )
            self._report_object_change(
                user, account, obj_path, details={'action': 'object delete'})
            cleared.append(obj_path)
        self.permissions.access_clear_bulk(cleared)
596

    
597
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
598
        if user != account and until:
599
            raise NotAllowedError
600
        if shared and public:
601
            # get shared first
602
            shared_paths = self._list_object_permissions(
603
                user, account, container, prefix, shared=True, public=False)
604
            objects = set()
605
            if shared_paths:
606
                path, node = self._lookup_container(account, container)
607
                shared_paths = self._get_formatted_paths(shared_paths)
608
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared_paths, all_props))
609

    
610
            # get public
611
            objects |= set(self._list_public_object_properties(
612
                user, account, container, prefix, all_props))
613
            objects = list(objects)
614

    
615
            objects.sort(key=lambda x: x[0])
616
            start, limit = self._list_limits(
617
                [x[0] for x in objects], marker, limit)
618
            return objects[start:start + limit]
619
        elif public:
620
            objects = self._list_public_object_properties(
621
                user, account, container, prefix, all_props)
622
            start, limit = self._list_limits(
623
                [x[0] for x in objects], marker, limit)
624
            return objects[start:start + limit]
625

    
626
        allowed = self._list_object_permissions(
627
            user, account, container, prefix, shared, public)
628
        if shared and not allowed:
629
            return []
630
        path, node = self._lookup_container(account, container)
631
        allowed = self._get_formatted_paths(allowed)
632
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
633
        start, limit = self._list_limits(
634
            [x[0] for x in objects], marker, limit)
635
        return objects[start:start + limit]
636

    
637
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
638
        public = self._list_object_permissions(
639
            user, account, container, prefix, shared=False, public=True)
640
        paths, nodes = self._lookup_objects(public)
641
        path = '/'.join((account, container))
642
        cont_prefix = path + '/'
643
        paths = [x[len(cont_prefix):] for x in paths]
644
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
645
        objects = [(path,) + props for path, props in zip(paths, props)]
646
        return objects
647

    
648
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
649
        objects = []
650
        while True:
651
            marker = objects[-1] if objects else None
652
            limit = 10000
653
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
654
            objects.extend(l)
655
            if not l or len(l) < limit:
656
                break
657
        return objects
658

    
659
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
660
        allowed = []
661
        path = '/'.join((account, container, prefix)).rstrip('/')
662
        if user != account:
663
            allowed = self.permissions.access_list_paths(user, path)
664
            if not allowed:
665
                raise NotAllowedError
666
        else:
667
            allowed = set()
668
            if shared:
669
                allowed.update(self.permissions.access_list_shared(path))
670
            if public:
671
                allowed.update(
672
                    [x[0] for x in self.permissions.public_list(path)])
673
            allowed = sorted(allowed)
674
            if not allowed:
675
                return []
676
        return allowed
677

    
678
    def list_objects(self, user, account, container, prefix='',
                     delimiter=None, marker=None, limit=10000, virtual=True,
                     domain=None, keys=None, shared=False, until=None,
                     size_range=None, public=False):
        """Return a list of object (name, version_id) tuples existing under a container."""

        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
        if not keys:
            keys = []
        # This entry point asks _list_objects for the reduced property set.
        all_props = False
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, all_props,
            public)
684

    
685
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of object metadata dicts existing under a container.

        Two-element listing entries become ``{'subdir': name}``; all other
        entries become a dict with the keys name, bytes, type, hash,
        version, version_timestamp, modified, modified_by, uuid and
        checksum. 'modified' is None whenever `until` is given.
        """

        logger.debug(
            "list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s",
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, public)
        if not keys:
            keys = []
        all_props = True
        listing = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, all_props,
            public)

        result = []
        for entry in listing:
            if len(entry) == 2:
                result.append({'subdir': entry[0]})
                continue
            # Element 0 is the name, so property indexes are shifted by one.
            item = {
                'name': entry[0],
                'bytes': entry[self.SIZE + 1],
                'type': entry[self.TYPE + 1],
                'hash': entry[self.HASH + 1],
                'version': entry[self.SERIAL + 1],
                'version_timestamp': entry[self.MTIME + 1],
                'modified': entry[self.MTIME + 1] if until is None else None,
                'modified_by': entry[self.MUSER + 1],
                'uuid': entry[self.UUID + 1],
                'checksum': entry[self.CHECKSUM + 1],
            }
            result.append(item)
        return result
707

    
708
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths that enforce permissions under a container.

        Delegates to `_list_object_permissions` asking for shared paths only
        (shared=True, public=False).
        """

        logger.debug("list_object_permissions: %s %s %s %s",
                     user, account, container, prefix)
        shared, public = True, False
        return self._list_object_permissions(
            user, account, container, prefix, shared, public)
714

    
715
    def list_object_public(self, user, account, container, prefix=''):
        """Return a dict mapping paths to public ids for public objects.

        The pairs come from `permissions.public_list` for the joined
        'account/container/prefix' path.
        """

        logger.debug("list_object_public: %s %s %s %s",
                     user, account, container, prefix)
        base = '/'.join((account, container, prefix))
        entries = self.permissions.public_list(base)
        return dict((path, public_id) for path, public_id in entries)
724

    
725
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        When a specific `version` is requested, 'modified' still reports
        the overall last modification: the latest normal version if there is
        one, otherwise the latest deleted version. ItemNotExists is raised
        when neither exists.
        """

        logger.debug("get_object_meta: %s %s %s %s %s %s",
                     user, account, container, name, domain, version)
        self._can_read(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        props = self._get_version(node, version)

        if version is not None:
            try:
                # Overall last modification.
                modified = self._get_version(node)[self.MTIME]
            except NameError:  # Object may be deleted.
                deleted = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if deleted is None:
                    raise ItemNotExists('Object does not exist')
                modified = deleted[self.MTIME]
        else:
            modified = props[self.MTIME]

        if include_user_defined:
            meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
        else:
            meta = {}
        # System keys are applied last so they override user-defined ones.
        meta.update({
            'name': name,
            'bytes': props[self.SIZE],
            'type': props[self.TYPE],
            'hash': props[self.HASH],
            'version': props[self.SERIAL],
            'version_timestamp': props[self.MTIME],
            'modified': modified,
            'modified_by': props[self.MUSER],
            'uuid': props[self.UUID],
            'checksum': props[self.CHECKSUM],
        })
        return meta
761

    
762
    def update_object_meta(self, user, account, container, name, domain,
                           meta, replace=False):
        """Update the object's metadata for the domain; return the new version.

        A new version carrying the metadata is created via `_put_metadata`,
        then `_apply_versioning` is run on the version it replaced.
        """

        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
                     user, account, container, name, domain, meta, replace)
        self._can_write(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        versions = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        src_version_id, dest_version_id = versions
        self._apply_versioning(
            account, container, src_version_id,
            update_statistics_ancestors_depth=1)
        return dest_version_id
775

    
776
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions.

        The account owner is always allowed 'write'. Other users get
        'write' or 'read' according to `permissions.access_check`, and
        NotAllowedError if neither is granted.
        """

        logger.debug("get_object_permissions: %s %s %s %s",
                     user, account, container, name)
        permissions_path = self._get_permissions_path(
            account, container, name)
        check = self.permissions.access_check
        if user == account:
            allowed = 'write'
        elif check(permissions_path, self.WRITE, user):
            allowed = 'write'
        elif check(permissions_path, self.READ, user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # Called for its existence check only; the result is not used.
        self._lookup_object(account, container, name)
        permissions = self.permissions.access_get(permissions_path)
        return (allowed, permissions_path, permissions)
794

    
795
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object.

        Only the account owner may do this; any other user gets
        NotAllowedError. A sharing change is reported afterwards with the
        resulting access members.
        """

        logger.debug("update_object_permissions: %s %s %s %s %s",
                     user, account, container, name, permissions)
        if user != account:
            raise NotAllowedError
        path, _node = self._lookup_object(account, container, name)
        self._check_permissions(path, permissions)
        self.permissions.access_set(path, permissions)
        members = self.permissions.access_members(path)
        self._report_sharing_change(user, account, path,
                                    {'members': members})
807

    
808
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable.

        The value is whatever `permissions.public_get` returns for the
        object's path.
        """

        logger.debug("get_object_public: %s %s %s %s",
                     user, account, container, name)
        self._can_read(user, account, container, name)
        path, _node = self._lookup_object(account, container, name)
        return self.permissions.public_get(path)
817

    
818
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object.

        A truthy `public` publishes the object's path using the backend's
        public URL security and alphabet settings; a falsy value unpublishes
        it.
        """

        logger.debug("update_object_public: %s %s %s %s %s",
                     user, account, container, name, public)
        self._can_write(user, account, container, name)
        path, _node = self._lookup_object(account, container, name)
        if public:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet)
        else:
            self.permissions.public_unset(path)
831

    
832
    def get_object_hashmap(self, user, account, container, name,
                           version=None):
        """Return the object's size and a list with partial hashes.

        The hashes are returned hex-encoded.
        """

        logger.debug("get_object_hashmap: %s %s %s %s %s",
                     user, account, container, name, version)
        self._can_read(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        props = self._get_version(node, version)
        # The store is keyed by the raw (binary) form of the stored hex hash.
        raw_hash = binascii.unhexlify(props[self.HASH])
        stored_map = self.store.map_get(raw_hash)
        size = props[self.SIZE]
        return size, [binascii.hexlify(part) for part in stored_map]
842

    
843
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False):
        """Create a new object version for a known hash and return its id.

        The version is created first. Quotas are checked afterwards, and only
        when the update increases usage; QuotaError is raised if the account
        quota (skipped when an external quotaholder is used) or the
        container quota is exceeded. Size, sharing and object changes are
        reported at the end. Permissions, when given, may only be set by the
        account owner (NotAllowedError otherwise).
        """
        with_permissions = permissions is not None
        if with_permissions and user != account:
            raise NotAllowedError
        self._can_write(user, account, container, name)
        if with_permissions:
            self._check_permissions(
                '/'.join((account, container, name)), permissions)

        account_node = self._lookup_account(account, True)[1]
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta: without an explicit source version, metadata is
        # inherited from the version that has just been superseded.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, meta, replace_meta)

        freed = self._apply_versioning(
            account, container, pre_version_id,
            update_statistics_ancestors_depth=1)
        size_delta = size - freed
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_policy = self._get_policy(
                    account_node, is_account_policy=True)
                account_quota = long(account_policy['quota'])
                account_usage = self._get_statistics(
                    account_node, compute=True)[1]
                if account_quota > 0 and account_usage > account_quota:
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_policy = self._get_policy(
                container_node, is_account_policy=False)
            container_quota = long(container_policy['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if container_quota > 0 and container_usage > container_quota:
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage))

        size_details = {'action': 'object update',
                        'path': path,
                        'versions': str(dest_version_id)}
        self._report_size_change(user, account, size_delta, size_details)
        if with_permissions:
            self.permissions.access_set(path, permissions)
            members = self.permissions.access_members(path)
            self._report_sharing_change(user, account, path,
                                        {'members': members})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id
910

    
911
    def update_object_hashmap(self, user, account, container, name, size,
                              type, hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object with the specified size and partial hashes.

        Returns the new version id together with the hex-encoded top hash.
        If any block in `hashmap` is missing from the store, an IndexError
        is raised whose `data` attribute lists the missing hashes
        (hex-encoded).
        """

        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s",
                     user, account, container, name, size, type, hashmap,
                     checksum)
        if not meta:
            meta = {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]

        block_map = HashMap(self.block_size, self.hash_algorithm)
        block_map.extend([binascii.unhexlify(h) for h in hashmap])
        missing = self.store.block_search(block_map)
        if missing:
            error = IndexError()
            error.data = [binascii.hexlify(h) for h in missing]
            raise error

        top_hash = block_map.hash()
        hexlified = binascii.hexlify(top_hash)
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        # The map is stored only after the version has been created.
        self.store.map_put(top_hash, block_map)
        return dest_version_id, hexlified
932

    
933
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum.

        The checksum is written to every version of the object whose serial
        is at least `version` and whose hash and size match those of the
        requested version.
        """

        logger.debug("update_object_checksum: %s %s %s %s %s %s",
                     user, account, container, name, version, checksum)
        # Update objects with greater version and same hashmap and size
        # (fix metadata updates).
        self._can_write(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        props = self._get_version(node, version)
        for candidate in self.node.node_get_versions(node):
            if candidate[self.SERIAL] < int(version):
                continue
            if candidate[self.HASH] != props[self.HASH]:
                continue
            if candidate[self.SIZE] != props[self.SIZE]:
                continue
            self.node.version_put_property(
                candidate[self.SERIAL], 'checksum', checksum)
947

    
948
    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):
        """Copy (or move) an object, optionally with everything under it.

        The source object is always processed. With a `delimiter`, every
        object listed under 'src_name + delimiter' is also copied to the
        corresponding name under 'dest_name + delimiter'. On a move to a
        different location each source is deleted right after it is copied.

        Returns the single new version id, or a list of ids when more than
        one object was written.
        """
        if not dest_meta:
            dest_meta = {}
        same_location = (
            (src_account, src_container, src_name) ==
            (dest_account, dest_container, dest_name))
        is_copy = not is_move and not same_location  # New uuid.
        delete_source = is_move and not same_location

        self._can_read(user, src_account, src_container, src_name)
        node = self._lookup_object(src_account, src_container, src_name)[1]
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        version_ids = [self._update_object_hash(
            user, dest_account, dest_container, dest_name, props[self.SIZE],
            type, props[self.HASH], None, dest_domain, dest_meta,
            replace_meta, permissions, src_node=node,
            src_version_id=props[self.SERIAL], is_copy=is_copy)]
        if delete_source:
            self._delete_object(user, src_account, src_container, src_name)

        if delimiter:
            if src_name.endswith(delimiter):
                src_prefix = src_name
            else:
                src_prefix = src_name + delimiter
            children = self._list_objects_no_limit(
                user, src_account, src_container, src_prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            children.sort(key=lambda entry: entry[2])  # order by nodes
            child_paths = [entry[0] for entry in children]
            child_nodes = [entry[2] for entry in children]
            # TODO: Will do another fetch of the properties in duplicate
            # version...
            child_props = self._get_versions(
                child_nodes)  # Check to see if source exists.

            if dest_name.endswith(delimiter):
                dest_prefix = dest_name
            else:
                dest_prefix = dest_name + delimiter
            for prop, child_path, child_node in zip(
                    child_props, child_paths, child_nodes):
                # Re-root the child name from the source to the destination.
                child_dest = child_path.replace(src_prefix, dest_prefix, 1)
                version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, child_dest,
                    prop[self.SIZE], prop[self.TYPE], prop[self.HASH], None,
                    dest_domain, meta={}, replace_meta=False,
                    permissions=None, src_node=child_node,
                    src_version_id=prop[self.SERIAL], is_copy=is_copy))
                if delete_source:
                    self._delete_object(
                        user, src_account, src_container, child_path)

        if len(version_ids) == 1:
            return version_ids[0]
        return version_ids
987

    
988
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata.

        Returns whatever `_copy_object` returns: a single version id, or a
        list of ids when several objects were written.
        """

        logger.debug(
            "copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s",
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, src_version, delimiter)
        if not meta:
            meta = {}
        is_move = False
        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, src_version, is_move, delimiter)
995

    
996
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata.

        Only the owner of the source account may move objects; any other
        user gets NotAllowedError. The latest source version is always
        the one moved.
        """

        logger.debug(
            "move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s",
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, delimiter)
        if not meta:
            meta = {}
        if user != src_account:
            raise NotAllowedError
        src_version, is_move = None, True
        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, src_version, is_move, delimiter)
1005

    
1006
    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None):
        """Delete an object, or purge its versions up to `until`.

        Only the account owner may delete (NotAllowedError otherwise).

        With `until`, versions are purged from the normal, history and
        deleted clusters, the maps of the purged hashes are removed from the
        store and the freed size is reported; a missing node is silently
        ignored and `delimiter` is not used.

        Without `until`, a new version is written in the deleted cluster for
        the object and, when `delimiter` is given, for every object listed
        under 'name + delimiter'; permissions on the affected paths are
        cleared.
        """
        if user != account:
            raise NotAllowedError

        if until is not None:
            path = '/'.join((account, container, name))
            node = self.node.node_lookup(path)
            if node is None:
                return

            hashes = []
            serials = []
            h, size, v = self.node.node_purge(
                node, until, CLUSTER_NORMAL,
                update_statistics_ancestors_depth=1)
            size = 0 + size
            hashes.extend(h)
            serials.extend(v)

            h, history_size, v = self.node.node_purge(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=1)
            hashes.extend(h)
            # History versions count towards the size only when versioning
            # is not free.
            if not self.free_versioning:
                size += history_size
            serials.extend(v)

            for purged_hash in hashes:
                self.store.map_delete(purged_hash)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except NameError:
                # No normal version is left: drop the path's permissions.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        def mark_deleted(obj_path, obj_node):
            # Write a deleted-cluster version and report the changes.
            src_version_id, dest_version_id = self._put_version_duplicate(
                user, obj_node, size=0, type='', hash=None, checksum='',
                cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
            del_size = self._apply_versioning(
                account, container, src_version_id,
                update_statistics_ancestors_depth=1)
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': obj_path,
                 'versions': ','.join([str(dest_version_id)])})
            self._report_object_change(
                user, account, obj_path, details={'action': 'object delete'})

        path, node = self._lookup_object(account, container, name)
        mark_deleted(path, node)
        self.permissions.access_clear(path)

        if delimiter:
            prefix = name if name.endswith(delimiter) else name + delimiter
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for entry in src_names:
                # entry[0] is the object name, entry[2] its node.
                child_path = '/'.join((account, container, entry[0]))
                mark_deleted(child_path, entry[2])
                paths.append(child_path)
            self.permissions.access_clear_bulk(paths)
1081

    
1082
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object.

        `prefix` is only logged; the work is done by `_delete_object`.
        """

        logger.debug("delete_object: %s %s %s %s %s %s %s",
                     user, account, container, name, until, prefix,
                     delimiter)
        self._delete_object(
            user, account, container, name, until, delimiter)
1088

    
1089
    def list_versions(self, user, account, container, name):
        """Return a list of all [version, version_timestamp] pairs for an object.

        Versions in the deleted cluster are left out.
        """

        logger.debug("list_versions: %s %s %s %s",
                     user, account, container, name)
        self._can_read(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        listed = []
        for props in self.node.node_get_versions(node):
            if props[self.CLUSTER] == CLUSTER_DELETED:
                continue
            listed.append([props[self.SERIAL], props[self.MTIME]])
        return listed
1098

    
1099
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given.

        Raises NameError when the UUID has no entry in the normal cluster.
        """

        logger.debug("get_uuid: %s %s", user, uuid)
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, _serial = info
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)
1110

    
1111
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given.

        Raises NameError when the public id does not map to a path.
        """

        logger.debug("get_public: %s %s", user, public)
        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        parts = path.split('/', 2)
        account, container, name = parts
        self._can_read(user, account, container, name)
        return (account, container, name)
1121

    
1122
    def get_block(self, hash):
        """Return a block's data.

        Raises ItemNotExists when the store returns a falsy value for the
        hash.
        """

        logger.debug("get_block: %s", hash)
        raw_hash = binascii.unhexlify(hash)
        block = self.store.block_get(raw_hash)
        # NOTE(review): an empty block is falsy too and is reported as
        # missing -- confirm that this is intended.
        if block:
            return block
        raise ItemNotExists('Block does not exist')
1130

    
1131
    def put_block(self, data):
        """Store a block and return the hash.

        The hash is returned hex-encoded.
        """

        logger.debug("put_block: %s", len(data))
        raw_hash = self.store.block_put(data)
        return binascii.hexlify(raw_hash)
1136

    
1137
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash.

        Data that covers a whole block from offset 0 is stored as a new
        block; anything else is applied to the existing block at `offset`.
        The resulting hash is returned hex-encoded.
        """

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        replaces_whole_block = offset == 0 and len(data) == self.block_size
        if replaces_whole_block:
            return self.put_block(data)
        updated = self.store.block_update(
            binascii.unhexlify(hash), offset, data)
        return binascii.hexlify(updated)
1145

    
1146
    # Path functions.
1147

    
1148
    def _generate_uuid(self):
        """Return a new random (version 4) UUID as a string."""
        fresh = uuidlib.uuid4()
        return str(fresh)
1150

    
1151
    def _put_object_node(self, path, parent, name):
        """Return (path, node) for `name` under `path`, creating the node
        under `parent` if it does not exist yet."""
        object_path = '/'.join((path, name))
        existing = self.node.node_lookup(object_path)
        if existing is not None:
            return object_path, existing
        return object_path, self.node.node_create(parent, object_path)
1157

    
1158
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        """Create a node for `path` under `parent` with an initial, empty
        version in the normal cluster, and return the node."""
        new_node = self.node.node_create(parent, path)
        initial_uuid = self._generate_uuid()
        self.node.version_create(
            new_node, None, 0, '', None, user, initial_uuid, '',
            CLUSTER_NORMAL, update_statistics_ancestors_depth)
        return new_node
1165

    
1166
    def _lookup_account(self, account, create=True):
        """Return (account, node) for the account.

        A missing account is created under the root node when `create` is
        true; otherwise the returned node is None.
        """
        found = self.node.node_lookup(account)
        if create and found is None:
            found = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, found
1173

    
1174
    def _lookup_container(self, account, container):
        """Return (path, node) for the container.

        The node is looked up with for_update=True. Raises ItemNotExists
        when the container has no node.
        """
        container_path = '/'.join((account, container))
        found = self.node.node_lookup(container_path, for_update=True)
        if found is not None:
            return container_path, found
        raise ItemNotExists('Container does not exist')
1180

    
1181
    def _lookup_object(self, account, container, name):
        """Return (path, node) for the object.

        Raises ItemNotExists when the object has no node.
        """
        object_path = '/'.join((account, container, name))
        found = self.node.node_lookup(object_path)
        if found is not None:
            return object_path, found
        raise ItemNotExists('Object does not exist')
1187

    
1188
    def _lookup_objects(self, paths):
        """Return (paths, nodes), with nodes from a bulk lookup of `paths`."""
        return paths, self.node.node_lookup_bulk(paths)
1191

    
1192
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given.

        The normal cluster is consulted first. The history cluster is
        consulted as a fallback only when `until` is given. Raises
        ItemNotExists when nothing is found.
        """

        before = inf if until is None else until
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None:
            if until is not None:
                props = self.node.version_lookup(
                    node, before, CLUSTER_HISTORY)
            if props is None:
                raise ItemNotExists('Path does not exist')
        return props
1202

    
1203
    def _get_statistics(self, node, until=None, compute=False):
        """Return count, sum of size and latest timestamp of everything
        under node.

        With `until`, or with `compute`, the figures come from
        `statistics_latest`; otherwise from `statistics_get` for the normal
        cluster. (0, 0, 0) is returned when no statistics are available.
        """

        if until is None and not compute:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        elif until is None:
            stats = self.node.statistics_latest(
                node, except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        return (0, 0, 0) if stats is None else stats
1215

    
1216
    def _get_version(self, node, version=None):
        """Return the properties of a version of the node.

        Without `version`, the latest version in the normal cluster is
        returned (ItemNotExists if there is none). With `version`, that
        specific version is returned; VersionNotExists is raised when it is
        not a valid integer, is unknown, or is in the deleted cluster.
        """
        if version is None:
            latest = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if latest is None:
                raise ItemNotExists('Object does not exist')
            return latest

        try:
            serial = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(serial)
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props
1230

    
1231
    def _get_versions(self, nodes):
        """Return the latest normal-cluster version properties for `nodes`,
        fetched with a single bulk lookup."""
        bulk_lookup = self.node.version_lookup_bulk
        return bulk_lookup(nodes, inf, CLUSTER_NORMAL)
1233

    
1234
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node.

        Properties that are not given are inherited from the latest normal
        version of `src_node` (or of `node` itself when `src_node` is None).
        The node's current normal version, if any, is moved to the history
        cluster first. Returns (pre_version_id, dest_version_id), where
        pre_version_id is the superseded version of `node` or None.
        """

        source = node if src_node is None else src_node
        props = self.node.version_lookup(source, inf, CLUSTER_NORMAL)
        if props is None:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        else:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]

        if size is None:  # Set metadata.
            # This way hash can be set to None (account or container).
            hash = src_hash
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum

        # A copy, or a node without a source version, gets a fresh uuid.
        if is_copy or src_version_id is None:
            uuid = self._generate_uuid()
        else:
            uuid = props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            # The source is another node: look up the destination's own
            # current version to find what is being superseded.
            current = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            pre_version_id = None if current is None else current[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, _mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)
        return pre_version_id, dest_version_id
1279

    
1280
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1281
        if src_version_id is not None:
1282
            self.node.attribute_copy(src_version_id, dest_version_id)
1283
        if not replace:
1284
            self.node.attribute_del(dest_version_id, domain, (
1285
                k for k, v in meta.iteritems() if v == ''))
1286
            self.node.attribute_set(dest_version_id, domain, (
1287
                (k, v) for k, v in meta.iteritems() if v != ''))
1288
        else:
1289
            self.node.attribute_del(dest_version_id, domain)
1290
            self.node.attribute_set(dest_version_id, domain, ((
1291
                k, v) for k, v in meta.iteritems()))
1292

    
1293
    def _put_metadata(self, user, node, domain, meta, replace=False,
1294
                      update_statistics_ancestors_depth=None):
1295
        """Create a new version and store metadata."""
1296

    
1297
        src_version_id, dest_version_id = self._put_version_duplicate(
1298
            user, node,
1299
            update_statistics_ancestors_depth=update_statistics_ancestors_depth)
1300
        self._put_metadata_duplicate(
1301
            src_version_id, dest_version_id, domain, meta, replace)
1302
        return src_version_id, dest_version_id
1303

    
1304
    def _list_limits(self, listing, marker, limit):
1305
        start = 0
1306
        if marker:
1307
            try:
1308
                start = listing.index(marker) + 1
1309
            except ValueError:
1310
                pass
1311
        if not limit or limit > 10000:
1312
            limit = 10000
1313
        return start, limit
1314

    
1315
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """List objects under `path`, with names relative to `path`.

        Queries ``self.node.latest_version_list`` using `path` + '/' as the
        container prefix.  When `virtual` is set, the returned prefixes are
        added as ``(prefix, None)`` entries.  The result is sorted by name
        and the container prefix is stripped from the first element of each
        entry.
        """
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        full_prefix = cont_prefix + prefix
        start = (cont_prefix + marker) if marker else None
        before = inf if until is None else until
        # Attribute filters are only applied when a domain is given.
        filterq = keys if domain else []

        objects, prefixes = self.node.latest_version_list(
            parent, full_prefix, delimiter, start, limit, before,
            CLUSTER_DELETED, allowed, domain, filterq, size_range, all_props)
        if virtual:
            objects.extend([(p, None) for p in prefixes])
        objects.sort(key=lambda entry: entry[0])
        strip = len(cont_prefix)
        return [(entry[0][strip:],) + entry[1:] for entry in objects]
1330

    
1331
    # Reporting functions.
1332

    
1333
    def _report_size_change(self, user, account, size, details=None):
1334
        details = details or {}
1335

    
1336
        if size == 0:
1337
            return
1338

    
1339
        account_node = self._lookup_account(account, True)[1]
1340
        total = self._get_statistics(account_node, compute=True)[1]
1341
        details.update({'user': user, 'total': total})
1342
        logger.debug(
1343
            "_report_size_change: %s %s %s %s", user, account, size, details)
1344
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1345
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1346
                              float(size), details))
1347

    
1348
        if not self.using_external_quotaholder:
1349
            return
1350

    
1351
        try:
1352
            name = details['path'] if 'path' in details else ''
1353
            serial = self.astakosclient.issue_one_commission(
1354
                token=self.service_token,
1355
                holder=account,
1356
                source=DEFAULT_SOURCE,
1357
                provisions={'pithos.diskspace': size},
1358
                name=name
1359
                )
1360
        except BaseException, e:
1361
            raise QuotaError(e)
1362
        else:
1363
            self.serials.append(serial)
1364

    
1365
    def _report_object_change(self, user, account, path, details=None):
        """Queue an 'object' message for `path` in `account`.

        `details` is an optional dict; the acting `user` is recorded in it
        under the 'user' key before it is logged and queued.
        """
        info = details or {}
        info['user'] = user
        logger.debug("_report_object_change: %s %s %s %s", user,
                     account, path, info)
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                   account, QUEUE_INSTANCE_ID, 'object', path, info)
        self.messages.append(message)
1372

    
1373
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a 'sharing' message for `path` in `account`.

        `details` is an optional dict; the acting `user` is recorded in it
        under the 'user' key before it is logged and queued.
        """
        details = details or {}
        details.update({'user': user})
        # Log under this method's own name, after `details` has been
        # normalised, so the log shows exactly what gets queued.
        logger.debug("_report_sharing_change: %s %s %s %s",
                     user, account, path, details)
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1380

    
1381
    # Policy functions.
1382

    
1383
    def _check_policy(self, policy, is_account_policy=True):
1384
        default_policy = self.default_account_policy \
1385
            if is_account_policy else self.default_container_policy
1386
        for k in policy.keys():
1387
            if policy[k] == '':
1388
                policy[k] = default_policy.get(k)
1389
        for k, v in policy.iteritems():
1390
            if k == 'quota':
1391
                q = int(v)  # May raise ValueError.
1392
                if q < 0:
1393
                    raise ValueError
1394
            elif k == 'versioning':
1395
                if v not in ['auto', 'none']:
1396
                    raise ValueError
1397
            else:
1398
                raise ValueError
1399

    
1400
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1401
        default_policy = self.default_account_policy \
1402
            if is_account_policy else self.default_container_policy
1403
        if replace:
1404
            for k, v in default_policy.iteritems():
1405
                if k not in policy:
1406
                    policy[k] = v
1407
        self.node.policy_set(node, policy)
1408

    
1409
    def _get_policy(self, node, is_account_policy=True):
1410
        default_policy = self.default_account_policy \
1411
            if is_account_policy else self.default_container_policy
1412
        policy = default_policy.copy()
1413
        policy.update(self.node.policy_get(node))
1414
        return policy
1415

    
1416
    def _apply_versioning(self, account, container, version_id,
1417
                          update_statistics_ancestors_depth=None):
1418
        """Delete the provided version if such is the policy.
1419
           Return size of object removed.
1420
        """
1421

    
1422
        if version_id is None:
1423
            return 0
1424
        path, node = self._lookup_container(account, container)
1425
        versioning = self._get_policy(
1426
            node, is_account_policy=False)['versioning']
1427
        if versioning != 'auto':
1428
            hash, size = self.node.version_remove(
1429
                version_id, update_statistics_ancestors_depth)
1430
            self.store.map_delete(hash)
1431
            return size
1432
        elif self.free_versioning:
1433
            return self.node.version_get_properties(
1434
                version_id, keys=('size',))[0]
1435
        return 0
1436

    
1437
    # Access control functions.
1438

    
1439
    def _check_groups(self, groups):
1440
        # raise ValueError('Bad characters in groups')
1441
        pass
1442

    
1443
    def _check_permissions(self, path, permissions):
1444
        # raise ValueError('Bad characters in permissions')
1445
        pass
1446

    
1447
    def _get_formatted_paths(self, paths):
1448
        formatted = []
1449
        for p in paths:
1450
            node = self.node.node_lookup(p)
1451
            props = None
1452
            if node is not None:
1453
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1454
            if props is not None:
1455
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1456
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1457
                formatted.append((p, self.MATCH_EXACT))
1458
        return formatted
1459

    
1460
    def _get_permissions_path(self, account, container, name):
1461
        path = '/'.join((account, container, name))
1462
        permission_paths = self.permissions.access_inherit(path)
1463
        permission_paths.sort()
1464
        permission_paths.reverse()
1465
        for p in permission_paths:
1466
            if p == path:
1467
                return p
1468
            else:
1469
                if p.count('/') < 2:
1470
                    continue
1471
                node = self.node.node_lookup(p)
1472
                props = None
1473
                if node is not None:
1474
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1475
                if props is not None:
1476
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1477
                        return p
1478
        return None
1479

    
1480
    def _can_read(self, user, account, container, name):
1481
        if user == account:
1482
            return True
1483
        path = '/'.join((account, container, name))
1484
        if self.permissions.public_get(path) is not None:
1485
            return True
1486
        path = self._get_permissions_path(account, container, name)
1487
        if not path:
1488
            raise NotAllowedError
1489
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1490
            raise NotAllowedError
1491

    
1492
    def _can_write(self, user, account, container, name):
1493
        if user == account:
1494
            return True
1495
        path = '/'.join((account, container, name))
1496
        path = self._get_permissions_path(account, container, name)
1497
        if not path:
1498
            raise NotAllowedError
1499
        if not self.permissions.access_check(path, self.WRITE, user):
1500
            raise NotAllowedError
1501

    
1502
    def _allowed_accounts(self, user):
1503
        allow = set()
1504
        for path in self.permissions.access_list_paths(user):
1505
            allow.add(path.split('/', 1)[0])
1506
        return sorted(allow)
1507

    
1508
    def _allowed_containers(self, user, account):
1509
        allow = set()
1510
        for path in self.permissions.access_list_paths(user, account):
1511
            allow.add(path.split('/', 2)[1])
1512
        return sorted(allow)
1513

    
1514
    # Domain functions
1515

    
1516
    def get_domain_objects(self, domain, user=None):
        """List the objects of `domain` that `user` has access paths to.

        Returns a list of ``(path, metadata, permissions)`` tuples, or an
        empty list when `user` has no accessible paths.
        """
        allowed_paths = self.permissions.access_list_paths(user)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        results = []
        for path, props, user_defined_meta in obj_list:
            metadata = self._build_metadata(props, user_defined_meta)
            permissions = self.permissions.access_get(path)
            results.append((path, metadata, permissions))
        return results
1526

    
1527
    # util functions
1528

    
1529
    def _build_metadata(self, props, user_defined=None,
1530
                        include_user_defined=True):
1531
        meta = {'bytes': props[self.SIZE],
1532
                'type': props[self.TYPE],
1533
                'hash': props[self.HASH],
1534
                'version': props[self.SERIAL],
1535
                'version_timestamp': props[self.MTIME],
1536
                'modified_by': props[self.MUSER],
1537
                'uuid': props[self.UUID],
1538
                'checksum': props[self.CHECKSUM]}
1539
        if include_user_defined and user_defined != None:
1540
            meta.update(user_defined)
1541
        return meta
1542

    
1543
    def _has_read_access(self, user, path):
1544
        try:
1545
            account, container, object = path.split('/', 2)
1546
        except ValueError:
1547
            raise ValueError('Invalid object path')
1548

    
1549
        assert isinstance(user, basestring), "Invalid user"
1550

    
1551
        try:
1552
            self._can_read(user, account, container, object)
1553
        except NotAllowedError:
1554
            return False
1555
        else:
1556
            return True