Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ ff0beb64

History | View | Annotate | Download (67.9 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
try:
41
    from astakosclient import AstakosClient
42
except ImportError:
43
    AstakosClient = None
44

    
45
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
46
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
47
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
48
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)
49

    
50

    
51
class DisabledAstakosClient(object):
52
    def __init__(self, *args, **kwargs):
53
        self.args = args
54
        self.kwargs = kwargs
55

    
56
    def __getattr__(self, name):
57
        m = ("AstakosClient has been disabled, "
58
             "yet an attempt to access it was made")
59
        raise AssertionError(m)
60

    
61

    
62
# Stripped-down version of the HashMap class found in tools.
63

    
64
class HashMap(list):
65

    
66
    def __init__(self, blocksize, blockhash):
67
        super(HashMap, self).__init__()
68
        self.blocksize = blocksize
69
        self.blockhash = blockhash
70

    
71
    def _hash_raw(self, v):
72
        h = hashlib.new(self.blockhash)
73
        h.update(v)
74
        return h.digest()
75

    
76
    def hash(self):
77
        if len(self) == 0:
78
            return self._hash_raw('')
79
        if len(self) == 1:
80
            return self.__getitem__(0)
81

    
82
        h = list(self)
83
        s = 2
84
        while s < len(h):
85
            s = s * 2
86
        h += [('\x00' * len(h[0]))] * (s - len(h))
87
        while len(h) > 1:
88
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
89
        return h[0]
90

    
91
# Default modules and settings.
92
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
93
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
94
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
95
DEFAULT_BLOCK_PATH = 'data/'
96
DEFAULT_BLOCK_UMASK = 0o022
97
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
98
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
99
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
100
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
101
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
102
                               'abcdefghijklmnopqrstuvwxyz'
103
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
104
DEFAULT_PUBLIC_URL_SECURITY = 16
105

    
106
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
107
QUEUE_CLIENT_ID = 'pithos'
108
QUEUE_INSTANCE_ID = '1'
109

    
110
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
111

    
112
inf = float('inf')
113

    
114
ULTIMATE_ANSWER = 42
115

    
116
DEFAULT_SOURCE = 'system'
117

    
118
logger = logging.getLogger(__name__)
119

    
120

    
121
class ModularBackend(BaseBackend):
122
    """A modular backend.
123

124
    Uses modules for SQL functions and storage.
125
    """
126

    
127
    def __init__(self, db_module=None, db_connection=None,
128
                 block_module=None, block_path=None, block_umask=None,
129
                 queue_module=None, queue_hosts=None, queue_exchange=None,
130
                 astakos_url=None, service_token=None,
131
                 astakosclient_poolsize=None,
132
                 free_versioning=True, block_params=None,
133
                 public_url_security=None,
134
                 public_url_alphabet=None,
135
                 account_quota_policy=None,
136
                 container_quota_policy=None,
137
                 container_versioning_policy=None):
138
        db_module = db_module or DEFAULT_DB_MODULE
139
        db_connection = db_connection or DEFAULT_DB_CONNECTION
140
        block_module = block_module or DEFAULT_BLOCK_MODULE
141
        block_path = block_path or DEFAULT_BLOCK_PATH
142
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
143
        block_params = block_params or DEFAULT_BLOCK_PARAMS
144
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
145
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
146
        container_quota_policy = container_quota_policy \
147
            or DEFAULT_CONTAINER_QUOTA
148
        container_versioning_policy = container_versioning_policy \
149
            or DEFAULT_CONTAINER_VERSIONING
150

    
151
        self.default_account_policy = {'quota': account_quota_policy}
152
        self.default_container_policy = {
153
            'quota': container_quota_policy,
154
            'versioning': container_versioning_policy
155
        }
156
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
157
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
158

    
159
        self.public_url_security = public_url_security or DEFAULT_PUBLIC_URL_SECURITY
160
        self.public_url_alphabet = public_url_alphabet or DEFAULT_PUBLIC_URL_ALPHABET
161

    
162
        self.hash_algorithm = 'sha256'
163
        self.block_size = 4 * 1024 * 1024  # 4MB
164
        self.free_versioning = free_versioning
165

    
166
        def load_module(m):
167
            __import__(m)
168
            return sys.modules[m]
169

    
170
        self.db_module = load_module(db_module)
171
        self.wrapper = self.db_module.DBWrapper(db_connection)
172
        params = {'wrapper': self.wrapper}
173
        self.permissions = self.db_module.Permissions(**params)
174
        self.config = self.db_module.Config(**params)
175
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
176
        for x in ['READ', 'WRITE']:
177
            setattr(self, x, getattr(self.db_module, x))
178
        self.node = self.db_module.Node(**params)
179
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
180
            setattr(self, x, getattr(self.db_module, x))
181

    
182
        self.block_module = load_module(block_module)
183
        self.block_params = block_params
184
        params = {'path': block_path,
185
                  'block_size': self.block_size,
186
                  'hash_algorithm': self.hash_algorithm,
187
                  'umask': block_umask}
188
        params.update(self.block_params)
189
        self.store = self.block_module.Store(**params)
190

    
191
        if queue_module and queue_hosts:
192
            self.queue_module = load_module(queue_module)
193
            params = {'hosts': queue_hosts,
194
                      'exchange': queue_exchange,
195
                      'client_id': QUEUE_CLIENT_ID}
196
            self.queue = self.queue_module.Queue(**params)
197
        else:
198
            class NoQueue:
199
                def send(self, *args):
200
                    pass
201

    
202
                def close(self):
203
                    pass
204

    
205
            self.queue = NoQueue()
206

    
207
        self.astakos_url = astakos_url
208
        self.service_token = service_token
209

    
210
        if not astakos_url or not AstakosClient:
211
            self.astakosclient = DisabledAstakosClient(
212
                astakos_url,
213
                use_pool=True,
214
                pool_size=astakosclient_poolsize)
215
        else:
216
            self.astakosclient = AstakosClient(
217
                astakos_url,
218
                use_pool=True,
219
                pool_size=astakosclient_poolsize)
220

    
221
        self.serials = []
222
        self.messages = []
223

    
224
    def close(self):
225
        self.wrapper.close()
226
        self.queue.close()
227

    
228
    @property
229
    def using_external_quotaholder(self):
230
        return not isinstance(self.astakosclient, DisabledAstakosClient)
231

    
232
    def list_accounts(self, user, marker=None, limit=10000):
233
        """Return a list of accounts the user can access."""
234

    
235
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
236
        allowed = self._allowed_accounts(user)
237
        start, limit = self._list_limits(allowed, marker, limit)
238
        return allowed[start:start + limit]
239

    
240
    def get_account_meta(
241
            self, user, account, domain, until=None, include_user_defined=True,
242
            external_quota=None):
243
        """Return a dictionary with the account metadata for the domain."""
244

    
245
        logger.debug(
246
            "get_account_meta: %s %s %s %s", user, account, domain, until)
247
        path, node = self._lookup_account(account, user == account)
248
        if user != account:
249
            if until or node is None or account not in self._allowed_accounts(user):
250
                raise NotAllowedError
251
        try:
252
            props = self._get_properties(node, until)
253
            mtime = props[self.MTIME]
254
        except NameError:
255
            props = None
256
            mtime = until
257
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
258
        tstamp = max(tstamp, mtime)
259
        if until is None:
260
            modified = tstamp
261
        else:
262
            modified = self._get_statistics(
263
                node, compute=True)[2]  # Overall last modification.
264
            modified = max(modified, mtime)
265

    
266
        if user != account:
267
            meta = {'name': account}
268
        else:
269
            meta = {}
270
            if props is not None and include_user_defined:
271
                meta.update(
272
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
273
            if until is not None:
274
                meta.update({'until_timestamp': tstamp})
275
            meta.update({'name': account, 'count': count, 'bytes': bytes})
276
            if self.using_external_quotaholder:
277
                external_quota = external_quota or {}
278
                meta['bytes'] = external_quota.get('usage', 0)
279
        meta.update({'modified': modified})
280
        return meta
281

    
282
    def update_account_meta(self, user, account, domain, meta, replace=False):
283
        """Update the metadata associated with the account for the domain."""
284

    
285
        logger.debug("update_account_meta: %s %s %s %s %s", user,
286
                     account, domain, meta, replace)
287
        if user != account:
288
            raise NotAllowedError
289
        path, node = self._lookup_account(account, True)
290
        self._put_metadata(user, node, domain, meta, replace,
291
                           update_statistics_ancestors_depth=-1)
292

    
293
    def get_account_groups(self, user, account):
294
        """Return a dictionary with the user groups defined for this account."""
295

    
296
        logger.debug("get_account_groups: %s %s", user, account)
297
        if user != account:
298
            if account not in self._allowed_accounts(user):
299
                raise NotAllowedError
300
            return {}
301
        self._lookup_account(account, True)
302
        return self.permissions.group_dict(account)
303

    
304
    def update_account_groups(self, user, account, groups, replace=False):
305
        """Update the groups associated with the account."""
306

    
307
        logger.debug("update_account_groups: %s %s %s %s", user,
308
                     account, groups, replace)
309
        if user != account:
310
            raise NotAllowedError
311
        self._lookup_account(account, True)
312
        self._check_groups(groups)
313
        if replace:
314
            self.permissions.group_destroy(account)
315
        for k, v in groups.iteritems():
316
            if not replace:  # If not already deleted.
317
                self.permissions.group_delete(account, k)
318
            if v:
319
                self.permissions.group_addmany(account, k, v)
320

    
321
    def get_account_policy(self, user, account, external_quota=None):
322
        """Return a dictionary with the account policy."""
323

    
324
        logger.debug("get_account_policy: %s %s", user, account)
325
        if user != account:
326
            if account not in self._allowed_accounts(user):
327
                raise NotAllowedError
328
            return {}
329
        path, node = self._lookup_account(account, True)
330
        policy = self._get_policy(node, is_account_policy=True)
331
        if self.using_external_quotaholder:
332
            external_quota = external_quota or {}
333
            policy['quota'] = external_quota.get('limit', 0)
334
        return policy
335

    
336
    def update_account_policy(self, user, account, policy, replace=False):
337
        """Update the policy associated with the account."""
338

    
339
        logger.debug("update_account_policy: %s %s %s %s", user,
340
                     account, policy, replace)
341
        if user != account:
342
            raise NotAllowedError
343
        path, node = self._lookup_account(account, True)
344
        self._check_policy(policy, is_account_policy=True)
345
        self._put_policy(node, policy, replace, is_account_policy=True)
346

    
347
    def put_account(self, user, account, policy=None):
348
        """Create a new account with the given name."""
349

    
350
        logger.debug("put_account: %s %s %s", user, account, policy)
351
        policy = policy or {}
352
        if user != account:
353
            raise NotAllowedError
354
        node = self.node.node_lookup(account)
355
        if node is not None:
356
            raise AccountExists('Account already exists')
357
        if policy:
358
            self._check_policy(policy, is_account_policy=True)
359
        node = self._put_path(user, self.ROOTNODE, account,
360
                              update_statistics_ancestors_depth=-1)
361
        self._put_policy(node, policy, True, is_account_policy=True)
362

    
363
    def delete_account(self, user, account):
364
        """Delete the account with the given name."""
365

    
366
        logger.debug("delete_account: %s %s", user, account)
367
        if user != account:
368
            raise NotAllowedError
369
        node = self.node.node_lookup(account)
370
        if node is None:
371
            return
372
        if not self.node.node_remove(node,
373
                                     update_statistics_ancestors_depth=-1):
374
            raise AccountNotEmpty('Account is not empty')
375
        self.permissions.group_destroy(account)
376

    
377
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
378
        """Return a list of containers existing under an account."""
379

    
380
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
381
                     account, marker, limit, shared, until, public)
382
        if user != account:
383
            if until or account not in self._allowed_accounts(user):
384
                raise NotAllowedError
385
            allowed = self._allowed_containers(user, account)
386
            start, limit = self._list_limits(allowed, marker, limit)
387
            return allowed[start:start + limit]
388
        if shared or public:
389
            allowed = set()
390
            if shared:
391
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
392
            if public:
393
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
394
            allowed = sorted(allowed)
395
            start, limit = self._list_limits(allowed, marker, limit)
396
            return allowed[start:start + limit]
397
        node = self.node.node_lookup(account)
398
        containers = [x[0] for x in self._list_object_properties(
399
            node, account, '', '/', marker, limit, False, None, [], until)]
400
        start, limit = self._list_limits(
401
            [x[0] for x in containers], marker, limit)
402
        return containers[start:start + limit]
403

    
404
    def list_container_meta(self, user, account, container, domain, until=None):
405
        """Return a list with all the container's object meta keys for the domain."""
406

    
407
        logger.debug("list_container_meta: %s %s %s %s %s", user,
408
                     account, container, domain, until)
409
        allowed = []
410
        if user != account:
411
            if until:
412
                raise NotAllowedError
413
            allowed = self.permissions.access_list_paths(
414
                user, '/'.join((account, container)))
415
            if not allowed:
416
                raise NotAllowedError
417
        path, node = self._lookup_container(account, container)
418
        before = until if until is not None else inf
419
        allowed = self._get_formatted_paths(allowed)
420
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
421

    
422
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
423
        """Return a dictionary with the container metadata for the domain."""
424

    
425
        logger.debug("get_container_meta: %s %s %s %s %s", user,
426
                     account, container, domain, until)
427
        if user != account:
428
            if until or container not in self._allowed_containers(user, account):
429
                raise NotAllowedError
430
        path, node = self._lookup_container(account, container)
431
        props = self._get_properties(node, until)
432
        mtime = props[self.MTIME]
433
        count, bytes, tstamp = self._get_statistics(node, until)
434
        tstamp = max(tstamp, mtime)
435
        if until is None:
436
            modified = tstamp
437
        else:
438
            modified = self._get_statistics(
439
                node)[2]  # Overall last modification.
440
            modified = max(modified, mtime)
441

    
442
        if user != account:
443
            meta = {'name': container}
444
        else:
445
            meta = {}
446
            if include_user_defined:
447
                meta.update(
448
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
449
            if until is not None:
450
                meta.update({'until_timestamp': tstamp})
451
            meta.update({'name': container, 'count': count, 'bytes': bytes})
452
        meta.update({'modified': modified})
453
        return meta
454

    
455
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
456
        """Update the metadata associated with the container for the domain."""
457

    
458
        logger.debug("update_container_meta: %s %s %s %s %s %s",
459
                     user, account, container, domain, meta, replace)
460
        if user != account:
461
            raise NotAllowedError
462
        path, node = self._lookup_container(account, container)
463
        src_version_id, dest_version_id = self._put_metadata(
464
            user, node, domain, meta, replace,
465
            update_statistics_ancestors_depth=0)
466
        if src_version_id is not None:
467
            versioning = self._get_policy(
468
                node, is_account_policy=False)['versioning']
469
            if versioning != 'auto':
470
                self.node.version_remove(src_version_id,
471
                                         update_statistics_ancestors_depth=0)
472

    
473
    def get_container_policy(self, user, account, container):
474
        """Return a dictionary with the container policy."""
475

    
476
        logger.debug(
477
            "get_container_policy: %s %s %s", user, account, container)
478
        if user != account:
479
            if container not in self._allowed_containers(user, account):
480
                raise NotAllowedError
481
            return {}
482
        path, node = self._lookup_container(account, container)
483
        return self._get_policy(node, is_account_policy=False)
484

    
485
    def update_container_policy(self, user, account, container, policy, replace=False):
486
        """Update the policy associated with the container."""
487

    
488
        logger.debug("update_container_policy: %s %s %s %s %s",
489
                     user, account, container, policy, replace)
490
        if user != account:
491
            raise NotAllowedError
492
        path, node = self._lookup_container(account, container)
493
        self._check_policy(policy, is_account_policy=False)
494
        self._put_policy(node, policy, replace, is_account_policy=False)
495

    
496
    def put_container(self, user, account, container, policy=None):
497
        """Create a new container with the given name."""
498

    
499
        logger.debug(
500
            "put_container: %s %s %s %s", user, account, container, policy)
501
        policy = policy or {}
502
        if user != account:
503
            raise NotAllowedError
504
        try:
505
            path, node = self._lookup_container(account, container)
506
        except NameError:
507
            pass
508
        else:
509
            raise ContainerExists('Container already exists')
510
        if policy:
511
            self._check_policy(policy, is_account_policy=False)
512
        path = '/'.join((account, container))
513
        node = self._put_path(
514
            user, self._lookup_account(account, True)[1], path,
515
            update_statistics_ancestors_depth=-1)
516
        self._put_policy(node, policy, True, is_account_policy=False)
517

    
518
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
519
        """Delete/purge the container with the given name."""
520

    
521
        logger.debug("delete_container: %s %s %s %s %s %s", user,
522
                     account, container, until, prefix, delimiter)
523
        if user != account:
524
            raise NotAllowedError
525
        path, node = self._lookup_container(account, container)
526

    
527
        if until is not None:
528
            hashes, size, serials = self.node.node_purge_children(
529
                node, until, CLUSTER_HISTORY,
530
                update_statistics_ancestors_depth=0)
531
            for h in hashes:
532
                self.store.map_delete(h)
533
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
534
                                          update_statistics_ancestors_depth=0)
535
            if not self.free_versioning:
536
                self._report_size_change(
537
                    user, account, -size, {
538
                        'action':'container purge',
539
                        'path': path,
540
                        'versions': ','.join(str(i) for i in serials)
541
                    }
542
                )
543
            return
544

    
545
        if not delimiter:
546
            if self._get_statistics(node)[0] > 0:
547
                raise ContainerNotEmpty('Container is not empty')
548
            hashes, size, serials = self.node.node_purge_children(
549
                node, inf, CLUSTER_HISTORY,
550
                update_statistics_ancestors_depth=0)
551
            for h in hashes:
552
                self.store.map_delete(h)
553
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
554
                                          update_statistics_ancestors_depth=0)
555
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
556
            if not self.free_versioning:
557
                self._report_size_change(
558
                    user, account, -size, {
559
                        'action':'container purge',
560
                        'path': path,
561
                        'versions': ','.join(str(i) for i in serials)
562
                    }
563
                )
564
        else:
565
            # remove only contents
566
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
567
            paths = []
568
            for t in src_names:
569
                path = '/'.join((account, container, t[0]))
570
                node = t[2]
571
                src_version_id, dest_version_id = self._put_version_duplicate(
572
                    user, node, size=0, type='', hash=None, checksum='',
573
                    cluster=CLUSTER_DELETED,
574
                    update_statistics_ancestors_depth=1)
575
                del_size = self._apply_versioning(
576
                    account, container, src_version_id,
577
                    update_statistics_ancestors_depth=1)
578
                self._report_size_change(
579
                        user, account, -del_size, {
580
                                'action': 'object delete',
581
                                'path': path,
582
                        'versions': ','.join([str(dest_version_id)])
583
                     }
584
                )
585
                self._report_object_change(
586
                    user, account, path, details={'action': 'object delete'})
587
                paths.append(path)
588
            self.permissions.access_clear_bulk(paths)
589

    
590
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
591
        if user != account and until:
592
            raise NotAllowedError
593
        if shared and public:
594
            # get shared first
595
            shared_paths = self._list_object_permissions(
596
                user, account, container, prefix, shared=True, public=False)
597
            objects = set()
598
            if shared_paths:
599
                path, node = self._lookup_container(account, container)
600
                shared_paths = self._get_formatted_paths(shared_paths)
601
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared_paths, all_props))
602

    
603
            # get public
604
            objects |= set(self._list_public_object_properties(
605
                user, account, container, prefix, all_props))
606
            objects = list(objects)
607

    
608
            objects.sort(key=lambda x: x[0])
609
            start, limit = self._list_limits(
610
                [x[0] for x in objects], marker, limit)
611
            return objects[start:start + limit]
612
        elif public:
613
            objects = self._list_public_object_properties(
614
                user, account, container, prefix, all_props)
615
            start, limit = self._list_limits(
616
                [x[0] for x in objects], marker, limit)
617
            return objects[start:start + limit]
618

    
619
        allowed = self._list_object_permissions(
620
            user, account, container, prefix, shared, public)
621
        if shared and not allowed:
622
            return []
623
        path, node = self._lookup_container(account, container)
624
        allowed = self._get_formatted_paths(allowed)
625
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
626
        start, limit = self._list_limits(
627
            [x[0] for x in objects], marker, limit)
628
        return objects[start:start + limit]
629

    
630
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
631
        public = self._list_object_permissions(
632
            user, account, container, prefix, shared=False, public=True)
633
        paths, nodes = self._lookup_objects(public)
634
        path = '/'.join((account, container))
635
        cont_prefix = path + '/'
636
        paths = [x[len(cont_prefix):] for x in paths]
637
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
638
        objects = [(path,) + props for path, props in zip(paths, props)]
639
        return objects
640

    
641
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
642
        objects = []
643
        while True:
644
            marker = objects[-1] if objects else None
645
            limit = 10000
646
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
647
            objects.extend(l)
648
            if not l or len(l) < limit:
649
                break
650
        return objects
651

    
652
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
653
        allowed = []
654
        path = '/'.join((account, container, prefix)).rstrip('/')
655
        if user != account:
656
            allowed = self.permissions.access_list_paths(user, path)
657
            if not allowed:
658
                raise NotAllowedError
659
        else:
660
            allowed = set()
661
            if shared:
662
                allowed.update(self.permissions.access_list_shared(path))
663
            if public:
664
                allowed.update(
665
                    [x[0] for x in self.permissions.public_list(path)])
666
            allowed = sorted(allowed)
667
            if not allowed:
668
                return []
669
        return allowed
670

    
671
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
672
        """Return a list of object (name, version_id) tuples existing under a container."""
673

    
674
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
675
        keys = keys or []
676
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
677

    
678
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
679
        """Return a list of object metadata dicts existing under a container."""
680

    
681
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
682
        keys = keys or []
683
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
684
        objects = []
685
        for p in props:
686
            if len(p) == 2:
687
                objects.append({'subdir': p[0]})
688
            else:
689
                objects.append({'name': p[0],
690
                                'bytes': p[self.SIZE + 1],
691
                                'type': p[self.TYPE + 1],
692
                                'hash': p[self.HASH + 1],
693
                                'version': p[self.SERIAL + 1],
694
                                'version_timestamp': p[self.MTIME + 1],
695
                                'modified': p[self.MTIME + 1] if until is None else None,
696
                                'modified_by': p[self.MUSER + 1],
697
                                'uuid': p[self.UUID + 1],
698
                                'checksum': p[self.CHECKSUM + 1]})
699
        return objects
700

    
701
    def list_object_permissions(self, user, account, container, prefix=''):
702
        """Return a list of paths that enforce permissions under a container."""
703

    
704
        logger.debug("list_object_permissions: %s %s %s %s", user,
705
                     account, container, prefix)
706
        return self._list_object_permissions(user, account, container, prefix, True, False)
707

    
708
    def list_object_public(self, user, account, container, prefix=''):
709
        """Return a dict mapping paths to public ids for objects that are public under a container."""
710

    
711
        logger.debug("list_object_public: %s %s %s %s", user,
712
                     account, container, prefix)
713
        public = {}
714
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
715
            public[path] = p
716
        return public
717

    
718
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
719
        """Return a dictionary with the object metadata for the domain."""
720

    
721
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
722
                     account, container, name, domain, version)
723
        self._can_read(user, account, container, name)
724
        path, node = self._lookup_object(account, container, name)
725
        props = self._get_version(node, version)
726
        if version is None:
727
            modified = props[self.MTIME]
728
        else:
729
            try:
730
                modified = self._get_version(
731
                    node)[self.MTIME]  # Overall last modification.
732
            except NameError:  # Object may be deleted.
733
                del_props = self.node.version_lookup(
734
                    node, inf, CLUSTER_DELETED)
735
                if del_props is None:
736
                    raise ItemNotExists('Object does not exist')
737
                modified = del_props[self.MTIME]
738

    
739
        meta = {}
740
        if include_user_defined:
741
            meta.update(
742
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
743
        meta.update({'name': name,
744
                     'bytes': props[self.SIZE],
745
                     'type': props[self.TYPE],
746
                     'hash': props[self.HASH],
747
                     'version': props[self.SERIAL],
748
                     'version_timestamp': props[self.MTIME],
749
                     'modified': modified,
750
                     'modified_by': props[self.MUSER],
751
                     'uuid': props[self.UUID],
752
                     'checksum': props[self.CHECKSUM]})
753
        return meta
754

    
755
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
756
        """Update the metadata associated with the object for the domain and return the new version."""
757

    
758
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
759
                     user, account, container, name, domain, meta, replace)
760
        self._can_write(user, account, container, name)
761
        path, node = self._lookup_object(account, container, name)
762
        src_version_id, dest_version_id = self._put_metadata(
763
            user, node, domain, meta, replace,
764
            update_statistics_ancestors_depth=1)
765
        self._apply_versioning(account, container, src_version_id,
766
                               update_statistics_ancestors_depth=1)
767
        return dest_version_id
768

    
769
    def get_object_permissions(self, user, account, container, name):
770
        """Return the action allowed on the object, the path
771
        from which the object gets its permissions from,
772
        along with a dictionary containing the permissions."""
773

    
774
        logger.debug("get_object_permissions: %s %s %s %s", user,
775
                     account, container, name)
776
        allowed = 'write'
777
        permissions_path = self._get_permissions_path(account, container, name)
778
        if user != account:
779
            if self.permissions.access_check(permissions_path, self.WRITE, user):
780
                allowed = 'write'
781
            elif self.permissions.access_check(permissions_path, self.READ, user):
782
                allowed = 'read'
783
            else:
784
                raise NotAllowedError
785
        self._lookup_object(account, container, name)
786
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
787

    
788
    def update_object_permissions(self, user, account, container, name, permissions):
789
        """Update the permissions associated with the object."""
790

    
791
        logger.debug("update_object_permissions: %s %s %s %s %s",
792
                     user, account, container, name, permissions)
793
        if user != account:
794
            raise NotAllowedError
795
        path = self._lookup_object(account, container, name)[0]
796
        self._check_permissions(path, permissions)
797
        self.permissions.access_set(path, permissions)
798
        self._report_sharing_change(user, account, path, {'members':
799
                                    self.permissions.access_members(path)})
800

    
801
    def get_object_public(self, user, account, container, name):
802
        """Return the public id of the object if applicable."""
803

    
804
        logger.debug(
805
            "get_object_public: %s %s %s %s", user, account, container, name)
806
        self._can_read(user, account, container, name)
807
        path = self._lookup_object(account, container, name)[0]
808
        p = self.permissions.public_get(path)
809
        return p
810

    
811
    def update_object_public(self, user, account, container, name, public):
812
        """Update the public status of the object."""
813

    
814
        logger.debug("update_object_public: %s %s %s %s %s", user,
815
                     account, container, name, public)
816
        self._can_write(user, account, container, name)
817
        path = self._lookup_object(account, container, name)[0]
818
        if not public:
819
            self.permissions.public_unset(path)
820
        else:
821
            self.permissions.public_set(
822
                path, self.public_url_security, self.public_url_alphabet
823
            )
824

    
825
    def get_object_hashmap(self, user, account, container, name, version=None):
826
        """Return the object's size and a list with partial hashes."""
827

    
828
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
829
                     account, container, name, version)
830
        self._can_read(user, account, container, name)
831
        path, node = self._lookup_object(account, container, name)
832
        props = self._get_version(node, version)
833
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
834
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
835

    
836
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
837
        if permissions is not None and user != account:
838
            raise NotAllowedError
839
        self._can_write(user, account, container, name)
840
        if permissions is not None:
841
            path = '/'.join((account, container, name))
842
            self._check_permissions(path, permissions)
843

    
844
        account_path, account_node = self._lookup_account(account, True)
845
        container_path, container_node = self._lookup_container(
846
            account, container)
847

    
848
        path, node = self._put_object_node(
849
            container_path, container_node, name)
850
        pre_version_id, dest_version_id = self._put_version_duplicate(
851
            user, node, src_node=src_node, size=size, type=type, hash=hash,
852
            checksum=checksum, is_copy=is_copy,
853
            update_statistics_ancestors_depth=1)
854

    
855
        # Handle meta.
856
        if src_version_id is None:
857
            src_version_id = pre_version_id
858
        self._put_metadata_duplicate(
859
            src_version_id, dest_version_id, domain, meta, replace_meta)
860

    
861
        del_size = self._apply_versioning(account, container, pre_version_id,
862
                                          update_statistics_ancestors_depth=1)
863
        size_delta = size - del_size
864
        if size_delta > 0:
865
            # Check account quota.
866
            if not self.using_external_quotaholder:
867
                account_quota = long(
868
                    self._get_policy(account_node, is_account_policy=True
869
                    )['quota']
870
                )
871
                account_usage = self._get_statistics(account_node, compute=True)[1]
872
                if (account_quota > 0 and account_usage > account_quota):
873
                    raise QuotaError(
874
                        'Account quota exceeded: limit: %s, usage: %s' % (
875
                            account_quota, account_usage
876
                        )
877
                    )
878

    
879
            # Check container quota.
880
            container_quota = long(
881
                self._get_policy(container_node, is_account_policy=False
882
                )['quota']
883
            )
884
            container_usage = self._get_statistics(container_node)[1]
885
            if (container_quota > 0 and container_usage > container_quota):
886
                # This must be executed in a transaction, so the version is
887
                # never created if it fails.
888
                raise QuotaError(
889
                    'Container quota exceeded: limit: %s, usage: %s' % (
890
                        container_quota, container_usage
891
                    )
892
                )
893

    
894
        self._report_size_change(user, account, size_delta,
895
                                 {'action': 'object update', 'path': path,
896
                                  'versions': ','.join([str(dest_version_id)])})
897
        if permissions is not None:
898
            self.permissions.access_set(path, permissions)
899
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
900

    
901
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
902
        return dest_version_id
903

    
904
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta=None, replace_meta=False, permissions=None):
905
        """Create/update an object with the specified size and partial hashes."""
906

    
907
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
908
                     account, container, name, size, type, hashmap, checksum)
909
        meta = meta or {}
910
        if size == 0:  # No such thing as an empty hashmap.
911
            hashmap = [self.put_block('')]
912
        map = HashMap(self.block_size, self.hash_algorithm)
913
        map.extend([binascii.unhexlify(x) for x in hashmap])
914
        missing = self.store.block_search(map)
915
        if missing:
916
            ie = IndexError()
917
            ie.data = [binascii.hexlify(x) for x in missing]
918
            raise ie
919

    
920
        hash = map.hash()
921
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
922
        self.store.map_put(hash, map)
923
        return dest_version_id
924

    
925
    def update_object_checksum(self, user, account, container, name, version, checksum):
926
        """Update an object's checksum."""
927

    
928
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
929
                     user, account, container, name, version, checksum)
930
        # Update objects with greater version and same hashmap and size (fix metadata updates).
931
        self._can_write(user, account, container, name)
932
        path, node = self._lookup_object(account, container, name)
933
        props = self._get_version(node, version)
934
        versions = self.node.node_get_versions(node)
935
        for x in versions:
936
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
937
                self.node.version_put_property(
938
                    x[self.SERIAL], 'checksum', checksum)
939

    
940
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta=None, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
941
        dest_meta = dest_meta or {}
942
        dest_version_ids = []
943
        self._can_read(user, src_account, src_container, src_name)
944
        path, node = self._lookup_object(src_account, src_container, src_name)
945
        # TODO: Will do another fetch of the properties in duplicate version...
946
        props = self._get_version(
947
            node, src_version)  # Check to see if source exists.
948
        src_version_id = props[self.SERIAL]
949
        hash = props[self.HASH]
950
        size = props[self.SIZE]
951
        is_copy = not is_move and (src_account, src_container, src_name) != (
952
            dest_account, dest_container, dest_name)  # New uuid.
953
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
954
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
955
            self._delete_object(user, src_account, src_container, src_name)
956

    
957
        if delimiter:
958
            prefix = src_name + \
959
                delimiter if not src_name.endswith(delimiter) else src_name
960
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
961
            src_names.sort(key=lambda x: x[2])  # order by nodes
962
            paths = [elem[0] for elem in src_names]
963
            nodes = [elem[2] for elem in src_names]
964
            # TODO: Will do another fetch of the properties in duplicate version...
965
            props = self._get_versions(nodes)  # Check to see if source exists.
966

    
967
            for prop, path, node in zip(props, paths, nodes):
968
                src_version_id = prop[self.SERIAL]
969
                hash = prop[self.HASH]
970
                vtype = prop[self.TYPE]
971
                size = prop[self.SIZE]
972
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
973
                    delimiter) else dest_name
974
                vdest_name = path.replace(prefix, dest_prefix, 1)
975
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
976
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
977
                    self._delete_object(user, src_account, src_container, path)
978
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
979

    
980
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, src_version=None, delimiter=None):
981
        """Copy an object's data and metadata."""
982

    
983
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
984
        meta = meta or {}
985
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
986
        return dest_version_id
987

    
988
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, delimiter=None):
989
        """Move an object's data and metadata."""
990

    
991
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
992
        meta = meta or {}
993
        if user != src_account:
994
            raise NotAllowedError
995
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
996
        return dest_version_id
997

    
998
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
999
        if user != account:
1000
            raise NotAllowedError
1001

    
1002
        if until is not None:
1003
            path = '/'.join((account, container, name))
1004
            node = self.node.node_lookup(path)
1005
            if node is None:
1006
                return
1007
            hashes = []
1008
            size = 0
1009
            serials = []
1010
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1011
                                           update_statistics_ancestors_depth=1)
1012
            hashes += h
1013
            size += s
1014
            serials += v
1015
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1016
                                           update_statistics_ancestors_depth=1)
1017
            hashes += h
1018
            if not self.free_versioning:
1019
                size += s
1020
            serials += v
1021
            for h in hashes:
1022
                self.store.map_delete(h)
1023
            self.node.node_purge(node, until, CLUSTER_DELETED,
1024
                                 update_statistics_ancestors_depth=1)
1025
            try:
1026
                props = self._get_version(node)
1027
            except NameError:
1028
                self.permissions.access_clear(path)
1029
            self._report_size_change(
1030
                user, account, -size, {
1031
                    'action': 'object purge',
1032
                    'path': path,
1033
                    'versions': ','.join(str(i) for i in serials)
1034
                }
1035
            )
1036
            return
1037

    
1038
        path, node = self._lookup_object(account, container, name)
1039
        src_version_id, dest_version_id = self._put_version_duplicate(
1040
            user, node, size=0, type='', hash=None, checksum='',
1041
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1042
        del_size = self._apply_versioning(account, container, src_version_id,
1043
                                          update_statistics_ancestors_depth=1)
1044
        self._report_size_change(user, account, -del_size,
1045
                                 {'action': 'object delete', 'path': path,
1046
                                  'versions': ','.join([str(dest_version_id)])})
1047
        self._report_object_change(
1048
            user, account, path, details={'action': 'object delete'})
1049
        self.permissions.access_clear(path)
1050

    
1051
        if delimiter:
1052
            prefix = name + delimiter if not name.endswith(delimiter) else name
1053
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1054
            paths = []
1055
            for t in src_names:
1056
                path = '/'.join((account, container, t[0]))
1057
                node = t[2]
1058
                src_version_id, dest_version_id = self._put_version_duplicate(
1059
                    user, node, size=0, type='', hash=None, checksum='',
1060
                    cluster=CLUSTER_DELETED,
1061
                    update_statistics_ancestors_depth=1)
1062
                del_size = self._apply_versioning(
1063
                    account, container, src_version_id,
1064
                    update_statistics_ancestors_depth=1)
1065
                self._report_size_change(user, account, -del_size,
1066
                                         {'action': 'object delete',
1067
                                          'path': path,
1068
                                          'versions': ','.join([str(dest_version_id)])})
1069
                self._report_object_change(
1070
                    user, account, path, details={'action': 'object delete'})
1071
                paths.append(path)
1072
            self.permissions.access_clear_bulk(paths)
1073

    
1074
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1075
        """Delete/purge an object."""
1076

    
1077
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1078
                     account, container, name, until, prefix, delimiter)
1079
        self._delete_object(user, account, container, name, until, delimiter)
1080

    
1081
    def list_versions(self, user, account, container, name):
1082
        """Return a list of all (version, version_timestamp) tuples for an object."""
1083

    
1084
        logger.debug(
1085
            "list_versions: %s %s %s %s", user, account, container, name)
1086
        self._can_read(user, account, container, name)
1087
        path, node = self._lookup_object(account, container, name)
1088
        versions = self.node.node_get_versions(node)
1089
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1090

    
1091
    def get_uuid(self, user, uuid):
1092
        """Return the (account, container, name) for the UUID given."""
1093

    
1094
        logger.debug("get_uuid: %s %s", user, uuid)
1095
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1096
        if info is None:
1097
            raise NameError
1098
        path, serial = info
1099
        account, container, name = path.split('/', 2)
1100
        self._can_read(user, account, container, name)
1101
        return (account, container, name)
1102

    
1103
    def get_public(self, user, public):
1104
        """Return the (account, container, name) for the public id given."""
1105

    
1106
        logger.debug("get_public: %s %s", user, public)
1107
        path = self.permissions.public_path(public)
1108
        if path is None:
1109
            raise NameError
1110
        account, container, name = path.split('/', 2)
1111
        self._can_read(user, account, container, name)
1112
        return (account, container, name)
1113

    
1114
    def get_block(self, hash):
1115
        """Return a block's data."""
1116

    
1117
        logger.debug("get_block: %s", hash)
1118
        block = self.store.block_get(binascii.unhexlify(hash))
1119
        if not block:
1120
            raise ItemNotExists('Block does not exist')
1121
        return block
1122

    
1123
    def put_block(self, data):
1124
        """Store a block and return the hash."""
1125

    
1126
        logger.debug("put_block: %s", len(data))
1127
        return binascii.hexlify(self.store.block_put(data))
1128

    
1129
    def update_block(self, hash, data, offset=0):
1130
        """Update a known block and return the hash."""
1131

    
1132
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1133
        if offset == 0 and len(data) == self.block_size:
1134
            return self.put_block(data)
1135
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1136
        return binascii.hexlify(h)
1137

    
1138
    # Path functions.
1139

    
1140
    def _generate_uuid(self):
1141
        return str(uuidlib.uuid4())
1142

    
1143
    def _put_object_node(self, path, parent, name):
1144
        path = '/'.join((path, name))
1145
        node = self.node.node_lookup(path)
1146
        if node is None:
1147
            node = self.node.node_create(parent, path)
1148
        return path, node
1149

    
1150
    def _put_path(self, user, parent, path,
1151
                  update_statistics_ancestors_depth=None):
1152
        node = self.node.node_create(parent, path)
1153
        self.node.version_create(node, None, 0, '', None, user,
1154
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
1155
                                 update_statistics_ancestors_depth)
1156
        return node
1157

    
1158
    def _lookup_account(self, account, create=True):
1159
        node = self.node.node_lookup(account)
1160
        if node is None and create:
1161
            node = self._put_path(
1162
                account, self.ROOTNODE, account,
1163
                update_statistics_ancestors_depth=-1)  # User is account.
1164
        return account, node
1165

    
1166
    def _lookup_container(self, account, container):
1167
        path = '/'.join((account, container))
1168
        node = self.node.node_lookup(path, for_update=True)
1169
        if node is None:
1170
            raise ItemNotExists('Container does not exist')
1171
        return path, node
1172

    
1173
    def _lookup_object(self, account, container, name):
1174
        path = '/'.join((account, container, name))
1175
        node = self.node.node_lookup(path)
1176
        if node is None:
1177
            raise ItemNotExists('Object does not exist')
1178
        return path, node
1179

    
1180
    def _lookup_objects(self, paths):
1181
        nodes = self.node.node_lookup_bulk(paths)
1182
        return paths, nodes
1183

    
1184
    def _get_properties(self, node, until=None):
1185
        """Return properties until the timestamp given."""
1186

    
1187
        before = until if until is not None else inf
1188
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1189
        if props is None and until is not None:
1190
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1191
        if props is None:
1192
            raise ItemNotExists('Path does not exist')
1193
        return props
1194

    
1195
    def _get_statistics(self, node, until=None, compute=False):
1196
        """Return count, sum of size and latest timestamp of everything under node."""
1197

    
1198
        if until is not None:
1199
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1200
        elif compute:
1201
            stats = self.node.statistics_latest(node, except_cluster=CLUSTER_DELETED)
1202
        else:
1203
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1204
        if stats is None:
1205
            stats = (0, 0, 0)
1206
        return stats
1207

    
1208
    def _get_version(self, node, version=None):
1209
        if version is None:
1210
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1211
            if props is None:
1212
                raise ItemNotExists('Object does not exist')
1213
        else:
1214
            try:
1215
                version = int(version)
1216
            except ValueError:
1217
                raise VersionNotExists('Version does not exist')
1218
            props = self.node.version_get_properties(version)
1219
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1220
                raise VersionNotExists('Version does not exist')
1221
        return props
1222

    
1223
    def _get_versions(self, nodes):
1224
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1225

    
1226
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
1227
                               type=None, hash=None, checksum=None,
1228
                               cluster=CLUSTER_NORMAL, is_copy=False,
1229
                               update_statistics_ancestors_depth=None):
1230
        """Create a new version of the node."""
1231

    
1232
        props = self.node.version_lookup(
1233
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1234
        if props is not None:
1235
            src_version_id = props[self.SERIAL]
1236
            src_hash = props[self.HASH]
1237
            src_size = props[self.SIZE]
1238
            src_type = props[self.TYPE]
1239
            src_checksum = props[self.CHECKSUM]
1240
        else:
1241
            src_version_id = None
1242
            src_hash = None
1243
            src_size = 0
1244
            src_type = ''
1245
            src_checksum = ''
1246
        if size is None:  # Set metadata.
1247
            hash = src_hash  # This way hash can be set to None (account or container).
1248
            size = src_size
1249
        if type is None:
1250
            type = src_type
1251
        if checksum is None:
1252
            checksum = src_checksum
1253
        uuid = self._generate_uuid(
1254
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1255

    
1256
        if src_node is None:
1257
            pre_version_id = src_version_id
1258
        else:
1259
            pre_version_id = None
1260
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1261
            if props is not None:
1262
                pre_version_id = props[self.SERIAL]
1263
        if pre_version_id is not None:
1264
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
1265
                                        update_statistics_ancestors_depth)
1266

    
1267
        dest_version_id, mtime = self.node.version_create(
1268
            node, hash, size, type, src_version_id, user, uuid, checksum,
1269
            cluster, update_statistics_ancestors_depth)
1270
        return pre_version_id, dest_version_id
1271

    
1272
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1273
        if src_version_id is not None:
1274
            self.node.attribute_copy(src_version_id, dest_version_id)
1275
        if not replace:
1276
            self.node.attribute_del(dest_version_id, domain, (
1277
                k for k, v in meta.iteritems() if v == ''))
1278
            self.node.attribute_set(dest_version_id, domain, (
1279
                (k, v) for k, v in meta.iteritems() if v != ''))
1280
        else:
1281
            self.node.attribute_del(dest_version_id, domain)
1282
            self.node.attribute_set(dest_version_id, domain, ((
1283
                k, v) for k, v in meta.iteritems()))
1284

    
1285
    def _put_metadata(self, user, node, domain, meta, replace=False,
1286
                      update_statistics_ancestors_depth=None):
1287
        """Create a new version and store metadata."""
1288

    
1289
        src_version_id, dest_version_id = self._put_version_duplicate(
1290
            user, node,
1291
            update_statistics_ancestors_depth=update_statistics_ancestors_depth)
1292
        self._put_metadata_duplicate(
1293
            src_version_id, dest_version_id, domain, meta, replace)
1294
        return src_version_id, dest_version_id
1295

    
1296
    def _list_limits(self, listing, marker, limit):
1297
        start = 0
1298
        if marker:
1299
            try:
1300
                start = listing.index(marker) + 1
1301
            except ValueError:
1302
                pass
1303
        if not limit or limit > 10000:
1304
            limit = 10000
1305
        return start, limit
1306

    
1307
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, until=None, size_range=None, allowed=None, all_props=False):
1308
        keys = keys or []
1309
        allowed = allowed or []
1310
        cont_prefix = path + '/'
1311
        prefix = cont_prefix + prefix
1312
        start = cont_prefix + marker if marker else None
1313
        before = until if until is not None else inf
1314
        filterq = keys if domain else []
1315
        sizeq = size_range
1316

    
1317
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1318
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1319
        objects.sort(key=lambda x: x[0])
1320
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1321
        return objects
1322

    
1323
    # Reporting functions.
1324

    
1325
    def _report_size_change(self, user, account, size, details=None):
1326
        details = details or {}
1327

    
1328
        if size == 0:
1329
            return
1330

    
1331
        account_node = self._lookup_account(account, True)[1]
1332
        total = self._get_statistics(account_node, compute=True)[1]
1333
        details.update({'user': user, 'total': total})
1334
        logger.debug(
1335
            "_report_size_change: %s %s %s %s", user, account, size, details)
1336
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1337
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1338
                              float(size), details))
1339

    
1340
        if not self.using_external_quotaholder:
1341
            return
1342

    
1343
        try:
1344
            name = details['path'] if 'path' in details else ''
1345
            serial = self.astakosclient.issue_one_commission(
1346
                token=self.service_token,
1347
                holder=account,
1348
                source=DEFAULT_SOURCE,
1349
                provisions={'pithos.diskspace': size},
1350
                name=name
1351
                )
1352
        except BaseException, e:
1353
            raise QuotaError(e)
1354
        else:
1355
            self.serials.append(serial)
1356

    
1357
    def _report_object_change(self, user, account, path, details=None):
1358
        details = details or {}
1359
        details.update({'user': user})
1360
        logger.debug("_report_object_change: %s %s %s %s", user,
1361
                     account, path, details)
1362
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1363
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1364

    
1365
    def _report_sharing_change(self, user, account, path, details=None):
1366
        logger.debug("_report_permissions_change: %s %s %s %s",
1367
                     user, account, path, details)
1368
        details = details or {}
1369
        details.update({'user': user})
1370
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1371
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1372

    
1373
    # Policy functions.
1374

    
1375
    def _check_policy(self, policy, is_account_policy=True):
1376
        default_policy = self.default_account_policy \
1377
            if is_account_policy else self.default_container_policy
1378
        for k in policy.keys():
1379
            if policy[k] == '':
1380
                policy[k] = default_policy.get(k)
1381
        for k, v in policy.iteritems():
1382
            if k == 'quota':
1383
                q = int(v)  # May raise ValueError.
1384
                if q < 0:
1385
                    raise ValueError
1386
            elif k == 'versioning':
1387
                if v not in ['auto', 'none']:
1388
                    raise ValueError
1389
            else:
1390
                raise ValueError
1391

    
1392
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1393
        default_policy = self.default_account_policy \
1394
            if is_account_policy else self.default_container_policy
1395
        if replace:
1396
            for k, v in default_policy.iteritems():
1397
                if k not in policy:
1398
                    policy[k] = v
1399
        self.node.policy_set(node, policy)
1400

    
1401
    def _get_policy(self, node, is_account_policy=True):
1402
        default_policy = self.default_account_policy \
1403
            if is_account_policy else self.default_container_policy
1404
        policy = default_policy.copy()
1405
        policy.update(self.node.policy_get(node))
1406
        return policy
1407

    
1408
    def _apply_versioning(self, account, container, version_id,
1409
                          update_statistics_ancestors_depth=None):
1410
        """Delete the provided version if such is the policy.
1411
           Return size of object removed.
1412
        """
1413

    
1414
        if version_id is None:
1415
            return 0
1416
        path, node = self._lookup_container(account, container)
1417
        versioning = self._get_policy(
1418
            node, is_account_policy=False)['versioning']
1419
        if versioning != 'auto':
1420
            hash, size = self.node.version_remove(
1421
                version_id, update_statistics_ancestors_depth)
1422
            self.store.map_delete(hash)
1423
            return size
1424
        elif self.free_versioning:
1425
            return self.node.version_get_properties(
1426
                version_id, keys=('size',))[0]
1427
        return 0
1428

    
1429
    # Access control functions.
1430

    
1431
    def _check_groups(self, groups):
1432
        # raise ValueError('Bad characters in groups')
1433
        pass
1434

    
1435
    def _check_permissions(self, path, permissions):
1436
        # raise ValueError('Bad characters in permissions')
1437
        pass
1438

    
1439
    def _get_formatted_paths(self, paths):
1440
        formatted = []
1441
        for p in paths:
1442
            node = self.node.node_lookup(p)
1443
            props = None
1444
            if node is not None:
1445
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1446
            if props is not None:
1447
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1448
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1449
                formatted.append((p, self.MATCH_EXACT))
1450
        return formatted
1451

    
1452
    def _get_permissions_path(self, account, container, name):
1453
        path = '/'.join((account, container, name))
1454
        permission_paths = self.permissions.access_inherit(path)
1455
        permission_paths.sort()
1456
        permission_paths.reverse()
1457
        for p in permission_paths:
1458
            if p == path:
1459
                return p
1460
            else:
1461
                if p.count('/') < 2:
1462
                    continue
1463
                node = self.node.node_lookup(p)
1464
                props = None
1465
                if node is not None:
1466
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1467
                if props is not None:
1468
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1469
                        return p
1470
        return None
1471

    
1472
    def _can_read(self, user, account, container, name):
1473
        if user == account:
1474
            return True
1475
        path = '/'.join((account, container, name))
1476
        if self.permissions.public_get(path) is not None:
1477
            return True
1478
        path = self._get_permissions_path(account, container, name)
1479
        if not path:
1480
            raise NotAllowedError
1481
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1482
            raise NotAllowedError
1483

    
1484
    def _can_write(self, user, account, container, name):
1485
        if user == account:
1486
            return True
1487
        path = '/'.join((account, container, name))
1488
        path = self._get_permissions_path(account, container, name)
1489
        if not path:
1490
            raise NotAllowedError
1491
        if not self.permissions.access_check(path, self.WRITE, user):
1492
            raise NotAllowedError
1493

    
1494
    def _allowed_accounts(self, user):
1495
        allow = set()
1496
        for path in self.permissions.access_list_paths(user):
1497
            allow.add(path.split('/', 1)[0])
1498
        return sorted(allow)
1499

    
1500
    def _allowed_containers(self, user, account):
1501
        allow = set()
1502
        for path in self.permissions.access_list_paths(user, account):
1503
            allow.add(path.split('/', 2)[1])
1504
        return sorted(allow)
1505

    
1506
    # Domain functions
1507

    
1508
    def get_domain_objects(self, domain, user=None):
1509
        allowed_paths = self.permissions.access_list_paths(user)
1510
        if not allowed_paths:
1511
            return []
1512
        obj_list = self.node.domain_object_list(
1513
            domain, allowed_paths, CLUSTER_NORMAL)
1514
        return [(path,
1515
                 self._build_metadata(props, user_defined_meta),
1516
                 self.permissions.access_get(path)) for
1517
                path, props, user_defined_meta in obj_list]
1518

    
1519
    # util functions
1520

    
1521
    def _build_metadata(self, props, user_defined=None,
1522
                        include_user_defined=True):
1523
        meta = {'bytes': props[self.SIZE],
1524
                'type': props[self.TYPE],
1525
                'hash': props[self.HASH],
1526
                'version': props[self.SERIAL],
1527
                'version_timestamp': props[self.MTIME],
1528
                'modified_by': props[self.MUSER],
1529
                'uuid': props[self.UUID],
1530
                'checksum': props[self.CHECKSUM]}
1531
        if include_user_defined and user_defined != None:
1532
            meta.update(user_defined)
1533
        return meta
1534

    
1535
    def _has_read_access(self, user, path):
1536
        try:
1537
            account, container, object = path.split('/', 2)
1538
        except ValueError:
1539
            raise ValueError('Invalid object path')
1540

    
1541
        assert isinstance(user, basestring), "Invalid user"
1542

    
1543
        try:
1544
            self._can_read(user, account, container, object)
1545
        except NotAllowedError:
1546
            return False
1547
        else:
1548
            return True