Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ fed9c5c7

History | View | Annotate | Download (65.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from synnefo.lib.quotaholder import QuotaholderClient
41

    
42
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
43
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
44
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
45
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
85
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
86
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
87
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
88
                               'abcdefghijklmnopqrstuvwxyz'
89
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
90
DEFAULT_PUBLIC_URL_SECURITY = 16
91

    
92
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
93
QUEUE_CLIENT_ID = 'pithos'
94
QUEUE_INSTANCE_ID = '1'
95

    
96
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
97

    
98
inf = float('inf')
99

    
100
ULTIMATE_ANSWER = 42
101

    
102

    
103
logger = logging.getLogger(__name__)
104

    
105

    
106
def backend_method(func=None, autocommit=1):
107
    if func is None:
108
        def fn(func):
109
            return backend_method(func, autocommit)
110
        return fn
111

    
112
    if not autocommit:
113
        return func
114

    
115
    def fn(self, *args, **kw):
116
        self.wrapper.execute()
117
        serials = []
118
        self.serials = serials
119
        self.messages = []
120

    
121
        try:
122
            ret = func(self, *args, **kw)
123
            for m in self.messages:
124
                self.queue.send(*m)
125
            if serials:
126
                self.quotaholder_serials.insert_many(serials)
127
                self.quotaholder.accept_commission(
128
                            context     =   {},
129
                            clientkey   =   'pithos',
130
                            serials     =   serials)
131
            self.wrapper.commit()
132
            return ret
133
        except:
134
            if serials:
135
                self.quotaholder.reject_commission(
136
                            context     =   {},
137
                            clientkey   =   'pithos',
138
                            serials     =   serials)
139
            self.wrapper.rollback()
140
            raise
141
    return fn
142

    
143

    
144
class ModularBackend(BaseBackend):
145
    """A modular backend.
146

147
    Uses modules for SQL functions and storage.
148
    """
149

    
150
    def __init__(self, db_module=None, db_connection=None,
151
                 block_module=None, block_path=None, block_umask=None,
152
                 queue_module=None, queue_hosts=None, queue_exchange=None,
153
                 quotaholder_enabled=False,
154
                 quotaholder_url=None, quotaholder_token=None,
155
                 quotaholder_client_poolsize=None,
156
                 free_versioning=True, block_params=None,
157
                 public_url_security=None,
158
                 public_url_alphabet=None,
159
                 account_quota_policy=None,
160
                 container_quota_policy=None,
161
                 container_versioning_policy=None):
162
        db_module = db_module or DEFAULT_DB_MODULE
163
        db_connection = db_connection or DEFAULT_DB_CONNECTION
164
        block_module = block_module or DEFAULT_BLOCK_MODULE
165
        block_path = block_path or DEFAULT_BLOCK_PATH
166
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
167
        block_params = block_params or DEFAULT_BLOCK_PARAMS
168
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
169
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
170
        container_quota_policy = container_quota_policy \
171
            or DEFAULT_CONTAINER_QUOTA
172
        container_versioning_policy = container_versioning_policy \
173
            or DEFAULT_CONTAINER_VERSIONING
174

    
175
        self.default_account_policy = {'quota': account_quota_policy}
176
        self.default_container_policy = {
177
            'quota': container_quota_policy,
178
            'versioning': container_versioning_policy
179
        }
180
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
181
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
182

    
183
        self.public_url_security = public_url_security or DEFAULT_PUBLIC_URL_SECURITY
184
        self.public_url_alphabet = public_url_alphabet or DEFAULT_PUBLIC_URL_ALPHABET
185

    
186
        self.hash_algorithm = 'sha256'
187
        self.block_size = 4 * 1024 * 1024  # 4MB
188
        self.free_versioning = free_versioning
189

    
190
        def load_module(m):
191
            __import__(m)
192
            return sys.modules[m]
193

    
194
        self.db_module = load_module(db_module)
195
        self.wrapper = self.db_module.DBWrapper(db_connection)
196
        params = {'wrapper': self.wrapper}
197
        self.permissions = self.db_module.Permissions(**params)
198
        self.config = self.db_module.Config(**params)
199
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
200
        for x in ['READ', 'WRITE']:
201
            setattr(self, x, getattr(self.db_module, x))
202
        self.node = self.db_module.Node(**params)
203
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
204
            setattr(self, x, getattr(self.db_module, x))
205

    
206
        self.block_module = load_module(block_module)
207
        self.block_params = block_params
208
        params = {'path': block_path,
209
                  'block_size': self.block_size,
210
                  'hash_algorithm': self.hash_algorithm,
211
                  'umask': block_umask}
212
        params.update(self.block_params)
213
        self.store = self.block_module.Store(**params)
214

    
215
        if queue_module and queue_hosts:
216
            self.queue_module = load_module(queue_module)
217
            params = {'hosts': queue_hosts,
218
                      'exchange': queue_exchange,
219
                      'client_id': QUEUE_CLIENT_ID}
220
            self.queue = self.queue_module.Queue(**params)
221
        else:
222
            class NoQueue:
223
                def send(self, *args):
224
                    pass
225

    
226
                def close(self):
227
                    pass
228

    
229
            self.queue = NoQueue()
230

    
231
        self.quotaholder_enabled = quotaholder_enabled
232
        if quotaholder_enabled:
233
            self.quotaholder_url = quotaholder_url
234
            self.quotaholder_token = quotaholder_token
235
            self.quotaholder = QuotaholderClient(
236
                                    quotaholder_url,
237
                                    token=quotaholder_token,
238
                                    poolsize=quotaholder_client_poolsize)
239

    
240
        self.serials = []
241
        self.messages = []
242

    
243
    def close(self):
244
        self.wrapper.close()
245
        self.queue.close()
246

    
247
    @property
248
    def using_external_quotaholder(self):
249
        return self.quotaholder_enabled
250

    
251
    @backend_method
252
    def list_accounts(self, user, marker=None, limit=10000):
253
        """Return a list of accounts the user can access."""
254

    
255
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
256
        allowed = self._allowed_accounts(user)
257
        start, limit = self._list_limits(allowed, marker, limit)
258
        return allowed[start:start + limit]
259

    
260
    @backend_method
261
    def get_account_meta(
262
            self, user, account, domain, until=None, include_user_defined=True,
263
            external_quota=None):
264
        """Return a dictionary with the account metadata for the domain."""
265

    
266
        logger.debug(
267
            "get_account_meta: %s %s %s %s", user, account, domain, until)
268
        path, node = self._lookup_account(account, user == account)
269
        if user != account:
270
            if until or node is None or account not in self._allowed_accounts(user):
271
                raise NotAllowedError
272
        try:
273
            props = self._get_properties(node, until)
274
            mtime = props[self.MTIME]
275
        except NameError:
276
            props = None
277
            mtime = until
278
        count, bytes, tstamp = self._get_statistics(node, until)
279
        tstamp = max(tstamp, mtime)
280
        if until is None:
281
            modified = tstamp
282
        else:
283
            modified = self._get_statistics(
284
                node)[2]  # Overall last modification.
285
            modified = max(modified, mtime)
286

    
287
        if user != account:
288
            meta = {'name': account}
289
        else:
290
            meta = {}
291
            if props is not None and include_user_defined:
292
                meta.update(
293
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
294
            if until is not None:
295
                meta.update({'until_timestamp': tstamp})
296
            meta.update({'name': account, 'count': count, 'bytes': bytes})
297
            if self.using_external_quotaholder:
298
                external_quota = external_quota or {}
299
                meta['bytes'] = external_quota.get('currValue', 0)
300
        meta.update({'modified': modified})
301
        return meta
302

    
303
    @backend_method
304
    def update_account_meta(self, user, account, domain, meta, replace=False):
305
        """Update the metadata associated with the account for the domain."""
306

    
307
        logger.debug("update_account_meta: %s %s %s %s %s", user,
308
                     account, domain, meta, replace)
309
        if user != account:
310
            raise NotAllowedError
311
        path, node = self._lookup_account(account, True)
312
        self._put_metadata(user, node, domain, meta, replace)
313

    
314
    @backend_method
315
    def get_account_groups(self, user, account):
316
        """Return a dictionary with the user groups defined for this account."""
317

    
318
        logger.debug("get_account_groups: %s %s", user, account)
319
        if user != account:
320
            if account not in self._allowed_accounts(user):
321
                raise NotAllowedError
322
            return {}
323
        self._lookup_account(account, True)
324
        return self.permissions.group_dict(account)
325

    
326
    @backend_method
327
    def update_account_groups(self, user, account, groups, replace=False):
328
        """Update the groups associated with the account."""
329

    
330
        logger.debug("update_account_groups: %s %s %s %s", user,
331
                     account, groups, replace)
332
        if user != account:
333
            raise NotAllowedError
334
        self._lookup_account(account, True)
335
        self._check_groups(groups)
336
        if replace:
337
            self.permissions.group_destroy(account)
338
        for k, v in groups.iteritems():
339
            if not replace:  # If not already deleted.
340
                self.permissions.group_delete(account, k)
341
            if v:
342
                self.permissions.group_addmany(account, k, v)
343

    
344
    @backend_method
345
    def get_account_policy(self, user, account, external_quota=None):
346
        """Return a dictionary with the account policy."""
347

    
348
        logger.debug("get_account_policy: %s %s", user, account)
349
        if user != account:
350
            if account not in self._allowed_accounts(user):
351
                raise NotAllowedError
352
            return {}
353
        path, node = self._lookup_account(account, True)
354
        policy = self._get_policy(node, is_account_policy=True)
355
        if self.using_external_quotaholder:
356
            external_quota = external_quota or {}
357
            policy['quota'] = external_quota.get('maxValue', 0)
358
        return policy
359

    
360
    @backend_method
361
    def update_account_policy(self, user, account, policy, replace=False):
362
        """Update the policy associated with the account."""
363

    
364
        logger.debug("update_account_policy: %s %s %s %s", user,
365
                     account, policy, replace)
366
        if user != account:
367
            raise NotAllowedError
368
        path, node = self._lookup_account(account, True)
369
        self._check_policy(policy, is_account_policy=True)
370
        self._put_policy(node, policy, replace, is_account_policy=True)
371

    
372
    @backend_method
373
    def put_account(self, user, account, policy=None):
374
        """Create a new account with the given name."""
375

    
376
        logger.debug("put_account: %s %s %s", user, account, policy)
377
        policy = policy or {}
378
        if user != account:
379
            raise NotAllowedError
380
        node = self.node.node_lookup(account)
381
        if node is not None:
382
            raise AccountExists('Account already exists')
383
        if policy:
384
            self._check_policy(policy, is_account_policy=True)
385
        node = self._put_path(user, self.ROOTNODE, account)
386
        self._put_policy(node, policy, True, is_account_policy=True)
387

    
388
    @backend_method
389
    def delete_account(self, user, account):
390
        """Delete the account with the given name."""
391

    
392
        logger.debug("delete_account: %s %s", user, account)
393
        if user != account:
394
            raise NotAllowedError
395
        node = self.node.node_lookup(account)
396
        if node is None:
397
            return
398
        if not self.node.node_remove(node):
399
            raise AccountNotEmpty('Account is not empty')
400
        self.permissions.group_destroy(account)
401

    
402
    @backend_method
403
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
404
        """Return a list of containers existing under an account."""
405

    
406
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
407
                     account, marker, limit, shared, until, public)
408
        if user != account:
409
            if until or account not in self._allowed_accounts(user):
410
                raise NotAllowedError
411
            allowed = self._allowed_containers(user, account)
412
            start, limit = self._list_limits(allowed, marker, limit)
413
            return allowed[start:start + limit]
414
        if shared or public:
415
            allowed = set()
416
            if shared:
417
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
418
            if public:
419
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
420
            allowed = sorted(allowed)
421
            start, limit = self._list_limits(allowed, marker, limit)
422
            return allowed[start:start + limit]
423
        node = self.node.node_lookup(account)
424
        containers = [x[0] for x in self._list_object_properties(
425
            node, account, '', '/', marker, limit, False, None, [], until)]
426
        start, limit = self._list_limits(
427
            [x[0] for x in containers], marker, limit)
428
        return containers[start:start + limit]
429

    
430
    @backend_method
431
    def list_container_meta(self, user, account, container, domain, until=None):
432
        """Return a list with all the container's object meta keys for the domain."""
433

    
434
        logger.debug("list_container_meta: %s %s %s %s %s", user,
435
                     account, container, domain, until)
436
        allowed = []
437
        if user != account:
438
            if until:
439
                raise NotAllowedError
440
            allowed = self.permissions.access_list_paths(
441
                user, '/'.join((account, container)))
442
            if not allowed:
443
                raise NotAllowedError
444
        path, node = self._lookup_container(account, container)
445
        before = until if until is not None else inf
446
        allowed = self._get_formatted_paths(allowed)
447
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
448

    
449
    @backend_method
450
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
451
        """Return a dictionary with the container metadata for the domain."""
452

    
453
        logger.debug("get_container_meta: %s %s %s %s %s", user,
454
                     account, container, domain, until)
455
        if user != account:
456
            if until or container not in self._allowed_containers(user, account):
457
                raise NotAllowedError
458
        path, node = self._lookup_container(account, container)
459
        props = self._get_properties(node, until)
460
        mtime = props[self.MTIME]
461
        count, bytes, tstamp = self._get_statistics(node, until)
462
        tstamp = max(tstamp, mtime)
463
        if until is None:
464
            modified = tstamp
465
        else:
466
            modified = self._get_statistics(
467
                node)[2]  # Overall last modification.
468
            modified = max(modified, mtime)
469

    
470
        if user != account:
471
            meta = {'name': container}
472
        else:
473
            meta = {}
474
            if include_user_defined:
475
                meta.update(
476
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
477
            if until is not None:
478
                meta.update({'until_timestamp': tstamp})
479
            meta.update({'name': container, 'count': count, 'bytes': bytes})
480
        meta.update({'modified': modified})
481
        return meta
482

    
483
    @backend_method
484
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
485
        """Update the metadata associated with the container for the domain."""
486

    
487
        logger.debug("update_container_meta: %s %s %s %s %s %s",
488
                     user, account, container, domain, meta, replace)
489
        if user != account:
490
            raise NotAllowedError
491
        path, node = self._lookup_container(account, container)
492
        src_version_id, dest_version_id = self._put_metadata(
493
            user, node, domain, meta, replace)
494
        if src_version_id is not None:
495
            versioning = self._get_policy(
496
                node, is_account_policy=False)['versioning']
497
            if versioning != 'auto':
498
                self.node.version_remove(src_version_id)
499

    
500
    @backend_method
501
    def get_container_policy(self, user, account, container):
502
        """Return a dictionary with the container policy."""
503

    
504
        logger.debug(
505
            "get_container_policy: %s %s %s", user, account, container)
506
        if user != account:
507
            if container not in self._allowed_containers(user, account):
508
                raise NotAllowedError
509
            return {}
510
        path, node = self._lookup_container(account, container)
511
        return self._get_policy(node, is_account_policy=False)
512

    
513
    @backend_method
514
    def update_container_policy(self, user, account, container, policy, replace=False):
515
        """Update the policy associated with the container."""
516

    
517
        logger.debug("update_container_policy: %s %s %s %s %s",
518
                     user, account, container, policy, replace)
519
        if user != account:
520
            raise NotAllowedError
521
        path, node = self._lookup_container(account, container)
522
        self._check_policy(policy, is_account_policy=False)
523
        self._put_policy(node, policy, replace, is_account_policy=False)
524

    
525
    @backend_method
526
    def put_container(self, user, account, container, policy=None):
527
        """Create a new container with the given name."""
528

    
529
        logger.debug(
530
            "put_container: %s %s %s %s", user, account, container, policy)
531
        policy = policy or {}
532
        if user != account:
533
            raise NotAllowedError
534
        try:
535
            path, node = self._lookup_container(account, container)
536
        except NameError:
537
            pass
538
        else:
539
            raise ContainerExists('Container already exists')
540
        if policy:
541
            self._check_policy(policy, is_account_policy=False)
542
        path = '/'.join((account, container))
543
        node = self._put_path(
544
            user, self._lookup_account(account, True)[1], path)
545
        self._put_policy(node, policy, True, is_account_policy=False)
546

    
547
    @backend_method
548
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
549
        """Delete/purge the container with the given name."""
550

    
551
        logger.debug("delete_container: %s %s %s %s %s %s", user,
552
                     account, container, until, prefix, delimiter)
553
        if user != account:
554
            raise NotAllowedError
555
        path, node = self._lookup_container(account, container)
556

    
557
        if until is not None:
558
            hashes, size, serials = self.node.node_purge_children(
559
                node, until, CLUSTER_HISTORY)
560
            for h in hashes:
561
                self.store.map_delete(h)
562
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
563
            if not self.free_versioning:
564
                self._report_size_change(
565
                    user, account, -size, {
566
                        'action':'container purge',
567
                        'path': path,
568
                        'versions': ','.join(str(i) for i in serials)
569
                    }
570
                )
571
            return
572

    
573
        if not delimiter:
574
            if self._get_statistics(node)[0] > 0:
575
                raise ContainerNotEmpty('Container is not empty')
576
            hashes, size, serials = self.node.node_purge_children(
577
                node, inf, CLUSTER_HISTORY)
578
            for h in hashes:
579
                self.store.map_delete(h)
580
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
581
            self.node.node_remove(node)
582
            if not self.free_versioning:
583
                self._report_size_change(
584
                    user, account, -size, {
585
                        'action':'container purge',
586
                        'path': path,
587
                        'versions': ','.join(str(i) for i in serials)
588
                    }
589
                )
590
        else:
591
            # remove only contents
592
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
593
            paths = []
594
            for t in src_names:
595
                path = '/'.join((account, container, t[0]))
596
                node = t[2]
597
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
598
                del_size = self._apply_versioning(
599
                    account, container, src_version_id)
600
                self._report_size_change(
601
                        user, account, -del_size, {
602
                                'action': 'object delete',
603
                                'path': path,
604
                        'versions': ','.join([str(dest_version_id)])
605
                     }
606
                )
607
                self._report_object_change(
608
                    user, account, path, details={'action': 'object delete'})
609
                paths.append(path)
610
            self.permissions.access_clear_bulk(paths)
611

    
612
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
613
        if user != account and until:
614
            raise NotAllowedError
615
        if shared and public:
616
            # get shared first
617
            shared = self._list_object_permissions(
618
                user, account, container, prefix, shared=True, public=False)
619
            objects = set()
620
            if shared:
621
                path, node = self._lookup_container(account, container)
622
                shared = self._get_formatted_paths(shared)
623
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
624

    
625
            # get public
626
            objects |= set(self._list_public_object_properties(
627
                user, account, container, prefix, all_props))
628
            objects = list(objects)
629

    
630
            objects.sort(key=lambda x: x[0])
631
            start, limit = self._list_limits(
632
                [x[0] for x in objects], marker, limit)
633
            return objects[start:start + limit]
634
        elif public:
635
            objects = self._list_public_object_properties(
636
                user, account, container, prefix, all_props)
637
            start, limit = self._list_limits(
638
                [x[0] for x in objects], marker, limit)
639
            return objects[start:start + limit]
640

    
641
        allowed = self._list_object_permissions(
642
            user, account, container, prefix, shared, public)
643
        if shared and not allowed:
644
            return []
645
        path, node = self._lookup_container(account, container)
646
        allowed = self._get_formatted_paths(allowed)
647
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
648
        start, limit = self._list_limits(
649
            [x[0] for x in objects], marker, limit)
650
        return objects[start:start + limit]
651

    
652
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
653
        public = self._list_object_permissions(
654
            user, account, container, prefix, shared=False, public=True)
655
        paths, nodes = self._lookup_objects(public)
656
        path = '/'.join((account, container))
657
        cont_prefix = path + '/'
658
        paths = [x[len(cont_prefix):] for x in paths]
659
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
660
        objects = [(path,) + props for path, props in zip(paths, props)]
661
        return objects
662

    
663
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
664
        objects = []
665
        while True:
666
            marker = objects[-1] if objects else None
667
            limit = 10000
668
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
669
            objects.extend(l)
670
            if not l or len(l) < limit:
671
                break
672
        return objects
673

    
674
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
675
        allowed = []
676
        path = '/'.join((account, container, prefix)).rstrip('/')
677
        if user != account:
678
            allowed = self.permissions.access_list_paths(user, path)
679
            if not allowed:
680
                raise NotAllowedError
681
        else:
682
            allowed = set()
683
            if shared:
684
                allowed.update(self.permissions.access_list_shared(path))
685
            if public:
686
                allowed.update(
687
                    [x[0] for x in self.permissions.public_list(path)])
688
            allowed = sorted(allowed)
689
            if not allowed:
690
                return []
691
        return allowed
692

    
693
    @backend_method
694
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
695
        """Return a list of object (name, version_id) tuples existing under a container."""
696

    
697
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
698
        keys = keys or []
699
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
700

    
701
    @backend_method
702
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
703
        """Return a list of object metadata dicts existing under a container."""
704

    
705
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
706
        keys = keys or []
707
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
708
        objects = []
709
        for p in props:
710
            if len(p) == 2:
711
                objects.append({'subdir': p[0]})
712
            else:
713
                objects.append({'name': p[0],
714
                                'bytes': p[self.SIZE + 1],
715
                                'type': p[self.TYPE + 1],
716
                                'hash': p[self.HASH + 1],
717
                                'version': p[self.SERIAL + 1],
718
                                'version_timestamp': p[self.MTIME + 1],
719
                                'modified': p[self.MTIME + 1] if until is None else None,
720
                                'modified_by': p[self.MUSER + 1],
721
                                'uuid': p[self.UUID + 1],
722
                                'checksum': p[self.CHECKSUM + 1]})
723
        return objects
724

    
725
    @backend_method
726
    def list_object_permissions(self, user, account, container, prefix=''):
727
        """Return a list of paths that enforce permissions under a container."""
728

    
729
        logger.debug("list_object_permissions: %s %s %s %s", user,
730
                     account, container, prefix)
731
        return self._list_object_permissions(user, account, container, prefix, True, False)
732

    
733
    @backend_method
734
    def list_object_public(self, user, account, container, prefix=''):
735
        """Return a dict mapping paths to public ids for objects that are public under a container."""
736

    
737
        logger.debug("list_object_public: %s %s %s %s", user,
738
                     account, container, prefix)
739
        public = {}
740
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
741
            public[path] = p
742
        return public
743

    
744
    @backend_method
745
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
746
        """Return a dictionary with the object metadata for the domain."""
747

    
748
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
749
                     account, container, name, domain, version)
750
        self._can_read(user, account, container, name)
751
        path, node = self._lookup_object(account, container, name)
752
        props = self._get_version(node, version)
753
        if version is None:
754
            modified = props[self.MTIME]
755
        else:
756
            try:
757
                modified = self._get_version(
758
                    node)[self.MTIME]  # Overall last modification.
759
            except NameError:  # Object may be deleted.
760
                del_props = self.node.version_lookup(
761
                    node, inf, CLUSTER_DELETED)
762
                if del_props is None:
763
                    raise ItemNotExists('Object does not exist')
764
                modified = del_props[self.MTIME]
765

    
766
        meta = {}
767
        if include_user_defined:
768
            meta.update(
769
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
770
        meta.update({'name': name,
771
                     'bytes': props[self.SIZE],
772
                     'type': props[self.TYPE],
773
                     'hash': props[self.HASH],
774
                     'version': props[self.SERIAL],
775
                     'version_timestamp': props[self.MTIME],
776
                     'modified': modified,
777
                     'modified_by': props[self.MUSER],
778
                     'uuid': props[self.UUID],
779
                     'checksum': props[self.CHECKSUM]})
780
        return meta
781

    
782
    @backend_method
783
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
784
        """Update the metadata associated with the object for the domain and return the new version."""
785

    
786
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
787
                     user, account, container, name, domain, meta, replace)
788
        self._can_write(user, account, container, name)
789
        path, node = self._lookup_object(account, container, name)
790
        src_version_id, dest_version_id = self._put_metadata(
791
            user, node, domain, meta, replace)
792
        self._apply_versioning(account, container, src_version_id)
793
        return dest_version_id
794

    
795
    @backend_method
796
    def get_object_permissions(self, user, account, container, name):
797
        """Return the action allowed on the object, the path
798
        from which the object gets its permissions from,
799
        along with a dictionary containing the permissions."""
800

    
801
        logger.debug("get_object_permissions: %s %s %s %s", user,
802
                     account, container, name)
803
        allowed = 'write'
804
        permissions_path = self._get_permissions_path(account, container, name)
805
        if user != account:
806
            if self.permissions.access_check(permissions_path, self.WRITE, user):
807
                allowed = 'write'
808
            elif self.permissions.access_check(permissions_path, self.READ, user):
809
                allowed = 'read'
810
            else:
811
                raise NotAllowedError
812
        self._lookup_object(account, container, name)
813
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
814

    
815
    @backend_method
816
    def update_object_permissions(self, user, account, container, name, permissions):
817
        """Update the permissions associated with the object."""
818

    
819
        logger.debug("update_object_permissions: %s %s %s %s %s",
820
                     user, account, container, name, permissions)
821
        if user != account:
822
            raise NotAllowedError
823
        path = self._lookup_object(account, container, name)[0]
824
        self._check_permissions(path, permissions)
825
        self.permissions.access_set(path, permissions)
826
        self._report_sharing_change(user, account, path, {'members':
827
                                    self.permissions.access_members(path)})
828

    
829
    @backend_method
830
    def get_object_public(self, user, account, container, name):
831
        """Return the public id of the object if applicable."""
832

    
833
        logger.debug(
834
            "get_object_public: %s %s %s %s", user, account, container, name)
835
        self._can_read(user, account, container, name)
836
        path = self._lookup_object(account, container, name)[0]
837
        p = self.permissions.public_get(path)
838
        return p
839

    
840
    @backend_method
841
    def update_object_public(self, user, account, container, name, public):
842
        """Update the public status of the object."""
843

    
844
        logger.debug("update_object_public: %s %s %s %s %s", user,
845
                     account, container, name, public)
846
        self._can_write(user, account, container, name)
847
        path = self._lookup_object(account, container, name)[0]
848
        if not public:
849
            self.permissions.public_unset(path)
850
        else:
851
            self.permissions.public_set(
852
                path, self.public_url_security, self.public_url_alphabet
853
            )
854

    
855
    @backend_method
856
    def get_object_hashmap(self, user, account, container, name, version=None):
857
        """Return the object's size and a list with partial hashes."""
858

    
859
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
860
                     account, container, name, version)
861
        self._can_read(user, account, container, name)
862
        path, node = self._lookup_object(account, container, name)
863
        props = self._get_version(node, version)
864
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
865
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
866

    
867
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
868
        if permissions is not None and user != account:
869
            raise NotAllowedError
870
        self._can_write(user, account, container, name)
871
        if permissions is not None:
872
            path = '/'.join((account, container, name))
873
            self._check_permissions(path, permissions)
874

    
875
        account_path, account_node = self._lookup_account(account, True)
876
        container_path, container_node = self._lookup_container(
877
            account, container)
878

    
879
        path, node = self._put_object_node(
880
            container_path, container_node, name)
881
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
882

    
883
        # Handle meta.
884
        if src_version_id is None:
885
            src_version_id = pre_version_id
886
        self._put_metadata_duplicate(
887
            src_version_id, dest_version_id, domain, meta, replace_meta)
888

    
889
        del_size = self._apply_versioning(account, container, pre_version_id)
890
        size_delta = size - del_size
891
        if size_delta > 0:
892
            # Check account quota.
893
            if not self.using_external_quotaholder:
894
                account_quota = long(
895
                    self._get_policy(account_node, is_account_policy=True
896
                    )['quota']
897
                )
898
                account_usage = self._get_statistics(account_node)[1]
899
                if (account_quota > 0 and account_usage > account_quota):
900
                    raise QuotaError(
901
                        'Account quota exceeded: limit: %s, usage: %s' % (
902
                            account_quota, account_usage
903
                        )
904
                    )
905

    
906
            # Check container quota.
907
            container_quota = long(
908
                self._get_policy(container_node, is_account_policy=False
909
                )['quota']
910
            )
911
            container_usage = self._get_statistics(container_node)[1]
912
            if (container_quota > 0 and container_usage > container_quota):
913
                # This must be executed in a transaction, so the version is
914
                # never created if it fails.
915
                raise QuotaError(
916
                    'Container quota exceeded: limit: %s, usage: %s' % (
917
                        container_quota, container_usage
918
                    )
919
                )
920

    
921
        self._report_size_change(user, account, size_delta,
922
                                 {'action': 'object update', 'path': path,
923
                                  'versions': ','.join([str(dest_version_id)])})
924
        if permissions is not None:
925
            self.permissions.access_set(path, permissions)
926
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
927

    
928
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
929
        return dest_version_id
930

    
931
    @backend_method
932
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta=None, replace_meta=False, permissions=None):
933
        """Create/update an object with the specified size and partial hashes."""
934

    
935
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
936
                     account, container, name, size, type, hashmap, checksum)
937
        meta = meta or {}
938
        if size == 0:  # No such thing as an empty hashmap.
939
            hashmap = [self.put_block('')]
940
        map = HashMap(self.block_size, self.hash_algorithm)
941
        map.extend([binascii.unhexlify(x) for x in hashmap])
942
        missing = self.store.block_search(map)
943
        if missing:
944
            ie = IndexError()
945
            ie.data = [binascii.hexlify(x) for x in missing]
946
            raise ie
947

    
948
        hash = map.hash()
949
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
950
        self.store.map_put(hash, map)
951
        return dest_version_id
952

    
953
    @backend_method
954
    def update_object_checksum(self, user, account, container, name, version, checksum):
955
        """Update an object's checksum."""
956

    
957
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
958
                     user, account, container, name, version, checksum)
959
        # Update objects with greater version and same hashmap and size (fix metadata updates).
960
        self._can_write(user, account, container, name)
961
        path, node = self._lookup_object(account, container, name)
962
        props = self._get_version(node, version)
963
        versions = self.node.node_get_versions(node)
964
        for x in versions:
965
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
966
                self.node.version_put_property(
967
                    x[self.SERIAL], 'checksum', checksum)
968

    
969
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta=None, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
970
        dest_meta = dest_meta or {}
971
        dest_version_ids = []
972
        self._can_read(user, src_account, src_container, src_name)
973
        path, node = self._lookup_object(src_account, src_container, src_name)
974
        # TODO: Will do another fetch of the properties in duplicate version...
975
        props = self._get_version(
976
            node, src_version)  # Check to see if source exists.
977
        src_version_id = props[self.SERIAL]
978
        hash = props[self.HASH]
979
        size = props[self.SIZE]
980
        is_copy = not is_move and (src_account, src_container, src_name) != (
981
            dest_account, dest_container, dest_name)  # New uuid.
982
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
983
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
984
            self._delete_object(user, src_account, src_container, src_name)
985

    
986
        if delimiter:
987
            prefix = src_name + \
988
                delimiter if not src_name.endswith(delimiter) else src_name
989
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
990
            src_names.sort(key=lambda x: x[2])  # order by nodes
991
            paths = [elem[0] for elem in src_names]
992
            nodes = [elem[2] for elem in src_names]
993
            # TODO: Will do another fetch of the properties in duplicate version...
994
            props = self._get_versions(nodes)  # Check to see if source exists.
995

    
996
            for prop, path, node in zip(props, paths, nodes):
997
                src_version_id = prop[self.SERIAL]
998
                hash = prop[self.HASH]
999
                vtype = prop[self.TYPE]
1000
                size = prop[self.SIZE]
1001
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1002
                    delimiter) else dest_name
1003
                vdest_name = path.replace(prefix, dest_prefix, 1)
1004
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
1005
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
1006
                    self._delete_object(user, src_account, src_container, path)
1007
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
1008

    
1009
    @backend_method
1010
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, src_version=None, delimiter=None):
1011
        """Copy an object's data and metadata."""
1012

    
1013
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
1014
        meta = meta or {}
1015
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
1016
        return dest_version_id
1017

    
1018
    @backend_method
1019
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, delimiter=None):
1020
        """Move an object's data and metadata."""
1021

    
1022
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
1023
        meta = meta or {}
1024
        if user != src_account:
1025
            raise NotAllowedError
1026
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
1027
        return dest_version_id
1028

    
1029
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
1030
        if user != account:
1031
            raise NotAllowedError
1032

    
1033
        if until is not None:
1034
            path = '/'.join((account, container, name))
1035
            node = self.node.node_lookup(path)
1036
            if node is None:
1037
                return
1038
            hashes = []
1039
            size = 0
1040
            serials = []
1041
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
1042
            hashes += h
1043
            size += s
1044
            serials += v
1045
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
1046
            hashes += h
1047
            if not self.free_versioning:
1048
                size += s
1049
            serials += v
1050
            for h in hashes:
1051
                self.store.map_delete(h)
1052
            self.node.node_purge(node, until, CLUSTER_DELETED)
1053
            try:
1054
                props = self._get_version(node)
1055
            except NameError:
1056
                self.permissions.access_clear(path)
1057
            self._report_size_change(
1058
                user, account, -size, {
1059
                    'action': 'object purge',
1060
                    'path': path,
1061
                    'versions': ','.join(str(i) for i in serials)
1062
                }
1063
            )
1064
            return
1065

    
1066
        path, node = self._lookup_object(account, container, name)
1067
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1068
        del_size = self._apply_versioning(account, container, src_version_id)
1069
        self._report_size_change(user, account, -del_size,
1070
                                 {'action': 'object delete', 'path': path,
1071
                                  'versions': ','.join([str(dest_version_id)])})
1072
        self._report_object_change(
1073
            user, account, path, details={'action': 'object delete'})
1074
        self.permissions.access_clear(path)
1075

    
1076
        if delimiter:
1077
            prefix = name + delimiter if not name.endswith(delimiter) else name
1078
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1079
            paths = []
1080
            for t in src_names:
1081
                path = '/'.join((account, container, t[0]))
1082
                node = t[2]
1083
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1084
                del_size = self._apply_versioning(
1085
                    account, container, src_version_id)
1086
                self._report_size_change(user, account, -del_size,
1087
                                         {'action': 'object delete',
1088
                                          'path': path,
1089
                                          'versions': ','.join([str(dest_version_id)])})
1090
                self._report_object_change(
1091
                    user, account, path, details={'action': 'object delete'})
1092
                paths.append(path)
1093
            self.permissions.access_clear_bulk(paths)
1094

    
1095
    @backend_method
1096
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1097
        """Delete/purge an object."""
1098

    
1099
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1100
                     account, container, name, until, prefix, delimiter)
1101
        self._delete_object(user, account, container, name, until, delimiter)
1102

    
1103
    @backend_method
1104
    def list_versions(self, user, account, container, name):
1105
        """Return a list of all (version, version_timestamp) tuples for an object."""
1106

    
1107
        logger.debug(
1108
            "list_versions: %s %s %s %s", user, account, container, name)
1109
        self._can_read(user, account, container, name)
1110
        path, node = self._lookup_object(account, container, name)
1111
        versions = self.node.node_get_versions(node)
1112
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1113

    
1114
    @backend_method
1115
    def get_uuid(self, user, uuid):
1116
        """Return the (account, container, name) for the UUID given."""
1117

    
1118
        logger.debug("get_uuid: %s %s", user, uuid)
1119
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1120
        if info is None:
1121
            raise NameError
1122
        path, serial = info
1123
        account, container, name = path.split('/', 2)
1124
        self._can_read(user, account, container, name)
1125
        return (account, container, name)
1126

    
1127
    @backend_method
1128
    def get_public(self, user, public):
1129
        """Return the (account, container, name) for the public id given."""
1130

    
1131
        logger.debug("get_public: %s %s", user, public)
1132
        path = self.permissions.public_path(public)
1133
        if path is None:
1134
            raise NameError
1135
        account, container, name = path.split('/', 2)
1136
        self._can_read(user, account, container, name)
1137
        return (account, container, name)
1138

    
1139
    @backend_method(autocommit=0)
1140
    def get_block(self, hash):
1141
        """Return a block's data."""
1142

    
1143
        logger.debug("get_block: %s", hash)
1144
        block = self.store.block_get(binascii.unhexlify(hash))
1145
        if not block:
1146
            raise ItemNotExists('Block does not exist')
1147
        return block
1148

    
1149
    @backend_method(autocommit=0)
1150
    def put_block(self, data):
1151
        """Store a block and return the hash."""
1152

    
1153
        logger.debug("put_block: %s", len(data))
1154
        return binascii.hexlify(self.store.block_put(data))
1155

    
1156
    @backend_method(autocommit=0)
1157
    def update_block(self, hash, data, offset=0):
1158
        """Update a known block and return the hash."""
1159

    
1160
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1161
        if offset == 0 and len(data) == self.block_size:
1162
            return self.put_block(data)
1163
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1164
        return binascii.hexlify(h)
1165

    
1166
    # Path functions.
1167

    
1168
    def _generate_uuid(self):
1169
        return str(uuidlib.uuid4())
1170

    
1171
    def _put_object_node(self, path, parent, name):
1172
        path = '/'.join((path, name))
1173
        node = self.node.node_lookup(path)
1174
        if node is None:
1175
            node = self.node.node_create(parent, path)
1176
        return path, node
1177

    
1178
    def _put_path(self, user, parent, path):
1179
        node = self.node.node_create(parent, path)
1180
        self.node.version_create(node, None, 0, '', None, user,
1181
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1182
        return node
1183

    
1184
    def _lookup_account(self, account, create=True):
1185
        node = self.node.node_lookup(account)
1186
        if node is None and create:
1187
            node = self._put_path(
1188
                account, self.ROOTNODE, account)  # User is account.
1189
        return account, node
1190

    
1191
    def _lookup_container(self, account, container):
1192
        path = '/'.join((account, container))
1193
        node = self.node.node_lookup(path)
1194
        if node is None:
1195
            raise ItemNotExists('Container does not exist')
1196
        return path, node
1197

    
1198
    def _lookup_object(self, account, container, name):
1199
        path = '/'.join((account, container, name))
1200
        node = self.node.node_lookup(path)
1201
        if node is None:
1202
            raise ItemNotExists('Object does not exist')
1203
        return path, node
1204

    
1205
    def _lookup_objects(self, paths):
1206
        nodes = self.node.node_lookup_bulk(paths)
1207
        return paths, nodes
1208

    
1209
    def _get_properties(self, node, until=None):
1210
        """Return properties until the timestamp given."""
1211

    
1212
        before = until if until is not None else inf
1213
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1214
        if props is None and until is not None:
1215
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1216
        if props is None:
1217
            raise ItemNotExists('Path does not exist')
1218
        return props
1219

    
1220
    def _get_statistics(self, node, until=None):
1221
        """Return count, sum of size and latest timestamp of everything under node."""
1222

    
1223
        if until is None:
1224
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1225
        else:
1226
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1227
        if stats is None:
1228
            stats = (0, 0, 0)
1229
        return stats
1230

    
1231
    def _get_version(self, node, version=None):
1232
        if version is None:
1233
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1234
            if props is None:
1235
                raise ItemNotExists('Object does not exist')
1236
        else:
1237
            try:
1238
                version = int(version)
1239
            except ValueError:
1240
                raise VersionNotExists('Version does not exist')
1241
            props = self.node.version_get_properties(version)
1242
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1243
                raise VersionNotExists('Version does not exist')
1244
        return props
1245

    
1246
    def _get_versions(self, nodes):
1247
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1248

    
1249
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1250
        """Create a new version of the node."""
1251

    
1252
        props = self.node.version_lookup(
1253
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1254
        if props is not None:
1255
            src_version_id = props[self.SERIAL]
1256
            src_hash = props[self.HASH]
1257
            src_size = props[self.SIZE]
1258
            src_type = props[self.TYPE]
1259
            src_checksum = props[self.CHECKSUM]
1260
        else:
1261
            src_version_id = None
1262
            src_hash = None
1263
            src_size = 0
1264
            src_type = ''
1265
            src_checksum = ''
1266
        if size is None:  # Set metadata.
1267
            hash = src_hash  # This way hash can be set to None (account or container).
1268
            size = src_size
1269
        if type is None:
1270
            type = src_type
1271
        if checksum is None:
1272
            checksum = src_checksum
1273
        uuid = self._generate_uuid(
1274
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1275

    
1276
        if src_node is None:
1277
            pre_version_id = src_version_id
1278
        else:
1279
            pre_version_id = None
1280
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1281
            if props is not None:
1282
                pre_version_id = props[self.SERIAL]
1283
        if pre_version_id is not None:
1284
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1285

    
1286
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1287
        return pre_version_id, dest_version_id
1288

    
1289
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1290
        if src_version_id is not None:
1291
            self.node.attribute_copy(src_version_id, dest_version_id)
1292
        if not replace:
1293
            self.node.attribute_del(dest_version_id, domain, (
1294
                k for k, v in meta.iteritems() if v == ''))
1295
            self.node.attribute_set(dest_version_id, domain, (
1296
                (k, v) for k, v in meta.iteritems() if v != ''))
1297
        else:
1298
            self.node.attribute_del(dest_version_id, domain)
1299
            self.node.attribute_set(dest_version_id, domain, ((
1300
                k, v) for k, v in meta.iteritems()))
1301

    
1302
    def _put_metadata(self, user, node, domain, meta, replace=False):
1303
        """Create a new version and store metadata."""
1304

    
1305
        src_version_id, dest_version_id = self._put_version_duplicate(
1306
            user, node)
1307
        self._put_metadata_duplicate(
1308
            src_version_id, dest_version_id, domain, meta, replace)
1309
        return src_version_id, dest_version_id
1310

    
1311
    def _list_limits(self, listing, marker, limit):
1312
        start = 0
1313
        if marker:
1314
            try:
1315
                start = listing.index(marker) + 1
1316
            except ValueError:
1317
                pass
1318
        if not limit or limit > 10000:
1319
            limit = 10000
1320
        return start, limit
1321

    
1322
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, until=None, size_range=None, allowed=None, all_props=False):
1323
        keys = keys or []
1324
        allowed = allowed or []
1325
        cont_prefix = path + '/'
1326
        prefix = cont_prefix + prefix
1327
        start = cont_prefix + marker if marker else None
1328
        before = until if until is not None else inf
1329
        filterq = keys if domain else []
1330
        sizeq = size_range
1331

    
1332
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1333
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1334
        objects.sort(key=lambda x: x[0])
1335
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1336
        return objects
1337

    
1338
    # Reporting functions.
1339

    
1340
    def _report_size_change(self, user, account, size, details=None):
1341
        details = details or {}
1342

    
1343
        if size == 0:
1344
            return
1345

    
1346
        account_node = self._lookup_account(account, True)[1]
1347
        total = self._get_statistics(account_node)[1]
1348
        details.update({'user': user, 'total': total})
1349
        logger.debug(
1350
            "_report_size_change: %s %s %s %s", user, account, size, details)
1351
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1352
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1353
                              float(size), details))
1354

    
1355
        if not self.using_external_quotaholder:
1356
            return
1357

    
1358
        try:
1359
            serial = self.quotaholder.issue_commission(
1360
                    context     =   {},
1361
                    target      =   account,
1362
                    key         =   '1',
1363
                    clientkey   =   'pithos',
1364
                    ownerkey    =   '',
1365
                    name        =   details['path'] if 'path' in details else '',
1366
                    provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1367
            )
1368
        except BaseException, e:
1369
            raise QuotaError(e)
1370
        else:
1371
            self.serials.append(serial)
1372

    
1373
    def _report_object_change(self, user, account, path, details=None):
1374
        details = details or {}
1375
        details.update({'user': user})
1376
        logger.debug("_report_object_change: %s %s %s %s", user,
1377
                     account, path, details)
1378
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1379
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1380

    
1381
    def _report_sharing_change(self, user, account, path, details=None):
1382
        logger.debug("_report_permissions_change: %s %s %s %s",
1383
                     user, account, path, details)
1384
        details = details or {}
1385
        details.update({'user': user})
1386
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1387
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1388

    
1389
    # Policy functions.
1390

    
1391
    def _check_policy(self, policy, is_account_policy=True):
1392
        default_policy = self.default_account_policy \
1393
            if is_account_policy else self.default_container_policy
1394
        for k in policy.keys():
1395
            if policy[k] == '':
1396
                policy[k] = default_policy.get(k)
1397
        for k, v in policy.iteritems():
1398
            if k == 'quota':
1399
                q = int(v)  # May raise ValueError.
1400
                if q < 0:
1401
                    raise ValueError
1402
            elif k == 'versioning':
1403
                if v not in ['auto', 'none']:
1404
                    raise ValueError
1405
            else:
1406
                raise ValueError
1407

    
1408
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1409
        default_policy = self.default_account_policy \
1410
            if is_account_policy else self.default_container_policy
1411
        if replace:
1412
            for k, v in default_policy.iteritems():
1413
                if k not in policy:
1414
                    policy[k] = v
1415
        self.node.policy_set(node, policy)
1416

    
1417
    def _get_policy(self, node, is_account_policy=True):
1418
        default_policy = self.default_account_policy \
1419
            if is_account_policy else self.default_container_policy
1420
        policy = default_policy.copy()
1421
        policy.update(self.node.policy_get(node))
1422
        return policy
1423

    
1424
    def _apply_versioning(self, account, container, version_id):
1425
        """Delete the provided version if such is the policy.
1426
           Return size of object removed.
1427
        """
1428

    
1429
        if version_id is None:
1430
            return 0
1431
        path, node = self._lookup_container(account, container)
1432
        versioning = self._get_policy(
1433
            node, is_account_policy=False)['versioning']
1434
        if versioning != 'auto':
1435
            hash, size = self.node.version_remove(version_id)
1436
            self.store.map_delete(hash)
1437
            return size
1438
        elif self.free_versioning:
1439
            return self.node.version_get_properties(
1440
                version_id, keys=('size',))[0]
1441
        return 0
1442

    
1443
    # Access control functions.
1444

    
1445
    def _check_groups(self, groups):
1446
        # raise ValueError('Bad characters in groups')
1447
        pass
1448

    
1449
    def _check_permissions(self, path, permissions):
1450
        # raise ValueError('Bad characters in permissions')
1451
        pass
1452

    
1453
    def _get_formatted_paths(self, paths):
1454
        formatted = []
1455
        for p in paths:
1456
            node = self.node.node_lookup(p)
1457
            props = None
1458
            if node is not None:
1459
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1460
            if props is not None:
1461
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1462
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1463
                formatted.append((p, self.MATCH_EXACT))
1464
        return formatted
1465

    
1466
    def _get_permissions_path(self, account, container, name):
1467
        path = '/'.join((account, container, name))
1468
        permission_paths = self.permissions.access_inherit(path)
1469
        permission_paths.sort()
1470
        permission_paths.reverse()
1471
        for p in permission_paths:
1472
            if p == path:
1473
                return p
1474
            else:
1475
                if p.count('/') < 2:
1476
                    continue
1477
                node = self.node.node_lookup(p)
1478
                props = None
1479
                if node is not None:
1480
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1481
                if props is not None:
1482
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1483
                        return p
1484
        return None
1485

    
1486
    def _can_read(self, user, account, container, name):
1487
        if user == account:
1488
            return True
1489
        path = '/'.join((account, container, name))
1490
        if self.permissions.public_get(path) is not None:
1491
            return True
1492
        path = self._get_permissions_path(account, container, name)
1493
        if not path:
1494
            raise NotAllowedError
1495
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1496
            raise NotAllowedError
1497

    
1498
    def _can_write(self, user, account, container, name):
1499
        if user == account:
1500
            return True
1501
        path = '/'.join((account, container, name))
1502
        path = self._get_permissions_path(account, container, name)
1503
        if not path:
1504
            raise NotAllowedError
1505
        if not self.permissions.access_check(path, self.WRITE, user):
1506
            raise NotAllowedError
1507

    
1508
    def _allowed_accounts(self, user):
1509
        allow = set()
1510
        for path in self.permissions.access_list_paths(user):
1511
            allow.add(path.split('/', 1)[0])
1512
        return sorted(allow)
1513

    
1514
    def _allowed_containers(self, user, account):
1515
        allow = set()
1516
        for path in self.permissions.access_list_paths(user, account):
1517
            allow.add(path.split('/', 2)[1])
1518
        return sorted(allow)