Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 71585c27

History | View | Annotate | Download (67 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from astakosclient import AstakosClient
41

    
42
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
43
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
44
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
45
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49
class HashMap(list):
50

    
51
    def __init__(self, blocksize, blockhash):
52
        super(HashMap, self).__init__()
53
        self.blocksize = blocksize
54
        self.blockhash = blockhash
55

    
56
    def _hash_raw(self, v):
57
        h = hashlib.new(self.blockhash)
58
        h.update(v)
59
        return h.digest()
60

    
61
    def hash(self):
62
        if len(self) == 0:
63
            return self._hash_raw('')
64
        if len(self) == 1:
65
            return self.__getitem__(0)
66

    
67
        h = list(self)
68
        s = 2
69
        while s < len(h):
70
            s = s * 2
71
        h += [('\x00' * len(h[0]))] * (s - len(h))
72
        while len(h) > 1:
73
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
74
        return h[0]
75

    
76
# Default modules and settings.
77
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
78
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
79
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
80
DEFAULT_BLOCK_PATH = 'data/'
81
DEFAULT_BLOCK_UMASK = 0o022
82
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
83
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
84
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
85
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
86
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
87
                               'abcdefghijklmnopqrstuvwxyz'
88
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
89
DEFAULT_PUBLIC_URL_SECURITY = 16
90

    
91
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
92
QUEUE_CLIENT_ID = 'pithos'
93
QUEUE_INSTANCE_ID = '1'
94

    
95
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
96

    
97
inf = float('inf')
98

    
99
ULTIMATE_ANSWER = 42
100

    
101
DEFAULT_SOURCE = 'system'
102

    
103
logger = logging.getLogger(__name__)
104

    
105

    
106
def backend_method(func=None, autocommit=1):
107
    if func is None:
108
        def fn(func):
109
            return backend_method(func, autocommit)
110
        return fn
111

    
112
    if not autocommit:
113
        return func
114

    
115
    def fn(self, *args, **kw):
116
        self.wrapper.execute()
117
        serials = []
118
        self.serials = serials
119
        self.messages = []
120

    
121
        try:
122
            ret = func(self, *args, **kw)
123
            for m in self.messages:
124
                self.queue.send(*m)
125
            if self.serials:
126
                self.commission_serials.insert_many(serials)
127

    
128
                # commit to ensure that the serials are registered
129
                # even if accept commission fails
130
                self.wrapper.commit()
131
                self.wrapper.execute()
132

    
133
                r = self.astakosclient.resolve_commissions(
134
                            token=self.service_token,
135
                            accept_serials=self.serials,
136
                            reject_serials=[])
137
                self.commission_serials.delete_many(r['accepted'])
138

    
139
            self.wrapper.commit()
140
            return ret
141
        except:
142
            if self.serials:
143
                self.astakosclient.resolve_commissions(
144
                    token=self.service_token,
145
                    accept_serials=[],
146
                    reject_serials=self.serials)
147
            self.wrapper.rollback()
148
            raise
149
    return fn
150

    
151

    
152
class ModularBackend(BaseBackend):
153
    """A modular backend.
154

155
    Uses modules for SQL functions and storage.
156
    """
157

    
158
    def __init__(self, db_module=None, db_connection=None,
159
                 block_module=None, block_path=None, block_umask=None,
160
                 queue_module=None, queue_hosts=None, queue_exchange=None,
161
                 astakos_url=None, service_token=None,
162
                 astakosclient_poolsize=None,
163
                 free_versioning=True, block_params=None,
164
                 public_url_security=None,
165
                 public_url_alphabet=None,
166
                 account_quota_policy=None,
167
                 container_quota_policy=None,
168
                 container_versioning_policy=None):
169
        db_module = db_module or DEFAULT_DB_MODULE
170
        db_connection = db_connection or DEFAULT_DB_CONNECTION
171
        block_module = block_module or DEFAULT_BLOCK_MODULE
172
        block_path = block_path or DEFAULT_BLOCK_PATH
173
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
174
        block_params = block_params or DEFAULT_BLOCK_PARAMS
175
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
176
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
177
        container_quota_policy = container_quota_policy \
178
            or DEFAULT_CONTAINER_QUOTA
179
        container_versioning_policy = container_versioning_policy \
180
            or DEFAULT_CONTAINER_VERSIONING
181

    
182
        self.default_account_policy = {'quota': account_quota_policy}
183
        self.default_container_policy = {
184
            'quota': container_quota_policy,
185
            'versioning': container_versioning_policy
186
        }
187
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
188
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
189

    
190
        self.public_url_security = public_url_security or DEFAULT_PUBLIC_URL_SECURITY
191
        self.public_url_alphabet = public_url_alphabet or DEFAULT_PUBLIC_URL_ALPHABET
192

    
193
        self.hash_algorithm = 'sha256'
194
        self.block_size = 4 * 1024 * 1024  # 4MB
195
        self.free_versioning = free_versioning
196

    
197
        def load_module(m):
198
            __import__(m)
199
            return sys.modules[m]
200

    
201
        self.db_module = load_module(db_module)
202
        self.wrapper = self.db_module.DBWrapper(db_connection)
203
        params = {'wrapper': self.wrapper}
204
        self.permissions = self.db_module.Permissions(**params)
205
        self.config = self.db_module.Config(**params)
206
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
207
        for x in ['READ', 'WRITE']:
208
            setattr(self, x, getattr(self.db_module, x))
209
        self.node = self.db_module.Node(**params)
210
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
211
            setattr(self, x, getattr(self.db_module, x))
212

    
213
        self.block_module = load_module(block_module)
214
        self.block_params = block_params
215
        params = {'path': block_path,
216
                  'block_size': self.block_size,
217
                  'hash_algorithm': self.hash_algorithm,
218
                  'umask': block_umask}
219
        params.update(self.block_params)
220
        self.store = self.block_module.Store(**params)
221

    
222
        if queue_module and queue_hosts:
223
            self.queue_module = load_module(queue_module)
224
            params = {'hosts': queue_hosts,
225
                      'exchange': queue_exchange,
226
                      'client_id': QUEUE_CLIENT_ID}
227
            self.queue = self.queue_module.Queue(**params)
228
        else:
229
            class NoQueue:
230
                def send(self, *args):
231
                    pass
232

    
233
                def close(self):
234
                    pass
235

    
236
            self.queue = NoQueue()
237

    
238
        self.astakos_url = astakos_url
239
        self.service_token = service_token
240
        self.astakosclient = AstakosClient(
241
            astakos_url,
242
            use_pool=True,
243
            pool_size=astakosclient_poolsize)
244

    
245
        self.serials = []
246
        self.messages = []
247

    
248
    def close(self):
249
        self.wrapper.close()
250
        self.queue.close()
251

    
252
    @property
253
    def using_external_quotaholder(self):
254
        return True
255

    
256
    @backend_method
257
    def list_accounts(self, user, marker=None, limit=10000):
258
        """Return a list of accounts the user can access."""
259

    
260
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
261
        allowed = self._allowed_accounts(user)
262
        start, limit = self._list_limits(allowed, marker, limit)
263
        return allowed[start:start + limit]
264

    
265
    @backend_method
266
    def get_account_meta(
267
            self, user, account, domain, until=None, include_user_defined=True,
268
            external_quota=None):
269
        """Return a dictionary with the account metadata for the domain."""
270

    
271
        logger.debug(
272
            "get_account_meta: %s %s %s %s", user, account, domain, until)
273
        path, node = self._lookup_account(account, user == account)
274
        if user != account:
275
            if until or node is None or account not in self._allowed_accounts(user):
276
                raise NotAllowedError
277
        try:
278
            props = self._get_properties(node, until)
279
            mtime = props[self.MTIME]
280
        except NameError:
281
            props = None
282
            mtime = until
283
        count, bytes, tstamp = self._get_statistics(node, until)
284
        tstamp = max(tstamp, mtime)
285
        if until is None:
286
            modified = tstamp
287
        else:
288
            modified = self._get_statistics(
289
                node)[2]  # Overall last modification.
290
            modified = max(modified, mtime)
291

    
292
        if user != account:
293
            meta = {'name': account}
294
        else:
295
            meta = {}
296
            if props is not None and include_user_defined:
297
                meta.update(
298
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
299
            if until is not None:
300
                meta.update({'until_timestamp': tstamp})
301
            meta.update({'name': account, 'count': count, 'bytes': bytes})
302
            if self.using_external_quotaholder:
303
                external_quota = external_quota or {}
304
                meta['bytes'] = external_quota.get('currValue', 0)
305
        meta.update({'modified': modified})
306
        return meta
307

    
308
    @backend_method
309
    def update_account_meta(self, user, account, domain, meta, replace=False):
310
        """Update the metadata associated with the account for the domain."""
311

    
312
        logger.debug("update_account_meta: %s %s %s %s %s", user,
313
                     account, domain, meta, replace)
314
        if user != account:
315
            raise NotAllowedError
316
        path, node = self._lookup_account(account, True)
317
        self._put_metadata(user, node, domain, meta, replace)
318

    
319
    @backend_method
320
    def get_account_groups(self, user, account):
321
        """Return a dictionary with the user groups defined for this account."""
322

    
323
        logger.debug("get_account_groups: %s %s", user, account)
324
        if user != account:
325
            if account not in self._allowed_accounts(user):
326
                raise NotAllowedError
327
            return {}
328
        self._lookup_account(account, True)
329
        return self.permissions.group_dict(account)
330

    
331
    @backend_method
332
    def update_account_groups(self, user, account, groups, replace=False):
333
        """Update the groups associated with the account."""
334

    
335
        logger.debug("update_account_groups: %s %s %s %s", user,
336
                     account, groups, replace)
337
        if user != account:
338
            raise NotAllowedError
339
        self._lookup_account(account, True)
340
        self._check_groups(groups)
341
        if replace:
342
            self.permissions.group_destroy(account)
343
        for k, v in groups.iteritems():
344
            if not replace:  # If not already deleted.
345
                self.permissions.group_delete(account, k)
346
            if v:
347
                self.permissions.group_addmany(account, k, v)
348

    
349
    @backend_method
350
    def get_account_policy(self, user, account, external_quota=None):
351
        """Return a dictionary with the account policy."""
352

    
353
        logger.debug("get_account_policy: %s %s", user, account)
354
        if user != account:
355
            if account not in self._allowed_accounts(user):
356
                raise NotAllowedError
357
            return {}
358
        path, node = self._lookup_account(account, True)
359
        policy = self._get_policy(node, is_account_policy=True)
360
        if self.using_external_quotaholder:
361
            external_quota = external_quota or {}
362
            policy['quota'] = external_quota.get('maxValue', 0)
363
        return policy
364

    
365
    @backend_method
366
    def update_account_policy(self, user, account, policy, replace=False):
367
        """Update the policy associated with the account."""
368

    
369
        logger.debug("update_account_policy: %s %s %s %s", user,
370
                     account, policy, replace)
371
        if user != account:
372
            raise NotAllowedError
373
        path, node = self._lookup_account(account, True)
374
        self._check_policy(policy, is_account_policy=True)
375
        self._put_policy(node, policy, replace, is_account_policy=True)
376

    
377
    @backend_method
378
    def put_account(self, user, account, policy=None):
379
        """Create a new account with the given name."""
380

    
381
        logger.debug("put_account: %s %s %s", user, account, policy)
382
        policy = policy or {}
383
        if user != account:
384
            raise NotAllowedError
385
        node = self.node.node_lookup(account)
386
        if node is not None:
387
            raise AccountExists('Account already exists')
388
        if policy:
389
            self._check_policy(policy, is_account_policy=True)
390
        node = self._put_path(user, self.ROOTNODE, account)
391
        self._put_policy(node, policy, True, is_account_policy=True)
392

    
393
    @backend_method
394
    def delete_account(self, user, account):
395
        """Delete the account with the given name."""
396

    
397
        logger.debug("delete_account: %s %s", user, account)
398
        if user != account:
399
            raise NotAllowedError
400
        node = self.node.node_lookup(account)
401
        if node is None:
402
            return
403
        if not self.node.node_remove(node):
404
            raise AccountNotEmpty('Account is not empty')
405
        self.permissions.group_destroy(account)
406

    
407
    @backend_method
408
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
409
        """Return a list of containers existing under an account."""
410

    
411
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
412
                     account, marker, limit, shared, until, public)
413
        if user != account:
414
            if until or account not in self._allowed_accounts(user):
415
                raise NotAllowedError
416
            allowed = self._allowed_containers(user, account)
417
            start, limit = self._list_limits(allowed, marker, limit)
418
            return allowed[start:start + limit]
419
        if shared or public:
420
            allowed = set()
421
            if shared:
422
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
423
            if public:
424
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
425
            allowed = sorted(allowed)
426
            start, limit = self._list_limits(allowed, marker, limit)
427
            return allowed[start:start + limit]
428
        node = self.node.node_lookup(account)
429
        containers = [x[0] for x in self._list_object_properties(
430
            node, account, '', '/', marker, limit, False, None, [], until)]
431
        start, limit = self._list_limits(
432
            [x[0] for x in containers], marker, limit)
433
        return containers[start:start + limit]
434

    
435
    @backend_method
436
    def list_container_meta(self, user, account, container, domain, until=None):
437
        """Return a list with all the container's object meta keys for the domain."""
438

    
439
        logger.debug("list_container_meta: %s %s %s %s %s", user,
440
                     account, container, domain, until)
441
        allowed = []
442
        if user != account:
443
            if until:
444
                raise NotAllowedError
445
            allowed = self.permissions.access_list_paths(
446
                user, '/'.join((account, container)))
447
            if not allowed:
448
                raise NotAllowedError
449
        path, node = self._lookup_container(account, container)
450
        before = until if until is not None else inf
451
        allowed = self._get_formatted_paths(allowed)
452
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
453

    
454
    @backend_method
455
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
456
        """Return a dictionary with the container metadata for the domain."""
457

    
458
        logger.debug("get_container_meta: %s %s %s %s %s", user,
459
                     account, container, domain, until)
460
        if user != account:
461
            if until or container not in self._allowed_containers(user, account):
462
                raise NotAllowedError
463
        path, node = self._lookup_container(account, container)
464
        props = self._get_properties(node, until)
465
        mtime = props[self.MTIME]
466
        count, bytes, tstamp = self._get_statistics(node, until)
467
        tstamp = max(tstamp, mtime)
468
        if until is None:
469
            modified = tstamp
470
        else:
471
            modified = self._get_statistics(
472
                node)[2]  # Overall last modification.
473
            modified = max(modified, mtime)
474

    
475
        if user != account:
476
            meta = {'name': container}
477
        else:
478
            meta = {}
479
            if include_user_defined:
480
                meta.update(
481
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
482
            if until is not None:
483
                meta.update({'until_timestamp': tstamp})
484
            meta.update({'name': container, 'count': count, 'bytes': bytes})
485
        meta.update({'modified': modified})
486
        return meta
487

    
488
    @backend_method
489
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
490
        """Update the metadata associated with the container for the domain."""
491

    
492
        logger.debug("update_container_meta: %s %s %s %s %s %s",
493
                     user, account, container, domain, meta, replace)
494
        if user != account:
495
            raise NotAllowedError
496
        path, node = self._lookup_container(account, container)
497
        src_version_id, dest_version_id = self._put_metadata(
498
            user, node, domain, meta, replace)
499
        if src_version_id is not None:
500
            versioning = self._get_policy(
501
                node, is_account_policy=False)['versioning']
502
            if versioning != 'auto':
503
                self.node.version_remove(src_version_id)
504

    
505
    @backend_method
506
    def get_container_policy(self, user, account, container):
507
        """Return a dictionary with the container policy."""
508

    
509
        logger.debug(
510
            "get_container_policy: %s %s %s", user, account, container)
511
        if user != account:
512
            if container not in self._allowed_containers(user, account):
513
                raise NotAllowedError
514
            return {}
515
        path, node = self._lookup_container(account, container)
516
        return self._get_policy(node, is_account_policy=False)
517

    
518
    @backend_method
519
    def update_container_policy(self, user, account, container, policy, replace=False):
520
        """Update the policy associated with the container."""
521

    
522
        logger.debug("update_container_policy: %s %s %s %s %s",
523
                     user, account, container, policy, replace)
524
        if user != account:
525
            raise NotAllowedError
526
        path, node = self._lookup_container(account, container)
527
        self._check_policy(policy, is_account_policy=False)
528
        self._put_policy(node, policy, replace, is_account_policy=False)
529

    
530
    @backend_method
531
    def put_container(self, user, account, container, policy=None):
532
        """Create a new container with the given name."""
533

    
534
        logger.debug(
535
            "put_container: %s %s %s %s", user, account, container, policy)
536
        policy = policy or {}
537
        if user != account:
538
            raise NotAllowedError
539
        try:
540
            path, node = self._lookup_container(account, container)
541
        except NameError:
542
            pass
543
        else:
544
            raise ContainerExists('Container already exists')
545
        if policy:
546
            self._check_policy(policy, is_account_policy=False)
547
        path = '/'.join((account, container))
548
        node = self._put_path(
549
            user, self._lookup_account(account, True)[1], path)
550
        self._put_policy(node, policy, True, is_account_policy=False)
551

    
552
    @backend_method
553
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
554
        """Delete/purge the container with the given name."""
555

    
556
        logger.debug("delete_container: %s %s %s %s %s %s", user,
557
                     account, container, until, prefix, delimiter)
558
        if user != account:
559
            raise NotAllowedError
560
        path, node = self._lookup_container(account, container)
561

    
562
        if until is not None:
563
            hashes, size, serials = self.node.node_purge_children(
564
                node, until, CLUSTER_HISTORY)
565
            for h in hashes:
566
                self.store.map_delete(h)
567
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
568
            if not self.free_versioning:
569
                self._report_size_change(
570
                    user, account, -size, {
571
                        'action':'container purge',
572
                        'path': path,
573
                        'versions': ','.join(str(i) for i in serials)
574
                    }
575
                )
576
            return
577

    
578
        if not delimiter:
579
            if self._get_statistics(node)[0] > 0:
580
                raise ContainerNotEmpty('Container is not empty')
581
            hashes, size, serials = self.node.node_purge_children(
582
                node, inf, CLUSTER_HISTORY)
583
            for h in hashes:
584
                self.store.map_delete(h)
585
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
586
            self.node.node_remove(node)
587
            if not self.free_versioning:
588
                self._report_size_change(
589
                    user, account, -size, {
590
                        'action':'container purge',
591
                        'path': path,
592
                        'versions': ','.join(str(i) for i in serials)
593
                    }
594
                )
595
        else:
596
            # remove only contents
597
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
598
            paths = []
599
            for t in src_names:
600
                path = '/'.join((account, container, t[0]))
601
                node = t[2]
602
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
603
                del_size = self._apply_versioning(
604
                    account, container, src_version_id)
605
                self._report_size_change(
606
                        user, account, -del_size, {
607
                                'action': 'object delete',
608
                                'path': path,
609
                        'versions': ','.join([str(dest_version_id)])
610
                     }
611
                )
612
                self._report_object_change(
613
                    user, account, path, details={'action': 'object delete'})
614
                paths.append(path)
615
            self.permissions.access_clear_bulk(paths)
616

    
617
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
618
        if user != account and until:
619
            raise NotAllowedError
620
        if shared and public:
621
            # get shared first
622
            shared_paths = self._list_object_permissions(
623
                user, account, container, prefix, shared=True, public=False)
624
            objects = set()
625
            if shared_paths:
626
                path, node = self._lookup_container(account, container)
627
                shared_paths = self._get_formatted_paths(shared_paths)
628
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared_paths, all_props))
629

    
630
            # get public
631
            objects |= set(self._list_public_object_properties(
632
                user, account, container, prefix, all_props))
633
            objects = list(objects)
634

    
635
            objects.sort(key=lambda x: x[0])
636
            start, limit = self._list_limits(
637
                [x[0] for x in objects], marker, limit)
638
            return objects[start:start + limit]
639
        elif public:
640
            objects = self._list_public_object_properties(
641
                user, account, container, prefix, all_props)
642
            start, limit = self._list_limits(
643
                [x[0] for x in objects], marker, limit)
644
            return objects[start:start + limit]
645

    
646
        allowed = self._list_object_permissions(
647
            user, account, container, prefix, shared, public)
648
        if shared and not allowed:
649
            return []
650
        path, node = self._lookup_container(account, container)
651
        allowed = self._get_formatted_paths(allowed)
652
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
653
        start, limit = self._list_limits(
654
            [x[0] for x in objects], marker, limit)
655
        return objects[start:start + limit]
656

    
657
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
658
        public = self._list_object_permissions(
659
            user, account, container, prefix, shared=False, public=True)
660
        paths, nodes = self._lookup_objects(public)
661
        path = '/'.join((account, container))
662
        cont_prefix = path + '/'
663
        paths = [x[len(cont_prefix):] for x in paths]
664
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
665
        objects = [(path,) + props for path, props in zip(paths, props)]
666
        return objects
667

    
668
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
669
        objects = []
670
        while True:
671
            marker = objects[-1] if objects else None
672
            limit = 10000
673
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
674
            objects.extend(l)
675
            if not l or len(l) < limit:
676
                break
677
        return objects
678

    
679
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
680
        allowed = []
681
        path = '/'.join((account, container, prefix)).rstrip('/')
682
        if user != account:
683
            allowed = self.permissions.access_list_paths(user, path)
684
            if not allowed:
685
                raise NotAllowedError
686
        else:
687
            allowed = set()
688
            if shared:
689
                allowed.update(self.permissions.access_list_shared(path))
690
            if public:
691
                allowed.update(
692
                    [x[0] for x in self.permissions.public_list(path)])
693
            allowed = sorted(allowed)
694
            if not allowed:
695
                return []
696
        return allowed
697

    
698
    @backend_method
699
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
700
        """Return a list of object (name, version_id) tuples existing under a container."""
701

    
702
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
703
        keys = keys or []
704
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
705

    
706
    @backend_method
707
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
708
        """Return a list of object metadata dicts existing under a container."""
709

    
710
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
711
        keys = keys or []
712
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
713
        objects = []
714
        for p in props:
715
            if len(p) == 2:
716
                objects.append({'subdir': p[0]})
717
            else:
718
                objects.append({'name': p[0],
719
                                'bytes': p[self.SIZE + 1],
720
                                'type': p[self.TYPE + 1],
721
                                'hash': p[self.HASH + 1],
722
                                'version': p[self.SERIAL + 1],
723
                                'version_timestamp': p[self.MTIME + 1],
724
                                'modified': p[self.MTIME + 1] if until is None else None,
725
                                'modified_by': p[self.MUSER + 1],
726
                                'uuid': p[self.UUID + 1],
727
                                'checksum': p[self.CHECKSUM + 1]})
728
        return objects
729

    
730
    @backend_method
731
    def list_object_permissions(self, user, account, container, prefix=''):
732
        """Return a list of paths that enforce permissions under a container."""
733

    
734
        logger.debug("list_object_permissions: %s %s %s %s", user,
735
                     account, container, prefix)
736
        return self._list_object_permissions(user, account, container, prefix, True, False)
737

    
738
    @backend_method
739
    def list_object_public(self, user, account, container, prefix=''):
740
        """Return a dict mapping paths to public ids for objects that are public under a container."""
741

    
742
        logger.debug("list_object_public: %s %s %s %s", user,
743
                     account, container, prefix)
744
        public = {}
745
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
746
            public[path] = p
747
        return public
748

    
749
    @backend_method
750
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
751
        """Return a dictionary with the object metadata for the domain."""
752

    
753
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
754
                     account, container, name, domain, version)
755
        self._can_read(user, account, container, name)
756
        path, node = self._lookup_object(account, container, name)
757
        props = self._get_version(node, version)
758
        if version is None:
759
            modified = props[self.MTIME]
760
        else:
761
            try:
762
                modified = self._get_version(
763
                    node)[self.MTIME]  # Overall last modification.
764
            except NameError:  # Object may be deleted.
765
                del_props = self.node.version_lookup(
766
                    node, inf, CLUSTER_DELETED)
767
                if del_props is None:
768
                    raise ItemNotExists('Object does not exist')
769
                modified = del_props[self.MTIME]
770

    
771
        meta = {}
772
        if include_user_defined:
773
            meta.update(
774
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
775
        meta.update({'name': name,
776
                     'bytes': props[self.SIZE],
777
                     'type': props[self.TYPE],
778
                     'hash': props[self.HASH],
779
                     'version': props[self.SERIAL],
780
                     'version_timestamp': props[self.MTIME],
781
                     'modified': modified,
782
                     'modified_by': props[self.MUSER],
783
                     'uuid': props[self.UUID],
784
                     'checksum': props[self.CHECKSUM]})
785
        return meta
786

    
787
    @backend_method
788
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
789
        """Update the metadata associated with the object for the domain and return the new version."""
790

    
791
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
792
                     user, account, container, name, domain, meta, replace)
793
        self._can_write(user, account, container, name)
794
        path, node = self._lookup_object(account, container, name)
795
        src_version_id, dest_version_id = self._put_metadata(
796
            user, node, domain, meta, replace)
797
        self._apply_versioning(account, container, src_version_id)
798
        return dest_version_id
799

    
800
    @backend_method
801
    def get_object_permissions(self, user, account, container, name):
802
        """Return the action allowed on the object, the path
803
        from which the object gets its permissions from,
804
        along with a dictionary containing the permissions."""
805

    
806
        logger.debug("get_object_permissions: %s %s %s %s", user,
807
                     account, container, name)
808
        allowed = 'write'
809
        permissions_path = self._get_permissions_path(account, container, name)
810
        if user != account:
811
            if self.permissions.access_check(permissions_path, self.WRITE, user):
812
                allowed = 'write'
813
            elif self.permissions.access_check(permissions_path, self.READ, user):
814
                allowed = 'read'
815
            else:
816
                raise NotAllowedError
817
        self._lookup_object(account, container, name)
818
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
819

    
820
    @backend_method
821
    def update_object_permissions(self, user, account, container, name, permissions):
822
        """Update the permissions associated with the object."""
823

    
824
        logger.debug("update_object_permissions: %s %s %s %s %s",
825
                     user, account, container, name, permissions)
826
        if user != account:
827
            raise NotAllowedError
828
        path = self._lookup_object(account, container, name)[0]
829
        self._check_permissions(path, permissions)
830
        self.permissions.access_set(path, permissions)
831
        self._report_sharing_change(user, account, path, {'members':
832
                                    self.permissions.access_members(path)})
833

    
834
    @backend_method
835
    def get_object_public(self, user, account, container, name):
836
        """Return the public id of the object if applicable."""
837

    
838
        logger.debug(
839
            "get_object_public: %s %s %s %s", user, account, container, name)
840
        self._can_read(user, account, container, name)
841
        path = self._lookup_object(account, container, name)[0]
842
        p = self.permissions.public_get(path)
843
        return p
844

    
845
    @backend_method
846
    def update_object_public(self, user, account, container, name, public):
847
        """Update the public status of the object."""
848

    
849
        logger.debug("update_object_public: %s %s %s %s %s", user,
850
                     account, container, name, public)
851
        self._can_write(user, account, container, name)
852
        path = self._lookup_object(account, container, name)[0]
853
        if not public:
854
            self.permissions.public_unset(path)
855
        else:
856
            self.permissions.public_set(
857
                path, self.public_url_security, self.public_url_alphabet
858
            )
859

    
860
    @backend_method
861
    def get_object_hashmap(self, user, account, container, name, version=None):
862
        """Return the object's size and a list with partial hashes."""
863

    
864
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
865
                     account, container, name, version)
866
        self._can_read(user, account, container, name)
867
        path, node = self._lookup_object(account, container, name)
868
        props = self._get_version(node, version)
869
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
870
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
871

    
872
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
873
        if permissions is not None and user != account:
874
            raise NotAllowedError
875
        self._can_write(user, account, container, name)
876
        if permissions is not None:
877
            path = '/'.join((account, container, name))
878
            self._check_permissions(path, permissions)
879

    
880
        account_path, account_node = self._lookup_account(account, True)
881
        container_path, container_node = self._lookup_container(
882
            account, container)
883

    
884
        path, node = self._put_object_node(
885
            container_path, container_node, name)
886
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
887

    
888
        # Handle meta.
889
        if src_version_id is None:
890
            src_version_id = pre_version_id
891
        self._put_metadata_duplicate(
892
            src_version_id, dest_version_id, domain, meta, replace_meta)
893

    
894
        del_size = self._apply_versioning(account, container, pre_version_id)
895
        size_delta = size - del_size
896
        if size_delta > 0:
897
            # Check account quota.
898
            if not self.using_external_quotaholder:
899
                account_quota = long(
900
                    self._get_policy(account_node, is_account_policy=True
901
                    )['quota']
902
                )
903
                account_usage = self._get_statistics(account_node)[1]
904
                if (account_quota > 0 and account_usage > account_quota):
905
                    raise QuotaError(
906
                        'Account quota exceeded: limit: %s, usage: %s' % (
907
                            account_quota, account_usage
908
                        )
909
                    )
910

    
911
            # Check container quota.
912
            container_quota = long(
913
                self._get_policy(container_node, is_account_policy=False
914
                )['quota']
915
            )
916
            container_usage = self._get_statistics(container_node)[1]
917
            if (container_quota > 0 and container_usage > container_quota):
918
                # This must be executed in a transaction, so the version is
919
                # never created if it fails.
920
                raise QuotaError(
921
                    'Container quota exceeded: limit: %s, usage: %s' % (
922
                        container_quota, container_usage
923
                    )
924
                )
925

    
926
        self._report_size_change(user, account, size_delta,
927
                                 {'action': 'object update', 'path': path,
928
                                  'versions': ','.join([str(dest_version_id)])})
929
        if permissions is not None:
930
            self.permissions.access_set(path, permissions)
931
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
932

    
933
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
934
        return dest_version_id
935

    
936
    @backend_method
937
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta=None, replace_meta=False, permissions=None):
938
        """Create/update an object with the specified size and partial hashes."""
939

    
940
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
941
                     account, container, name, size, type, hashmap, checksum)
942
        meta = meta or {}
943
        if size == 0:  # No such thing as an empty hashmap.
944
            hashmap = [self.put_block('')]
945
        map = HashMap(self.block_size, self.hash_algorithm)
946
        map.extend([binascii.unhexlify(x) for x in hashmap])
947
        missing = self.store.block_search(map)
948
        if missing:
949
            ie = IndexError()
950
            ie.data = [binascii.hexlify(x) for x in missing]
951
            raise ie
952

    
953
        hash = map.hash()
954
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
955
        self.store.map_put(hash, map)
956
        return dest_version_id
957

    
958
    @backend_method
959
    def update_object_checksum(self, user, account, container, name, version, checksum):
960
        """Update an object's checksum."""
961

    
962
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
963
                     user, account, container, name, version, checksum)
964
        # Update objects with greater version and same hashmap and size (fix metadata updates).
965
        self._can_write(user, account, container, name)
966
        path, node = self._lookup_object(account, container, name)
967
        props = self._get_version(node, version)
968
        versions = self.node.node_get_versions(node)
969
        for x in versions:
970
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
971
                self.node.version_put_property(
972
                    x[self.SERIAL], 'checksum', checksum)
973

    
974
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta=None, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
975
        dest_meta = dest_meta or {}
976
        dest_version_ids = []
977
        self._can_read(user, src_account, src_container, src_name)
978
        path, node = self._lookup_object(src_account, src_container, src_name)
979
        # TODO: Will do another fetch of the properties in duplicate version...
980
        props = self._get_version(
981
            node, src_version)  # Check to see if source exists.
982
        src_version_id = props[self.SERIAL]
983
        hash = props[self.HASH]
984
        size = props[self.SIZE]
985
        is_copy = not is_move and (src_account, src_container, src_name) != (
986
            dest_account, dest_container, dest_name)  # New uuid.
987
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
988
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
989
            self._delete_object(user, src_account, src_container, src_name)
990

    
991
        if delimiter:
992
            prefix = src_name + \
993
                delimiter if not src_name.endswith(delimiter) else src_name
994
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
995
            src_names.sort(key=lambda x: x[2])  # order by nodes
996
            paths = [elem[0] for elem in src_names]
997
            nodes = [elem[2] for elem in src_names]
998
            # TODO: Will do another fetch of the properties in duplicate version...
999
            props = self._get_versions(nodes)  # Check to see if source exists.
1000

    
1001
            for prop, path, node in zip(props, paths, nodes):
1002
                src_version_id = prop[self.SERIAL]
1003
                hash = prop[self.HASH]
1004
                vtype = prop[self.TYPE]
1005
                size = prop[self.SIZE]
1006
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1007
                    delimiter) else dest_name
1008
                vdest_name = path.replace(prefix, dest_prefix, 1)
1009
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
1010
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
1011
                    self._delete_object(user, src_account, src_container, path)
1012
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
1013

    
1014
    @backend_method
1015
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, src_version=None, delimiter=None):
1016
        """Copy an object's data and metadata."""
1017

    
1018
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
1019
        meta = meta or {}
1020
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
1021
        return dest_version_id
1022

    
1023
    @backend_method
1024
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, delimiter=None):
1025
        """Move an object's data and metadata."""
1026

    
1027
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
1028
        meta = meta or {}
1029
        if user != src_account:
1030
            raise NotAllowedError
1031
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
1032
        return dest_version_id
1033

    
1034
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
1035
        if user != account:
1036
            raise NotAllowedError
1037

    
1038
        if until is not None:
1039
            path = '/'.join((account, container, name))
1040
            node = self.node.node_lookup(path)
1041
            if node is None:
1042
                return
1043
            hashes = []
1044
            size = 0
1045
            serials = []
1046
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
1047
            hashes += h
1048
            size += s
1049
            serials += v
1050
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
1051
            hashes += h
1052
            if not self.free_versioning:
1053
                size += s
1054
            serials += v
1055
            for h in hashes:
1056
                self.store.map_delete(h)
1057
            self.node.node_purge(node, until, CLUSTER_DELETED)
1058
            try:
1059
                props = self._get_version(node)
1060
            except NameError:
1061
                self.permissions.access_clear(path)
1062
            self._report_size_change(
1063
                user, account, -size, {
1064
                    'action': 'object purge',
1065
                    'path': path,
1066
                    'versions': ','.join(str(i) for i in serials)
1067
                }
1068
            )
1069
            return
1070

    
1071
        path, node = self._lookup_object(account, container, name)
1072
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1073
        del_size = self._apply_versioning(account, container, src_version_id)
1074
        self._report_size_change(user, account, -del_size,
1075
                                 {'action': 'object delete', 'path': path,
1076
                                  'versions': ','.join([str(dest_version_id)])})
1077
        self._report_object_change(
1078
            user, account, path, details={'action': 'object delete'})
1079
        self.permissions.access_clear(path)
1080

    
1081
        if delimiter:
1082
            prefix = name + delimiter if not name.endswith(delimiter) else name
1083
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1084
            paths = []
1085
            for t in src_names:
1086
                path = '/'.join((account, container, t[0]))
1087
                node = t[2]
1088
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1089
                del_size = self._apply_versioning(
1090
                    account, container, src_version_id)
1091
                self._report_size_change(user, account, -del_size,
1092
                                         {'action': 'object delete',
1093
                                          'path': path,
1094
                                          'versions': ','.join([str(dest_version_id)])})
1095
                self._report_object_change(
1096
                    user, account, path, details={'action': 'object delete'})
1097
                paths.append(path)
1098
            self.permissions.access_clear_bulk(paths)
1099

    
1100
    @backend_method
1101
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1102
        """Delete/purge an object."""
1103

    
1104
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1105
                     account, container, name, until, prefix, delimiter)
1106
        self._delete_object(user, account, container, name, until, delimiter)
1107

    
1108
    @backend_method
1109
    def list_versions(self, user, account, container, name):
1110
        """Return a list of all (version, version_timestamp) tuples for an object."""
1111

    
1112
        logger.debug(
1113
            "list_versions: %s %s %s %s", user, account, container, name)
1114
        self._can_read(user, account, container, name)
1115
        path, node = self._lookup_object(account, container, name)
1116
        versions = self.node.node_get_versions(node)
1117
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1118

    
1119
    @backend_method
1120
    def get_uuid(self, user, uuid):
1121
        """Return the (account, container, name) for the UUID given."""
1122

    
1123
        logger.debug("get_uuid: %s %s", user, uuid)
1124
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1125
        if info is None:
1126
            raise NameError
1127
        path, serial = info
1128
        account, container, name = path.split('/', 2)
1129
        self._can_read(user, account, container, name)
1130
        return (account, container, name)
1131

    
1132
    @backend_method
1133
    def get_public(self, user, public):
1134
        """Return the (account, container, name) for the public id given."""
1135

    
1136
        logger.debug("get_public: %s %s", user, public)
1137
        path = self.permissions.public_path(public)
1138
        if path is None:
1139
            raise NameError
1140
        account, container, name = path.split('/', 2)
1141
        self._can_read(user, account, container, name)
1142
        return (account, container, name)
1143

    
1144
    @backend_method(autocommit=0)
1145
    def get_block(self, hash):
1146
        """Return a block's data."""
1147

    
1148
        logger.debug("get_block: %s", hash)
1149
        block = self.store.block_get(binascii.unhexlify(hash))
1150
        if not block:
1151
            raise ItemNotExists('Block does not exist')
1152
        return block
1153

    
1154
    @backend_method(autocommit=0)
1155
    def put_block(self, data):
1156
        """Store a block and return the hash."""
1157

    
1158
        logger.debug("put_block: %s", len(data))
1159
        return binascii.hexlify(self.store.block_put(data))
1160

    
1161
    @backend_method(autocommit=0)
1162
    def update_block(self, hash, data, offset=0):
1163
        """Update a known block and return the hash."""
1164

    
1165
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1166
        if offset == 0 and len(data) == self.block_size:
1167
            return self.put_block(data)
1168
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1169
        return binascii.hexlify(h)
1170

    
1171
    # Path functions.
1172

    
1173
    def _generate_uuid(self):
1174
        return str(uuidlib.uuid4())
1175

    
1176
    def _put_object_node(self, path, parent, name):
1177
        path = '/'.join((path, name))
1178
        node = self.node.node_lookup(path)
1179
        if node is None:
1180
            node = self.node.node_create(parent, path)
1181
        return path, node
1182

    
1183
    def _put_path(self, user, parent, path):
1184
        node = self.node.node_create(parent, path)
1185
        self.node.version_create(node, None, 0, '', None, user,
1186
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1187
        return node
1188

    
1189
    def _lookup_account(self, account, create=True):
1190
        node = self.node.node_lookup(account)
1191
        if node is None and create:
1192
            node = self._put_path(
1193
                account, self.ROOTNODE, account)  # User is account.
1194
        return account, node
1195

    
1196
    def _lookup_container(self, account, container):
1197
        path = '/'.join((account, container))
1198
        node = self.node.node_lookup(path)
1199
        if node is None:
1200
            raise ItemNotExists('Container does not exist')
1201
        return path, node
1202

    
1203
    def _lookup_object(self, account, container, name):
1204
        path = '/'.join((account, container, name))
1205
        node = self.node.node_lookup(path)
1206
        if node is None:
1207
            raise ItemNotExists('Object does not exist')
1208
        return path, node
1209

    
1210
    def _lookup_objects(self, paths):
1211
        nodes = self.node.node_lookup_bulk(paths)
1212
        return paths, nodes
1213

    
1214
    def _get_properties(self, node, until=None):
1215
        """Return properties until the timestamp given."""
1216

    
1217
        before = until if until is not None else inf
1218
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1219
        if props is None and until is not None:
1220
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1221
        if props is None:
1222
            raise ItemNotExists('Path does not exist')
1223
        return props
1224

    
1225
    def _get_statistics(self, node, until=None):
1226
        """Return count, sum of size and latest timestamp of everything under node."""
1227

    
1228
        if until is None:
1229
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1230
        else:
1231
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1232
        if stats is None:
1233
            stats = (0, 0, 0)
1234
        return stats
1235

    
1236
    def _get_version(self, node, version=None):
1237
        if version is None:
1238
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1239
            if props is None:
1240
                raise ItemNotExists('Object does not exist')
1241
        else:
1242
            try:
1243
                version = int(version)
1244
            except ValueError:
1245
                raise VersionNotExists('Version does not exist')
1246
            props = self.node.version_get_properties(version)
1247
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1248
                raise VersionNotExists('Version does not exist')
1249
        return props
1250

    
1251
    def _get_versions(self, nodes):
1252
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1253

    
1254
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1255
        """Create a new version of the node."""
1256

    
1257
        props = self.node.version_lookup(
1258
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1259
        if props is not None:
1260
            src_version_id = props[self.SERIAL]
1261
            src_hash = props[self.HASH]
1262
            src_size = props[self.SIZE]
1263
            src_type = props[self.TYPE]
1264
            src_checksum = props[self.CHECKSUM]
1265
        else:
1266
            src_version_id = None
1267
            src_hash = None
1268
            src_size = 0
1269
            src_type = ''
1270
            src_checksum = ''
1271
        if size is None:  # Set metadata.
1272
            hash = src_hash  # This way hash can be set to None (account or container).
1273
            size = src_size
1274
        if type is None:
1275
            type = src_type
1276
        if checksum is None:
1277
            checksum = src_checksum
1278
        uuid = self._generate_uuid(
1279
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1280

    
1281
        if src_node is None:
1282
            pre_version_id = src_version_id
1283
        else:
1284
            pre_version_id = None
1285
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1286
            if props is not None:
1287
                pre_version_id = props[self.SERIAL]
1288
        if pre_version_id is not None:
1289
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1290

    
1291
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1292
        return pre_version_id, dest_version_id
1293

    
1294
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1295
        if src_version_id is not None:
1296
            self.node.attribute_copy(src_version_id, dest_version_id)
1297
        if not replace:
1298
            self.node.attribute_del(dest_version_id, domain, (
1299
                k for k, v in meta.iteritems() if v == ''))
1300
            self.node.attribute_set(dest_version_id, domain, (
1301
                (k, v) for k, v in meta.iteritems() if v != ''))
1302
        else:
1303
            self.node.attribute_del(dest_version_id, domain)
1304
            self.node.attribute_set(dest_version_id, domain, ((
1305
                k, v) for k, v in meta.iteritems()))
1306

    
1307
    def _put_metadata(self, user, node, domain, meta, replace=False):
1308
        """Create a new version and store metadata."""
1309

    
1310
        src_version_id, dest_version_id = self._put_version_duplicate(
1311
            user, node)
1312
        self._put_metadata_duplicate(
1313
            src_version_id, dest_version_id, domain, meta, replace)
1314
        return src_version_id, dest_version_id
1315

    
1316
    def _list_limits(self, listing, marker, limit):
1317
        start = 0
1318
        if marker:
1319
            try:
1320
                start = listing.index(marker) + 1
1321
            except ValueError:
1322
                pass
1323
        if not limit or limit > 10000:
1324
            limit = 10000
1325
        return start, limit
1326

    
1327
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, until=None, size_range=None, allowed=None, all_props=False):
1328
        keys = keys or []
1329
        allowed = allowed or []
1330
        cont_prefix = path + '/'
1331
        prefix = cont_prefix + prefix
1332
        start = cont_prefix + marker if marker else None
1333
        before = until if until is not None else inf
1334
        filterq = keys if domain else []
1335
        sizeq = size_range
1336

    
1337
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1338
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1339
        objects.sort(key=lambda x: x[0])
1340
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1341
        return objects
1342

    
1343
    # Reporting functions.
1344

    
1345
    def _report_size_change(self, user, account, size, details=None):
1346
        details = details or {}
1347

    
1348
        if size == 0:
1349
            return
1350

    
1351
        account_node = self._lookup_account(account, True)[1]
1352
        total = self._get_statistics(account_node)[1]
1353
        details.update({'user': user, 'total': total})
1354
        logger.debug(
1355
            "_report_size_change: %s %s %s %s", user, account, size, details)
1356
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1357
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1358
                              float(size), details))
1359

    
1360
        if not self.using_external_quotaholder:
1361
            return
1362

    
1363
        try:
1364
            name = details['path'] if 'path' in details else ''
1365
            serial = self.astakosclient.issue_one_commission(
1366
                token=self.service_token,
1367
                holder=account,
1368
                source=DEFAULT_SOURCE,
1369
                provisions={'pithos.diskspace': size},
1370
                name=name
1371
                )
1372
        except BaseException, e:
1373
            raise QuotaError(e)
1374
        else:
1375
            self.serials.append(serial)
1376

    
1377
    def _report_object_change(self, user, account, path, details=None):
1378
        details = details or {}
1379
        details.update({'user': user})
1380
        logger.debug("_report_object_change: %s %s %s %s", user,
1381
                     account, path, details)
1382
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1383
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1384

    
1385
    def _report_sharing_change(self, user, account, path, details=None):
1386
        logger.debug("_report_permissions_change: %s %s %s %s",
1387
                     user, account, path, details)
1388
        details = details or {}
1389
        details.update({'user': user})
1390
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1391
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1392

    
1393
    # Policy functions.
1394

    
1395
    def _check_policy(self, policy, is_account_policy=True):
1396
        default_policy = self.default_account_policy \
1397
            if is_account_policy else self.default_container_policy
1398
        for k in policy.keys():
1399
            if policy[k] == '':
1400
                policy[k] = default_policy.get(k)
1401
        for k, v in policy.iteritems():
1402
            if k == 'quota':
1403
                q = int(v)  # May raise ValueError.
1404
                if q < 0:
1405
                    raise ValueError
1406
            elif k == 'versioning':
1407
                if v not in ['auto', 'none']:
1408
                    raise ValueError
1409
            else:
1410
                raise ValueError
1411

    
1412
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1413
        default_policy = self.default_account_policy \
1414
            if is_account_policy else self.default_container_policy
1415
        if replace:
1416
            for k, v in default_policy.iteritems():
1417
                if k not in policy:
1418
                    policy[k] = v
1419
        self.node.policy_set(node, policy)
1420

    
1421
    def _get_policy(self, node, is_account_policy=True):
1422
        default_policy = self.default_account_policy \
1423
            if is_account_policy else self.default_container_policy
1424
        policy = default_policy.copy()
1425
        policy.update(self.node.policy_get(node))
1426
        return policy
1427

    
1428
    def _apply_versioning(self, account, container, version_id):
1429
        """Delete the provided version if such is the policy.
1430
           Return size of object removed.
1431
        """
1432

    
1433
        if version_id is None:
1434
            return 0
1435
        path, node = self._lookup_container(account, container)
1436
        versioning = self._get_policy(
1437
            node, is_account_policy=False)['versioning']
1438
        if versioning != 'auto':
1439
            hash, size = self.node.version_remove(version_id)
1440
            self.store.map_delete(hash)
1441
            return size
1442
        elif self.free_versioning:
1443
            return self.node.version_get_properties(
1444
                version_id, keys=('size',))[0]
1445
        return 0
1446

    
1447
    # Access control functions.
1448

    
1449
    def _check_groups(self, groups):
1450
        # raise ValueError('Bad characters in groups')
1451
        pass
1452

    
1453
    def _check_permissions(self, path, permissions):
1454
        # raise ValueError('Bad characters in permissions')
1455
        pass
1456

    
1457
    def _get_formatted_paths(self, paths):
1458
        formatted = []
1459
        for p in paths:
1460
            node = self.node.node_lookup(p)
1461
            props = None
1462
            if node is not None:
1463
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1464
            if props is not None:
1465
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1466
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1467
                formatted.append((p, self.MATCH_EXACT))
1468
        return formatted
1469

    
1470
    def _get_permissions_path(self, account, container, name):
1471
        path = '/'.join((account, container, name))
1472
        permission_paths = self.permissions.access_inherit(path)
1473
        permission_paths.sort()
1474
        permission_paths.reverse()
1475
        for p in permission_paths:
1476
            if p == path:
1477
                return p
1478
            else:
1479
                if p.count('/') < 2:
1480
                    continue
1481
                node = self.node.node_lookup(p)
1482
                props = None
1483
                if node is not None:
1484
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1485
                if props is not None:
1486
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1487
                        return p
1488
        return None
1489

    
1490
    def _can_read(self, user, account, container, name):
1491
        if user == account:
1492
            return True
1493
        path = '/'.join((account, container, name))
1494
        if self.permissions.public_get(path) is not None:
1495
            return True
1496
        path = self._get_permissions_path(account, container, name)
1497
        if not path:
1498
            raise NotAllowedError
1499
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1500
            raise NotAllowedError
1501

    
1502
    def _can_write(self, user, account, container, name):
1503
        if user == account:
1504
            return True
1505
        path = '/'.join((account, container, name))
1506
        path = self._get_permissions_path(account, container, name)
1507
        if not path:
1508
            raise NotAllowedError
1509
        if not self.permissions.access_check(path, self.WRITE, user):
1510
            raise NotAllowedError
1511

    
1512
    def _allowed_accounts(self, user):
1513
        allow = set()
1514
        for path in self.permissions.access_list_paths(user):
1515
            allow.add(path.split('/', 1)[0])
1516
        return sorted(allow)
1517

    
1518
    def _allowed_containers(self, user, account):
1519
        allow = set()
1520
        for path in self.permissions.access_list_paths(user, account):
1521
            allow.add(path.split('/', 2)[1])
1522
        return sorted(allow)
1523

    
1524
    # Domain functions
1525

    
1526
    @backend_method
1527
    def get_domain_objects(self, domain, user=None):
1528
        obj_list = self.node.domain_object_list(domain, CLUSTER_NORMAL)
1529
        if user != None:
1530
            obj_list = [t for t in obj_list \
1531
                if self._has_read_access(user, t[0])]
1532
        return [(path,
1533
                 self._build_metadata(props, user_defined_meta),
1534
                 self.permissions.access_get(path)) \
1535
            for path, props, user_defined_meta in obj_list]
1536

    
1537
    # util functions
1538

    
1539
    def _build_metadata(self, props, user_defined=None,
1540
                        include_user_defined=True):
1541
        meta = {'bytes': props[self.SIZE],
1542
                'type': props[self.TYPE],
1543
                'hash': props[self.HASH],
1544
                'version': props[self.SERIAL],
1545
                'version_timestamp': props[self.MTIME],
1546
                'modified_by': props[self.MUSER],
1547
                'uuid': props[self.UUID],
1548
                'checksum': props[self.CHECKSUM]}
1549
        if include_user_defined and user_defined != None:
1550
            meta.update(user_defined)
1551
        return meta
1552

    
1553
    def _has_read_access(self, user, path):
1554
        try:
1555
            account, container, object = path.split('/', 2)
1556
        except ValueError:
1557
            raise ValueError('Invalid object path')
1558

    
1559
        assert isinstance(user, basestring), "Invalid user"
1560

    
1561
        try:
1562
            self._can_read(user, account, container, object)
1563
        except NotAllowedError:
1564
            return False
1565
        else:
1566
            return True