Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ a63f36a2

History | View | Annotate | Download (69.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
try:
41
    from astakosclient import AstakosClient
42
except ImportError:
43
    AstakosClient = None
44

    
45
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
46
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
47
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
48
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)
49

    
50

    
51
class DisabledAstakosClient(object):
52
    def __init__(self, *args, **kwargs):
53
        self.args = args
54
        self.kwargs = kwargs
55

    
56
    def __getattr__(self, name):
57
        m = ("AstakosClient has been disabled, "
58
             "yet an attempt to access it was made")
59
        raise AssertionError(m)
60

    
61

    
62
# Stripped-down version of the HashMap class found in tools.
63

    
64
class HashMap(list):
65

    
66
    def __init__(self, blocksize, blockhash):
67
        super(HashMap, self).__init__()
68
        self.blocksize = blocksize
69
        self.blockhash = blockhash
70

    
71
    def _hash_raw(self, v):
72
        h = hashlib.new(self.blockhash)
73
        h.update(v)
74
        return h.digest()
75

    
76
    def hash(self):
77
        if len(self) == 0:
78
            return self._hash_raw('')
79
        if len(self) == 1:
80
            return self.__getitem__(0)
81

    
82
        h = list(self)
83
        s = 2
84
        while s < len(h):
85
            s = s * 2
86
        h += [('\x00' * len(h[0]))] * (s - len(h))
87
        while len(h) > 1:
88
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
89
        return h[0]
90

    
91
# Default modules and settings.
92
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
93
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
94
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
95
DEFAULT_BLOCK_PATH = 'data/'
96
DEFAULT_BLOCK_UMASK = 0o022
97
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
98
DEFAULT_HASH_ALGORITHM = 'sha256'
99
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
100
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
101
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
102
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
103
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
104
                               'abcdefghijklmnopqrstuvwxyz'
105
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
106
DEFAULT_PUBLIC_URL_SECURITY = 16
107

    
108
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
109
QUEUE_CLIENT_ID = 'pithos'
110
QUEUE_INSTANCE_ID = '1'
111

    
112
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
113

    
114
inf = float('inf')
115

    
116
ULTIMATE_ANSWER = 42
117

    
118
DEFAULT_SOURCE = 'system'
119

    
120
logger = logging.getLogger(__name__)
121

    
122

    
123
class ModularBackend(BaseBackend):
124
    """A modular backend.
125

126
    Uses modules for SQL functions and storage.
127
    """
128

    
129
    def __init__(self, db_module=None, db_connection=None,
130
                 block_module=None, block_path=None, block_umask=None,
131
                 block_size=None, hash_algorithm=None,
132
                 queue_module=None, queue_hosts=None, queue_exchange=None,
133
                 astakos_url=None, service_token=None,
134
                 astakosclient_poolsize=None,
135
                 free_versioning=True, block_params=None,
136
                 public_url_security=None,
137
                 public_url_alphabet=None,
138
                 account_quota_policy=None,
139
                 container_quota_policy=None,
140
                 container_versioning_policy=None):
141
        db_module = db_module or DEFAULT_DB_MODULE
142
        db_connection = db_connection or DEFAULT_DB_CONNECTION
143
        block_module = block_module or DEFAULT_BLOCK_MODULE
144
        block_path = block_path or DEFAULT_BLOCK_PATH
145
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
146
        block_params = block_params or DEFAULT_BLOCK_PARAMS
147
        block_size = block_size or DEFAULT_BLOCK_SIZE
148
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
149
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
150
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
151
        container_quota_policy = container_quota_policy \
152
            or DEFAULT_CONTAINER_QUOTA
153
        container_versioning_policy = container_versioning_policy \
154
            or DEFAULT_CONTAINER_VERSIONING
155

    
156
        self.default_account_policy = {'quota': account_quota_policy}
157
        self.default_container_policy = {
158
            'quota': container_quota_policy,
159
            'versioning': container_versioning_policy
160
        }
161
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
162
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
163

    
164
        self.public_url_security = (public_url_security or
165
            DEFAULT_PUBLIC_URL_SECURITY)
166
        self.public_url_alphabet = (public_url_alphabet or
167
            DEFAULT_PUBLIC_URL_ALPHABET)
168

    
169
        self.hash_algorithm = hash_algorithm
170
        self.block_size = block_size
171
        self.free_versioning = free_versioning
172

    
173
        def load_module(m):
174
            __import__(m)
175
            return sys.modules[m]
176

    
177
        self.db_module = load_module(db_module)
178
        self.wrapper = self.db_module.DBWrapper(db_connection)
179
        params = {'wrapper': self.wrapper}
180
        self.permissions = self.db_module.Permissions(**params)
181
        self.config = self.db_module.Config(**params)
182
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
183
        for x in ['READ', 'WRITE']:
184
            setattr(self, x, getattr(self.db_module, x))
185
        self.node = self.db_module.Node(**params)
186
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
187
            setattr(self, x, getattr(self.db_module, x))
188

    
189
        self.block_module = load_module(block_module)
190
        self.block_params = block_params
191
        params = {'path': block_path,
192
                  'block_size': self.block_size,
193
                  'hash_algorithm': self.hash_algorithm,
194
                  'umask': block_umask}
195
        params.update(self.block_params)
196
        self.store = self.block_module.Store(**params)
197

    
198
        if queue_module and queue_hosts:
199
            self.queue_module = load_module(queue_module)
200
            params = {'hosts': queue_hosts,
201
                      'exchange': queue_exchange,
202
                      'client_id': QUEUE_CLIENT_ID}
203
            self.queue = self.queue_module.Queue(**params)
204
        else:
205
            class NoQueue:
206
                def send(self, *args):
207
                    pass
208

    
209
                def close(self):
210
                    pass
211

    
212
            self.queue = NoQueue()
213

    
214
        self.astakos_url = astakos_url
215
        self.service_token = service_token
216

    
217
        if not astakos_url or not AstakosClient:
218
            self.astakosclient = DisabledAstakosClient(
219
                astakos_url,
220
                use_pool=True,
221
                pool_size=astakosclient_poolsize)
222
        else:
223
            self.astakosclient = AstakosClient(
224
                astakos_url,
225
                use_pool=True,
226
                pool_size=astakosclient_poolsize)
227

    
228
        self.serials = []
229
        self.messages = []
230

    
231
    def pre_exec(self):
232
        self.wrapper.execute()
233

    
234
    def post_exec(self, success_status=True):
235
        if success_status:
236
            # send messages produced
237
            for m in self.messages:
238
                self.queue.send(*m)
239

    
240
            # register serials
241
            if self.serials:
242
                self.commission_serials.insert_many(
243
                    self.serials)
244

    
245
                # commit to ensure that the serials are registered
246
                # even if resolve commission fails
247
                self.wrapper.commit()
248

    
249
                # start new transaction
250
                self.wrapper.execute()
251

    
252
                r = self.astakosclient.resolve_commissions(
253
                            token=self.service_token,
254
                            accept_serials=self.serials,
255
                            reject_serials=[])
256
                self.commission_serials.delete_many(
257
                    r['accepted'])
258

    
259
            self.wrapper.commit()
260
        else:
261
            if self.serials:
262
                self.astakosclient.resolve_commissions(
263
                    token=self.service_token,
264
                    accept_serials=[],
265
                    reject_serials=self.serials)
266
            self.wrapper.rollback()
267

    
268
    def close(self):
269
        self.wrapper.close()
270
        self.queue.close()
271

    
272
    @property
273
    def using_external_quotaholder(self):
274
        return not isinstance(self.astakosclient, DisabledAstakosClient)
275

    
276
    def list_accounts(self, user, marker=None, limit=10000):
277
        """Return a list of accounts the user can access."""
278

    
279
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
280
        allowed = self._allowed_accounts(user)
281
        start, limit = self._list_limits(allowed, marker, limit)
282
        return allowed[start:start + limit]
283

    
284
    def get_account_meta(
285
            self, user, account, domain, until=None, include_user_defined=True,
286
            external_quota=None):
287
        """Return a dictionary with the account metadata for the domain."""
288

    
289
        logger.debug(
290
            "get_account_meta: %s %s %s %s", user, account, domain, until)
291
        path, node = self._lookup_account(account, user == account)
292
        if user != account:
293
            if until or node is None or account not in self._allowed_accounts(user):
294
                raise NotAllowedError
295
        try:
296
            props = self._get_properties(node, until)
297
            mtime = props[self.MTIME]
298
        except NameError:
299
            props = None
300
            mtime = until
301
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
302
        tstamp = max(tstamp, mtime)
303
        if until is None:
304
            modified = tstamp
305
        else:
306
            modified = self._get_statistics(
307
                node, compute=True)[2]  # Overall last modification.
308
            modified = max(modified, mtime)
309

    
310
        if user != account:
311
            meta = {'name': account}
312
        else:
313
            meta = {}
314
            if props is not None and include_user_defined:
315
                meta.update(
316
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
317
            if until is not None:
318
                meta.update({'until_timestamp': tstamp})
319
            meta.update({'name': account, 'count': count, 'bytes': bytes})
320
            if self.using_external_quotaholder:
321
                external_quota = external_quota or {}
322
                meta['bytes'] = external_quota.get('usage', 0)
323
        meta.update({'modified': modified})
324
        return meta
325

    
326
    def update_account_meta(self, user, account, domain, meta, replace=False):
327
        """Update the metadata associated with the account for the domain."""
328

    
329
        logger.debug("update_account_meta: %s %s %s %s %s", user,
330
                     account, domain, meta, replace)
331
        if user != account:
332
            raise NotAllowedError
333
        path, node = self._lookup_account(account, True)
334
        self._put_metadata(user, node, domain, meta, replace,
335
                           update_statistics_ancestors_depth=-1)
336

    
337
    def get_account_groups(self, user, account):
338
        """Return a dictionary with the user groups defined for this account."""
339

    
340
        logger.debug("get_account_groups: %s %s", user, account)
341
        if user != account:
342
            if account not in self._allowed_accounts(user):
343
                raise NotAllowedError
344
            return {}
345
        self._lookup_account(account, True)
346
        return self.permissions.group_dict(account)
347

    
348
    def update_account_groups(self, user, account, groups, replace=False):
349
        """Update the groups associated with the account."""
350

    
351
        logger.debug("update_account_groups: %s %s %s %s", user,
352
                     account, groups, replace)
353
        if user != account:
354
            raise NotAllowedError
355
        self._lookup_account(account, True)
356
        self._check_groups(groups)
357
        if replace:
358
            self.permissions.group_destroy(account)
359
        for k, v in groups.iteritems():
360
            if not replace:  # If not already deleted.
361
                self.permissions.group_delete(account, k)
362
            if v:
363
                self.permissions.group_addmany(account, k, v)
364

    
365
    def get_account_policy(self, user, account, external_quota=None):
366
        """Return a dictionary with the account policy."""
367

    
368
        logger.debug("get_account_policy: %s %s", user, account)
369
        if user != account:
370
            if account not in self._allowed_accounts(user):
371
                raise NotAllowedError
372
            return {}
373
        path, node = self._lookup_account(account, True)
374
        policy = self._get_policy(node, is_account_policy=True)
375
        if self.using_external_quotaholder:
376
            external_quota = external_quota or {}
377
            policy['quota'] = external_quota.get('limit', 0)
378
        return policy
379

    
380
    def update_account_policy(self, user, account, policy, replace=False):
381
        """Update the policy associated with the account."""
382

    
383
        logger.debug("update_account_policy: %s %s %s %s", user,
384
                     account, policy, replace)
385
        if user != account:
386
            raise NotAllowedError
387
        path, node = self._lookup_account(account, True)
388
        self._check_policy(policy, is_account_policy=True)
389
        self._put_policy(node, policy, replace, is_account_policy=True)
390

    
391
    def put_account(self, user, account, policy=None):
392
        """Create a new account with the given name."""
393

    
394
        logger.debug("put_account: %s %s %s", user, account, policy)
395
        policy = policy or {}
396
        if user != account:
397
            raise NotAllowedError
398
        node = self.node.node_lookup(account)
399
        if node is not None:
400
            raise AccountExists('Account already exists')
401
        if policy:
402
            self._check_policy(policy, is_account_policy=True)
403
        node = self._put_path(user, self.ROOTNODE, account,
404
                              update_statistics_ancestors_depth=-1)
405
        self._put_policy(node, policy, True, is_account_policy=True)
406

    
407
    def delete_account(self, user, account):
408
        """Delete the account with the given name."""
409

    
410
        logger.debug("delete_account: %s %s", user, account)
411
        if user != account:
412
            raise NotAllowedError
413
        node = self.node.node_lookup(account)
414
        if node is None:
415
            return
416
        if not self.node.node_remove(node,
417
                                     update_statistics_ancestors_depth=-1):
418
            raise AccountNotEmpty('Account is not empty')
419
        self.permissions.group_destroy(account)
420

    
421
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
422
        """Return a list of containers existing under an account."""
423

    
424
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
425
                     account, marker, limit, shared, until, public)
426
        if user != account:
427
            if until or account not in self._allowed_accounts(user):
428
                raise NotAllowedError
429
            allowed = self._allowed_containers(user, account)
430
            start, limit = self._list_limits(allowed, marker, limit)
431
            return allowed[start:start + limit]
432
        if shared or public:
433
            allowed = set()
434
            if shared:
435
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
436
            if public:
437
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
438
            allowed = sorted(allowed)
439
            start, limit = self._list_limits(allowed, marker, limit)
440
            return allowed[start:start + limit]
441
        node = self.node.node_lookup(account)
442
        containers = [x[0] for x in self._list_object_properties(
443
            node, account, '', '/', marker, limit, False, None, [], until)]
444
        start, limit = self._list_limits(
445
            [x[0] for x in containers], marker, limit)
446
        return containers[start:start + limit]
447

    
448
    def list_container_meta(self, user, account, container, domain, until=None):
449
        """Return a list with all the container's object meta keys for the domain."""
450

    
451
        logger.debug("list_container_meta: %s %s %s %s %s", user,
452
                     account, container, domain, until)
453
        allowed = []
454
        if user != account:
455
            if until:
456
                raise NotAllowedError
457
            allowed = self.permissions.access_list_paths(
458
                user, '/'.join((account, container)))
459
            if not allowed:
460
                raise NotAllowedError
461
        path, node = self._lookup_container(account, container)
462
        before = until if until is not None else inf
463
        allowed = self._get_formatted_paths(allowed)
464
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
465

    
466
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
467
        """Return a dictionary with the container metadata for the domain."""
468

    
469
        logger.debug("get_container_meta: %s %s %s %s %s", user,
470
                     account, container, domain, until)
471
        if user != account:
472
            if until or container not in self._allowed_containers(user, account):
473
                raise NotAllowedError
474
        path, node = self._lookup_container(account, container)
475
        props = self._get_properties(node, until)
476
        mtime = props[self.MTIME]
477
        count, bytes, tstamp = self._get_statistics(node, until)
478
        tstamp = max(tstamp, mtime)
479
        if until is None:
480
            modified = tstamp
481
        else:
482
            modified = self._get_statistics(
483
                node)[2]  # Overall last modification.
484
            modified = max(modified, mtime)
485

    
486
        if user != account:
487
            meta = {'name': container}
488
        else:
489
            meta = {}
490
            if include_user_defined:
491
                meta.update(
492
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
493
            if until is not None:
494
                meta.update({'until_timestamp': tstamp})
495
            meta.update({'name': container, 'count': count, 'bytes': bytes})
496
        meta.update({'modified': modified})
497
        return meta
498

    
499
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
500
        """Update the metadata associated with the container for the domain."""
501

    
502
        logger.debug("update_container_meta: %s %s %s %s %s %s",
503
                     user, account, container, domain, meta, replace)
504
        if user != account:
505
            raise NotAllowedError
506
        path, node = self._lookup_container(account, container)
507
        src_version_id, dest_version_id = self._put_metadata(
508
            user, node, domain, meta, replace,
509
            update_statistics_ancestors_depth=0)
510
        if src_version_id is not None:
511
            versioning = self._get_policy(
512
                node, is_account_policy=False)['versioning']
513
            if versioning != 'auto':
514
                self.node.version_remove(src_version_id,
515
                                         update_statistics_ancestors_depth=0)
516

    
517
    def get_container_policy(self, user, account, container):
518
        """Return a dictionary with the container policy."""
519

    
520
        logger.debug(
521
            "get_container_policy: %s %s %s", user, account, container)
522
        if user != account:
523
            if container not in self._allowed_containers(user, account):
524
                raise NotAllowedError
525
            return {}
526
        path, node = self._lookup_container(account, container)
527
        return self._get_policy(node, is_account_policy=False)
528

    
529
    def update_container_policy(self, user, account, container, policy, replace=False):
530
        """Update the policy associated with the container."""
531

    
532
        logger.debug("update_container_policy: %s %s %s %s %s",
533
                     user, account, container, policy, replace)
534
        if user != account:
535
            raise NotAllowedError
536
        path, node = self._lookup_container(account, container)
537
        self._check_policy(policy, is_account_policy=False)
538
        self._put_policy(node, policy, replace, is_account_policy=False)
539

    
540
    def put_container(self, user, account, container, policy=None):
541
        """Create a new container with the given name."""
542

    
543
        logger.debug(
544
            "put_container: %s %s %s %s", user, account, container, policy)
545
        policy = policy or {}
546
        if user != account:
547
            raise NotAllowedError
548
        try:
549
            path, node = self._lookup_container(account, container)
550
        except NameError:
551
            pass
552
        else:
553
            raise ContainerExists('Container already exists')
554
        if policy:
555
            self._check_policy(policy, is_account_policy=False)
556
        path = '/'.join((account, container))
557
        node = self._put_path(
558
            user, self._lookup_account(account, True)[1], path,
559
            update_statistics_ancestors_depth=-1)
560
        self._put_policy(node, policy, True, is_account_policy=False)
561

    
562
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
563
        """Delete/purge the container with the given name."""
564

    
565
        logger.debug("delete_container: %s %s %s %s %s %s", user,
566
                     account, container, until, prefix, delimiter)
567
        if user != account:
568
            raise NotAllowedError
569
        path, node = self._lookup_container(account, container)
570

    
571
        if until is not None:
572
            hashes, size, serials = self.node.node_purge_children(
573
                node, until, CLUSTER_HISTORY,
574
                update_statistics_ancestors_depth=0)
575
            for h in hashes:
576
                self.store.map_delete(h)
577
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
578
                                          update_statistics_ancestors_depth=0)
579
            if not self.free_versioning:
580
                self._report_size_change(
581
                    user, account, -size, {
582
                        'action':'container purge',
583
                        'path': path,
584
                        'versions': ','.join(str(i) for i in serials)
585
                    }
586
                )
587
            return
588

    
589
        if not delimiter:
590
            if self._get_statistics(node)[0] > 0:
591
                raise ContainerNotEmpty('Container is not empty')
592
            hashes, size, serials = self.node.node_purge_children(
593
                node, inf, CLUSTER_HISTORY,
594
                update_statistics_ancestors_depth=0)
595
            for h in hashes:
596
                self.store.map_delete(h)
597
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
598
                                          update_statistics_ancestors_depth=0)
599
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
600
            if not self.free_versioning:
601
                self._report_size_change(
602
                    user, account, -size, {
603
                        'action':'container purge',
604
                        'path': path,
605
                        'versions': ','.join(str(i) for i in serials)
606
                    }
607
                )
608
        else:
609
            # remove only contents
610
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
611
            paths = []
612
            for t in src_names:
613
                path = '/'.join((account, container, t[0]))
614
                node = t[2]
615
                src_version_id, dest_version_id = self._put_version_duplicate(
616
                    user, node, size=0, type='', hash=None, checksum='',
617
                    cluster=CLUSTER_DELETED,
618
                    update_statistics_ancestors_depth=1)
619
                del_size = self._apply_versioning(
620
                    account, container, src_version_id,
621
                    update_statistics_ancestors_depth=1)
622
                self._report_size_change(
623
                        user, account, -del_size, {
624
                                'action': 'object delete',
625
                                'path': path,
626
                        'versions': ','.join([str(dest_version_id)])
627
                     }
628
                )
629
                self._report_object_change(
630
                    user, account, path, details={'action': 'object delete'})
631
                paths.append(path)
632
            self.permissions.access_clear_bulk(paths)
633

    
634
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
635
        if user != account and until:
636
            raise NotAllowedError
637
        if shared and public:
638
            # get shared first
639
            shared_paths = self._list_object_permissions(
640
                user, account, container, prefix, shared=True, public=False)
641
            objects = set()
642
            if shared_paths:
643
                path, node = self._lookup_container(account, container)
644
                shared_paths = self._get_formatted_paths(shared_paths)
645
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared_paths, all_props))
646

    
647
            # get public
648
            objects |= set(self._list_public_object_properties(
649
                user, account, container, prefix, all_props))
650
            objects = list(objects)
651

    
652
            objects.sort(key=lambda x: x[0])
653
            start, limit = self._list_limits(
654
                [x[0] for x in objects], marker, limit)
655
            return objects[start:start + limit]
656
        elif public:
657
            objects = self._list_public_object_properties(
658
                user, account, container, prefix, all_props)
659
            start, limit = self._list_limits(
660
                [x[0] for x in objects], marker, limit)
661
            return objects[start:start + limit]
662

    
663
        allowed = self._list_object_permissions(
664
            user, account, container, prefix, shared, public)
665
        if shared and not allowed:
666
            return []
667
        path, node = self._lookup_container(account, container)
668
        allowed = self._get_formatted_paths(allowed)
669
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
670
        start, limit = self._list_limits(
671
            [x[0] for x in objects], marker, limit)
672
        return objects[start:start + limit]
673

    
674
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
675
        public = self._list_object_permissions(
676
            user, account, container, prefix, shared=False, public=True)
677
        paths, nodes = self._lookup_objects(public)
678
        path = '/'.join((account, container))
679
        cont_prefix = path + '/'
680
        paths = [x[len(cont_prefix):] for x in paths]
681
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
682
        objects = [(path,) + props for path, props in zip(paths, props)]
683
        return objects
684

    
685
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
686
        objects = []
687
        while True:
688
            marker = objects[-1] if objects else None
689
            limit = 10000
690
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
691
            objects.extend(l)
692
            if not l or len(l) < limit:
693
                break
694
        return objects
695

    
696
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
697
        allowed = []
698
        path = '/'.join((account, container, prefix)).rstrip('/')
699
        if user != account:
700
            allowed = self.permissions.access_list_paths(user, path)
701
            if not allowed:
702
                raise NotAllowedError
703
        else:
704
            allowed = set()
705
            if shared:
706
                allowed.update(self.permissions.access_list_shared(path))
707
            if public:
708
                allowed.update(
709
                    [x[0] for x in self.permissions.public_list(path)])
710
            allowed = sorted(allowed)
711
            if not allowed:
712
                return []
713
        return allowed
714

    
715
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
716
        """Return a list of object (name, version_id) tuples existing under a container."""
717

    
718
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
719
        keys = keys or []
720
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
721

    
722
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
723
        """Return a list of object metadata dicts existing under a container."""
724

    
725
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
726
        keys = keys or []
727
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
728
        objects = []
729
        for p in props:
730
            if len(p) == 2:
731
                objects.append({'subdir': p[0]})
732
            else:
733
                objects.append({'name': p[0],
734
                                'bytes': p[self.SIZE + 1],
735
                                'type': p[self.TYPE + 1],
736
                                'hash': p[self.HASH + 1],
737
                                'version': p[self.SERIAL + 1],
738
                                'version_timestamp': p[self.MTIME + 1],
739
                                'modified': p[self.MTIME + 1] if until is None else None,
740
                                'modified_by': p[self.MUSER + 1],
741
                                'uuid': p[self.UUID + 1],
742
                                'checksum': p[self.CHECKSUM + 1]})
743
        return objects
744

    
745
    def list_object_permissions(self, user, account, container, prefix=''):
746
        """Return a list of paths that enforce permissions under a container."""
747

    
748
        logger.debug("list_object_permissions: %s %s %s %s", user,
749
                     account, container, prefix)
750
        return self._list_object_permissions(user, account, container, prefix, True, False)
751

    
752
    def list_object_public(self, user, account, container, prefix=''):
753
        """Return a dict mapping paths to public ids for objects that are public under a container."""
754

    
755
        logger.debug("list_object_public: %s %s %s %s", user,
756
                     account, container, prefix)
757
        public = {}
758
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
759
            public[path] = p
760
        return public
761

    
762
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
763
        """Return a dictionary with the object metadata for the domain."""
764

    
765
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
766
                     account, container, name, domain, version)
767
        self._can_read(user, account, container, name)
768
        path, node = self._lookup_object(account, container, name)
769
        props = self._get_version(node, version)
770
        if version is None:
771
            modified = props[self.MTIME]
772
        else:
773
            try:
774
                modified = self._get_version(
775
                    node)[self.MTIME]  # Overall last modification.
776
            except NameError:  # Object may be deleted.
777
                del_props = self.node.version_lookup(
778
                    node, inf, CLUSTER_DELETED)
779
                if del_props is None:
780
                    raise ItemNotExists('Object does not exist')
781
                modified = del_props[self.MTIME]
782

    
783
        meta = {}
784
        if include_user_defined:
785
            meta.update(
786
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
787
        meta.update({'name': name,
788
                     'bytes': props[self.SIZE],
789
                     'type': props[self.TYPE],
790
                     'hash': props[self.HASH],
791
                     'version': props[self.SERIAL],
792
                     'version_timestamp': props[self.MTIME],
793
                     'modified': modified,
794
                     'modified_by': props[self.MUSER],
795
                     'uuid': props[self.UUID],
796
                     'checksum': props[self.CHECKSUM]})
797
        return meta
798

    
799
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
800
        """Update the metadata associated with the object for the domain and return the new version."""
801

    
802
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
803
                     user, account, container, name, domain, meta, replace)
804
        self._can_write(user, account, container, name)
805
        path, node = self._lookup_object(account, container, name)
806
        src_version_id, dest_version_id = self._put_metadata(
807
            user, node, domain, meta, replace,
808
            update_statistics_ancestors_depth=1)
809
        self._apply_versioning(account, container, src_version_id,
810
                               update_statistics_ancestors_depth=1)
811
        return dest_version_id
812

    
813
    def get_object_permissions(self, user, account, container, name):
814
        """Return the action allowed on the object, the path
815
        from which the object gets its permissions from,
816
        along with a dictionary containing the permissions."""
817

    
818
        logger.debug("get_object_permissions: %s %s %s %s", user,
819
                     account, container, name)
820
        allowed = 'write'
821
        permissions_path = self._get_permissions_path(account, container, name)
822
        if user != account:
823
            if self.permissions.access_check(permissions_path, self.WRITE, user):
824
                allowed = 'write'
825
            elif self.permissions.access_check(permissions_path, self.READ, user):
826
                allowed = 'read'
827
            else:
828
                raise NotAllowedError
829
        self._lookup_object(account, container, name)
830
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
831

    
832
    def update_object_permissions(self, user, account, container, name, permissions):
833
        """Update the permissions associated with the object."""
834

    
835
        logger.debug("update_object_permissions: %s %s %s %s %s",
836
                     user, account, container, name, permissions)
837
        if user != account:
838
            raise NotAllowedError
839
        path = self._lookup_object(account, container, name)[0]
840
        self._check_permissions(path, permissions)
841
        self.permissions.access_set(path, permissions)
842
        self._report_sharing_change(user, account, path, {'members':
843
                                    self.permissions.access_members(path)})
844

    
845
    def get_object_public(self, user, account, container, name):
846
        """Return the public id of the object if applicable."""
847

    
848
        logger.debug(
849
            "get_object_public: %s %s %s %s", user, account, container, name)
850
        self._can_read(user, account, container, name)
851
        path = self._lookup_object(account, container, name)[0]
852
        p = self.permissions.public_get(path)
853
        return p
854

    
855
    def update_object_public(self, user, account, container, name, public):
856
        """Update the public status of the object."""
857

    
858
        logger.debug("update_object_public: %s %s %s %s %s", user,
859
                     account, container, name, public)
860
        self._can_write(user, account, container, name)
861
        path = self._lookup_object(account, container, name)[0]
862
        if not public:
863
            self.permissions.public_unset(path)
864
        else:
865
            self.permissions.public_set(
866
                path, self.public_url_security, self.public_url_alphabet
867
            )
868

    
869
    def get_object_hashmap(self, user, account, container, name, version=None):
870
        """Return the object's size and a list with partial hashes."""
871

    
872
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
873
                     account, container, name, version)
874
        self._can_read(user, account, container, name)
875
        path, node = self._lookup_object(account, container, name)
876
        props = self._get_version(node, version)
877
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
878
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
879

    
880
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
881
        if permissions is not None and user != account:
882
            raise NotAllowedError
883
        self._can_write(user, account, container, name)
884
        if permissions is not None:
885
            path = '/'.join((account, container, name))
886
            self._check_permissions(path, permissions)
887

    
888
        account_path, account_node = self._lookup_account(account, True)
889
        container_path, container_node = self._lookup_container(
890
            account, container)
891

    
892
        path, node = self._put_object_node(
893
            container_path, container_node, name)
894
        pre_version_id, dest_version_id = self._put_version_duplicate(
895
            user, node, src_node=src_node, size=size, type=type, hash=hash,
896
            checksum=checksum, is_copy=is_copy,
897
            update_statistics_ancestors_depth=1)
898

    
899
        # Handle meta.
900
        if src_version_id is None:
901
            src_version_id = pre_version_id
902
        self._put_metadata_duplicate(
903
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
904

    
905
        del_size = self._apply_versioning(account, container, pre_version_id,
906
                                          update_statistics_ancestors_depth=1)
907
        size_delta = size - del_size
908
        if size_delta > 0:
909
            # Check account quota.
910
            if not self.using_external_quotaholder:
911
                account_quota = long(
912
                    self._get_policy(account_node, is_account_policy=True
913
                    )['quota']
914
                )
915
                account_usage = self._get_statistics(account_node, compute=True)[1]
916
                if (account_quota > 0 and account_usage > account_quota):
917
                    raise QuotaError(
918
                        'Account quota exceeded: limit: %s, usage: %s' % (
919
                            account_quota, account_usage
920
                        )
921
                    )
922

    
923
            # Check container quota.
924
            container_quota = long(
925
                self._get_policy(container_node, is_account_policy=False
926
                )['quota']
927
            )
928
            container_usage = self._get_statistics(container_node)[1]
929
            if (container_quota > 0 and container_usage > container_quota):
930
                # This must be executed in a transaction, so the version is
931
                # never created if it fails.
932
                raise QuotaError(
933
                    'Container quota exceeded: limit: %s, usage: %s' % (
934
                        container_quota, container_usage
935
                    )
936
                )
937

    
938
        self._report_size_change(user, account, size_delta,
939
                                 {'action': 'object update', 'path': path,
940
                                  'versions': ','.join([str(dest_version_id)])})
941
        if permissions is not None:
942
            self.permissions.access_set(path, permissions)
943
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
944

    
945
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
946
        return dest_version_id
947

    
948
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta=None, replace_meta=False, permissions=None):
949
        """Create/update an object with the specified size and partial hashes."""
950

    
951
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
952
                     account, container, name, size, type, hashmap, checksum)
953
        meta = meta or {}
954
        if size == 0:  # No such thing as an empty hashmap.
955
            hashmap = [self.put_block('')]
956
        map = HashMap(self.block_size, self.hash_algorithm)
957
        map.extend([binascii.unhexlify(x) for x in hashmap])
958
        missing = self.store.block_search(map)
959
        if missing:
960
            ie = IndexError()
961
            ie.data = [binascii.hexlify(x) for x in missing]
962
            raise ie
963

    
964
        hash = map.hash()
965
        hexlified = binascii.hexlify(hash)
966
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, hexlified, checksum, domain, meta, replace_meta, permissions)
967
        self.store.map_put(hash, map)
968
        return dest_version_id, hexlified
969

    
970
    def update_object_checksum(self, user, account, container, name, version, checksum):
971
        """Update an object's checksum."""
972

    
973
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
974
                     user, account, container, name, version, checksum)
975
        # Update objects with greater version and same hashmap and size (fix metadata updates).
976
        self._can_write(user, account, container, name)
977
        path, node = self._lookup_object(account, container, name)
978
        props = self._get_version(node, version)
979
        versions = self.node.node_get_versions(node)
980
        for x in versions:
981
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
982
                self.node.version_put_property(
983
                    x[self.SERIAL], 'checksum', checksum)
984

    
985
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta=None, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
986
        dest_meta = dest_meta or {}
987
        dest_version_ids = []
988
        self._can_read(user, src_account, src_container, src_name)
989
        path, node = self._lookup_object(src_account, src_container, src_name)
990
        # TODO: Will do another fetch of the properties in duplicate version...
991
        props = self._get_version(
992
            node, src_version)  # Check to see if source exists.
993
        src_version_id = props[self.SERIAL]
994
        hash = props[self.HASH]
995
        size = props[self.SIZE]
996
        is_copy = not is_move and (src_account, src_container, src_name) != (
997
            dest_account, dest_container, dest_name)  # New uuid.
998
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
999
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
1000
            self._delete_object(user, src_account, src_container, src_name)
1001

    
1002
        if delimiter:
1003
            prefix = src_name + \
1004
                delimiter if not src_name.endswith(delimiter) else src_name
1005
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1006
            src_names.sort(key=lambda x: x[2])  # order by nodes
1007
            paths = [elem[0] for elem in src_names]
1008
            nodes = [elem[2] for elem in src_names]
1009
            # TODO: Will do another fetch of the properties in duplicate version...
1010
            props = self._get_versions(nodes)  # Check to see if source exists.
1011

    
1012
            for prop, path, node in zip(props, paths, nodes):
1013
                src_version_id = prop[self.SERIAL]
1014
                hash = prop[self.HASH]
1015
                vtype = prop[self.TYPE]
1016
                size = prop[self.SIZE]
1017
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1018
                    delimiter) else dest_name
1019
                vdest_name = path.replace(prefix, dest_prefix, 1)
1020
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
1021
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
1022
                    self._delete_object(user, src_account, src_container, path)
1023
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
1024

    
1025
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, src_version=None, delimiter=None):
1026
        """Copy an object's data and metadata."""
1027

    
1028
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
1029
        meta = meta or {}
1030
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
1031
        return dest_version_id
1032

    
1033
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, delimiter=None):
1034
        """Move an object's data and metadata."""
1035

    
1036
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
1037
        meta = meta or {}
1038
        if user != src_account:
1039
            raise NotAllowedError
1040
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
1041
        return dest_version_id
1042

    
1043
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
1044
        if user != account:
1045
            raise NotAllowedError

        if until is not None:
            path = '/'.join((account, container, name))
            node = self.node.node_lookup(path)
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except ItemNotExists:
                # No version is left in the normal cluster; drop any
                # permissions attached to the path.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        path, node = self._lookup_object(account, container, name)
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        self._report_size_change(user, account, -del_size,
                                 {'action': 'object delete', 'path': path,
                                  'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False,
                until=None, size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size,
                    {'action': 'object delete',
                     'path': path,
                     'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
                     account, container, name, until, prefix, delimiter)
        self._delete_object(user, account, container, name, until, delimiter)

    def list_versions(self, user, account, container, name):
        """Return a list of all (version, version_timestamp) tuples for an
        object."""

        logger.debug(
            "list_versions: %s %s %s %s", user, account, container, name)
        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions
                if x[self.CLUSTER] != CLUSTER_DELETED]
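
    # Illustrative example (hypothetical values): assuming a configured
    # backend instance `b`, list_versions returns [serial, mtime] pairs for
    # every version that is not in the deleted cluster, e.g.
    #
    #   b.list_versions('alice', 'alice', 'pithos', 'docs/report.txt')
    #   # -> [[4711, 1365761794.72], [4712, 1365761801.03]]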
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given."""

        logger.debug("get_uuid: %s %s", user, uuid)
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        logger.debug("get_public: %s %s", user, public)
        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        block = self.store.block_get(binascii.unhexlify(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block

    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        return binascii.hexlify(self.store.block_put(data))

    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
        return binascii.hexlify(h)
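
    # Illustrative sketch (hypothetical values): blocks are addressed by the
    # hex hash returned from put_block; update_block patches part of an
    # existing block and returns the new hash, short-circuiting to put_block
    # when the data covers a whole block at offset 0.
    #
    #   h = b.put_block(data)                       # data is a raw string
    #   h2 = b.update_block(h, patch, offset=1024)  # rewrite part of it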

    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

    def _put_object_node(self, path, parent, name):
        path = '/'.join((path, name))
        node = self.node.node_lookup(path)
        if node is None:
            node = self.node.node_create(parent, path)
        return path, node

    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node

    def _lookup_account(self, account, create=True):
        for_update = True if create else False
        node = self.node.node_lookup(account, for_update=for_update)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node

    def _lookup_container(self, account, container):
        for_update = True if self.lock_container_path else False
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name):
        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes

    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None, compute=False):
        """Return count, sum of size and latest timestamp of everything
        under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(
                node, except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats

    def _get_version(self, node, version=None):
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node."""

        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            # This way hash can be set to None (account or container).
            hash = src_hash
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = (self._generate_uuid()
                if is_copy or src_version_id is None
                else props[self.UUID])

        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
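
    # Note on _put_version_duplicate above: the current latest version of
    # `node` (if any) is moved to CLUSTER_HISTORY before the new version is
    # created, and the source UUID is preserved unless this is a copy or
    # there was no source version, in which case a fresh UUID is generated.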

    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems()))

    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id

    def _list_limits(self, listing, marker, limit):
        start = 0
        if marker:
            try:
                start = listing.index(marker) + 1
            except ValueError:
                pass
        if not limit or limit > 10000:
            limit = 10000
        return start, limit
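
    # Illustrative example (hypothetical listing): paging starts right after
    # the marker and is capped at 10000 entries, e.g.
    #
    #   self._list_limits(['a', 'b', 'c'], marker='b', limit=0)
    #   # -> (2, 10000)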

    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects
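
    # Note on _list_object_properties above: when `virtual` is set, the
    # shared prefixes returned by latest_version_list are appended as
    # (prefix, None) entries, so callers can present them as virtual
    # directories next to real objects; the container prefix is stripped
    # from every returned path.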

    # Reporting functions.

    def _report_size_change(self, user, account, size, details=None):
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        logger.debug(
            "_report_size_change: %s %s %s %s", user, account, size, details)
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                token=self.service_token,
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)
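
    # Note on _report_size_change above: callers pass a signed size delta
    # (negative on delete and purge, as in the object-deletion code above);
    # the same delta is forwarded to the external quotaholder as a
    # 'pithos.diskspace' provision when one is configured.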

    def _report_object_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        logger.debug("_report_object_change: %s %s %s %s", user,
                     account, path, details)
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))

    def _report_sharing_change(self, user, account, path, details=None):
        logger.debug("_report_sharing_change: %s %s %s %s",
                     user, account, path, details)
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))

    # Policy functions.

    def _check_policy(self, policy, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError
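
    # Illustrative example (hypothetical values): a container policy accepted
    # by _check_policy,
    #
    #   self._check_policy({'quota': '0', 'versioning': 'auto'},
    #                      is_account_policy=False)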

    def _put_policy(self, node, policy, replace, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)

    def _get_policy(self, node, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        policy = default_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy

    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
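
    # Note on _apply_versioning above: any 'versioning' policy other than
    # 'auto' removes the superseded version immediately and returns its size;
    # with 'auto' the version is kept in history, and its size is returned
    # only when free_versioning is enabled so that kept history is not
    # charged to the account.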

    # Access control functions.

    def _check_groups(self, groups):
        # raise ValueError('Bad characters in groups')
        pass

    def _check_permissions(self, path, permissions):
        # raise ValueError('Bad characters in permissions')
        pass

    def _get_formatted_paths(self, paths):
        formatted = []
        for p in paths:
            node = self.node.node_lookup(p)
            props = None
            if node is not None:
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                if props[self.TYPE].split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
                formatted.append((p, self.MATCH_EXACT))
        return formatted

    def _get_permissions_path(self, account, container, name):
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            else:
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None
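
    # Illustrative example (hypothetical paths): permissions granted on a
    # folder object such as 'alice/pithos/photos' may be inherited by
    # 'alice/pithos/photos/2012/summer.jpg', provided the folder's content
    # type is application/directory or application/folder.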

    def _can_read(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if (not self.permissions.access_check(path, self.READ, user) and
                not self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError

    def _can_write(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError
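
    # Note on the checks above: the owner always passes; for other users a
    # published object is readable without explicit permissions, read access
    # is granted to holders of either READ or WRITE on the matching
    # permissions path, and write access requires WRITE.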

    def _allowed_accounts(self, user):
        allow = set()
        for path in self.permissions.access_list_paths(user):
            allow.add(path.split('/', 1)[0])
        return sorted(allow)

    def _allowed_containers(self, user, account):
        allow = set()
        for path in self.permissions.access_list_paths(user, account):
            allow.add(path.split('/', 2)[1])
        return sorted(allow)

    # Domain functions

    def get_domain_objects(self, domain, user=None):
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]

    # util functions

    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        meta = {'bytes': props[self.SIZE],
                'type': props[self.TYPE],
                'hash': props[self.HASH],
                'version': props[self.SERIAL],
                'version_timestamp': props[self.MTIME],
                'modified_by': props[self.MUSER],
                'uuid': props[self.UUID],
                'checksum': props[self.CHECKSUM]}
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta

    def _has_read_access(self, user, path):
        try:
            account, container, object = path.split('/', 2)
        except ValueError:
            raise ValueError('Invalid object path')

        assert isinstance(user, basestring), "Invalid user"

        try:
            self._can_read(user, account, container, object)
        except NotAllowedError:
            return False
        else:
            return True
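
    # Illustrative usage (hypothetical path): _has_read_access wraps the
    # _can_read check and returns a boolean instead of raising, e.g.
    #
    #   b._has_read_access('alice', 'bob/pithos/shared/notes.txt')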