Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 995d7d15

History | View | Annotate | Download (65.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from synnefo.lib.quotaholder import QuotaholderClient
41

    
42
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
43
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
44
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
45
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
85
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
86
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
87
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
88
                               'abcdefghijklmnopqrstuvwxyz'
89
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
90
DEFAULT_PUBLIC_URL_SECURITY = 16
91

    
92
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
93
QUEUE_CLIENT_ID = 'pithos'
94
QUEUE_INSTANCE_ID = '1'
95

    
96
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
97

    
98
inf = float('inf')
99

    
100
ULTIMATE_ANSWER = 42
101

    
102

    
103
logger = logging.getLogger(__name__)
104

    
105

    
106
def backend_method(func=None, autocommit=1):
107
    if func is None:
108
        def fn(func):
109
            return backend_method(func, autocommit)
110
        return fn
111

    
112
    if not autocommit:
113
        return func
114

    
115
    def fn(self, *args, **kw):
116
        self.wrapper.execute()
117
        serials = []
118
        self.serials = serials
119
        self.messages = []
120

    
121
        try:
122
            ret = func(self, *args, **kw)
123
            for m in self.messages:
124
                self.queue.send(*m)
125
            if serials:
126
                self.quotaholder_serials.insert_many(serials)
127

    
128
                # commit to ensure that the serials are registered
129
                # even if accept commission fails
130
                self.wrapper.commit()
131
                self.wrapper.execute()
132

    
133
                self.quotaholder.accept_commission(
134
                            context     =   {},
135
                            clientkey   =   'pithos',
136
                            serials     =   serials)
137

    
138
                self.quotaholder_serials.delete_many(serials)
139

    
140
            self.wrapper.commit()
141
            return ret
142
        except:
143
            if serials:
144
                self.quotaholder.reject_commission(
145
                            context     =   {},
146
                            clientkey   =   'pithos',
147
                            serials     =   serials)
148
            self.wrapper.rollback()
149
            raise
150
    return fn
151

    
152

    
153
class ModularBackend(BaseBackend):
154
    """A modular backend.
155

156
    Uses modules for SQL functions and storage.
157
    """
158

    
159
    def __init__(self, db_module=None, db_connection=None,
160
                 block_module=None, block_path=None, block_umask=None,
161
                 queue_module=None, queue_hosts=None, queue_exchange=None,
162
                 quotaholder_enabled=False,
163
                 quotaholder_url=None, quotaholder_token=None,
164
                 quotaholder_client_poolsize=None,
165
                 free_versioning=True, block_params=None,
166
                 public_url_security=None,
167
                 public_url_alphabet=None,
168
                 account_quota_policy=None,
169
                 container_quota_policy=None,
170
                 container_versioning_policy=None):
171
        db_module = db_module or DEFAULT_DB_MODULE
172
        db_connection = db_connection or DEFAULT_DB_CONNECTION
173
        block_module = block_module or DEFAULT_BLOCK_MODULE
174
        block_path = block_path or DEFAULT_BLOCK_PATH
175
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
176
        block_params = block_params or DEFAULT_BLOCK_PARAMS
177
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
178
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
179
        container_quota_policy = container_quota_policy \
180
            or DEFAULT_CONTAINER_QUOTA
181
        container_versioning_policy = container_versioning_policy \
182
            or DEFAULT_CONTAINER_VERSIONING
183

    
184
        self.default_account_policy = {'quota': account_quota_policy}
185
        self.default_container_policy = {
186
            'quota': container_quota_policy,
187
            'versioning': container_versioning_policy
188
        }
189
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
190
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
191

    
192
        self.public_url_security = public_url_security or DEFAULT_PUBLIC_URL_SECURITY
193
        self.public_url_alphabet = public_url_alphabet or DEFAULT_PUBLIC_URL_ALPHABET
194

    
195
        self.hash_algorithm = 'sha256'
196
        self.block_size = 4 * 1024 * 1024  # 4MB
197
        self.free_versioning = free_versioning
198

    
199
        def load_module(m):
200
            __import__(m)
201
            return sys.modules[m]
202

    
203
        self.db_module = load_module(db_module)
204
        self.wrapper = self.db_module.DBWrapper(db_connection)
205
        params = {'wrapper': self.wrapper}
206
        self.permissions = self.db_module.Permissions(**params)
207
        self.config = self.db_module.Config(**params)
208
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
209
        for x in ['READ', 'WRITE']:
210
            setattr(self, x, getattr(self.db_module, x))
211
        self.node = self.db_module.Node(**params)
212
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
213
            setattr(self, x, getattr(self.db_module, x))
214

    
215
        self.block_module = load_module(block_module)
216
        self.block_params = block_params
217
        params = {'path': block_path,
218
                  'block_size': self.block_size,
219
                  'hash_algorithm': self.hash_algorithm,
220
                  'umask': block_umask}
221
        params.update(self.block_params)
222
        self.store = self.block_module.Store(**params)
223

    
224
        if queue_module and queue_hosts:
225
            self.queue_module = load_module(queue_module)
226
            params = {'hosts': queue_hosts,
227
                      'exchange': queue_exchange,
228
                      'client_id': QUEUE_CLIENT_ID}
229
            self.queue = self.queue_module.Queue(**params)
230
        else:
231
            class NoQueue:
232
                def send(self, *args):
233
                    pass
234

    
235
                def close(self):
236
                    pass
237

    
238
            self.queue = NoQueue()
239

    
240
        self.quotaholder_enabled = quotaholder_enabled
241
        if quotaholder_enabled:
242
            self.quotaholder_url = quotaholder_url
243
            self.quotaholder_token = quotaholder_token
244
            self.quotaholder = QuotaholderClient(
245
                                    quotaholder_url,
246
                                    token=quotaholder_token,
247
                                    poolsize=quotaholder_client_poolsize)
248

    
249
        self.serials = []
250
        self.messages = []
251

    
252
    def close(self):
253
        self.wrapper.close()
254
        self.queue.close()
255

    
256
    @property
257
    def using_external_quotaholder(self):
258
        return self.quotaholder_enabled
259

    
260
    @backend_method
261
    def list_accounts(self, user, marker=None, limit=10000):
262
        """Return a list of accounts the user can access."""
263

    
264
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
265
        allowed = self._allowed_accounts(user)
266
        start, limit = self._list_limits(allowed, marker, limit)
267
        return allowed[start:start + limit]
268

    
269
    @backend_method
270
    def get_account_meta(
271
            self, user, account, domain, until=None, include_user_defined=True,
272
            external_quota=None):
273
        """Return a dictionary with the account metadata for the domain."""
274

    
275
        logger.debug(
276
            "get_account_meta: %s %s %s %s", user, account, domain, until)
277
        path, node = self._lookup_account(account, user == account)
278
        if user != account:
279
            if until or node is None or account not in self._allowed_accounts(user):
280
                raise NotAllowedError
281
        try:
282
            props = self._get_properties(node, until)
283
            mtime = props[self.MTIME]
284
        except NameError:
285
            props = None
286
            mtime = until
287
        count, bytes, tstamp = self._get_statistics(node, until)
288
        tstamp = max(tstamp, mtime)
289
        if until is None:
290
            modified = tstamp
291
        else:
292
            modified = self._get_statistics(
293
                node)[2]  # Overall last modification.
294
            modified = max(modified, mtime)
295

    
296
        if user != account:
297
            meta = {'name': account}
298
        else:
299
            meta = {}
300
            if props is not None and include_user_defined:
301
                meta.update(
302
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
303
            if until is not None:
304
                meta.update({'until_timestamp': tstamp})
305
            meta.update({'name': account, 'count': count, 'bytes': bytes})
306
            if self.using_external_quotaholder:
307
                external_quota = external_quota or {}
308
                meta['bytes'] = external_quota.get('currValue', 0)
309
        meta.update({'modified': modified})
310
        return meta
311

    
312
    @backend_method
313
    def update_account_meta(self, user, account, domain, meta, replace=False):
314
        """Update the metadata associated with the account for the domain."""
315

    
316
        logger.debug("update_account_meta: %s %s %s %s %s", user,
317
                     account, domain, meta, replace)
318
        if user != account:
319
            raise NotAllowedError
320
        path, node = self._lookup_account(account, True)
321
        self._put_metadata(user, node, domain, meta, replace)
322

    
323
    @backend_method
324
    def get_account_groups(self, user, account):
325
        """Return a dictionary with the user groups defined for this account."""
326

    
327
        logger.debug("get_account_groups: %s %s", user, account)
328
        if user != account:
329
            if account not in self._allowed_accounts(user):
330
                raise NotAllowedError
331
            return {}
332
        self._lookup_account(account, True)
333
        return self.permissions.group_dict(account)
334

    
335
    @backend_method
336
    def update_account_groups(self, user, account, groups, replace=False):
337
        """Update the groups associated with the account."""
338

    
339
        logger.debug("update_account_groups: %s %s %s %s", user,
340
                     account, groups, replace)
341
        if user != account:
342
            raise NotAllowedError
343
        self._lookup_account(account, True)
344
        self._check_groups(groups)
345
        if replace:
346
            self.permissions.group_destroy(account)
347
        for k, v in groups.iteritems():
348
            if not replace:  # If not already deleted.
349
                self.permissions.group_delete(account, k)
350
            if v:
351
                self.permissions.group_addmany(account, k, v)
352

    
353
    @backend_method
354
    def get_account_policy(self, user, account, external_quota=None):
355
        """Return a dictionary with the account policy."""
356

    
357
        logger.debug("get_account_policy: %s %s", user, account)
358
        if user != account:
359
            if account not in self._allowed_accounts(user):
360
                raise NotAllowedError
361
            return {}
362
        path, node = self._lookup_account(account, True)
363
        policy = self._get_policy(node, is_account_policy=True)
364
        if self.using_external_quotaholder:
365
            external_quota = external_quota or {}
366
            policy['quota'] = external_quota.get('maxValue', 0)
367
        return policy
368

    
369
    @backend_method
370
    def update_account_policy(self, user, account, policy, replace=False):
371
        """Update the policy associated with the account."""
372

    
373
        logger.debug("update_account_policy: %s %s %s %s", user,
374
                     account, policy, replace)
375
        if user != account:
376
            raise NotAllowedError
377
        path, node = self._lookup_account(account, True)
378
        self._check_policy(policy, is_account_policy=True)
379
        self._put_policy(node, policy, replace, is_account_policy=True)
380

    
381
    @backend_method
382
    def put_account(self, user, account, policy=None):
383
        """Create a new account with the given name."""
384

    
385
        logger.debug("put_account: %s %s %s", user, account, policy)
386
        policy = policy or {}
387
        if user != account:
388
            raise NotAllowedError
389
        node = self.node.node_lookup(account)
390
        if node is not None:
391
            raise AccountExists('Account already exists')
392
        if policy:
393
            self._check_policy(policy, is_account_policy=True)
394
        node = self._put_path(user, self.ROOTNODE, account)
395
        self._put_policy(node, policy, True, is_account_policy=True)
396

    
397
    @backend_method
398
    def delete_account(self, user, account):
399
        """Delete the account with the given name."""
400

    
401
        logger.debug("delete_account: %s %s", user, account)
402
        if user != account:
403
            raise NotAllowedError
404
        node = self.node.node_lookup(account)
405
        if node is None:
406
            return
407
        if not self.node.node_remove(node):
408
            raise AccountNotEmpty('Account is not empty')
409
        self.permissions.group_destroy(account)
410

    
411
    @backend_method
412
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
413
        """Return a list of containers existing under an account."""
414

    
415
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
416
                     account, marker, limit, shared, until, public)
417
        if user != account:
418
            if until or account not in self._allowed_accounts(user):
419
                raise NotAllowedError
420
            allowed = self._allowed_containers(user, account)
421
            start, limit = self._list_limits(allowed, marker, limit)
422
            return allowed[start:start + limit]
423
        if shared or public:
424
            allowed = set()
425
            if shared:
426
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
427
            if public:
428
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
429
            allowed = sorted(allowed)
430
            start, limit = self._list_limits(allowed, marker, limit)
431
            return allowed[start:start + limit]
432
        node = self.node.node_lookup(account)
433
        containers = [x[0] for x in self._list_object_properties(
434
            node, account, '', '/', marker, limit, False, None, [], until)]
435
        start, limit = self._list_limits(
436
            [x[0] for x in containers], marker, limit)
437
        return containers[start:start + limit]
438

    
439
    @backend_method
440
    def list_container_meta(self, user, account, container, domain, until=None):
441
        """Return a list with all the container's object meta keys for the domain."""
442

    
443
        logger.debug("list_container_meta: %s %s %s %s %s", user,
444
                     account, container, domain, until)
445
        allowed = []
446
        if user != account:
447
            if until:
448
                raise NotAllowedError
449
            allowed = self.permissions.access_list_paths(
450
                user, '/'.join((account, container)))
451
            if not allowed:
452
                raise NotAllowedError
453
        path, node = self._lookup_container(account, container)
454
        before = until if until is not None else inf
455
        allowed = self._get_formatted_paths(allowed)
456
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
457

    
458
    @backend_method
459
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
460
        """Return a dictionary with the container metadata for the domain."""
461

    
462
        logger.debug("get_container_meta: %s %s %s %s %s", user,
463
                     account, container, domain, until)
464
        if user != account:
465
            if until or container not in self._allowed_containers(user, account):
466
                raise NotAllowedError
467
        path, node = self._lookup_container(account, container)
468
        props = self._get_properties(node, until)
469
        mtime = props[self.MTIME]
470
        count, bytes, tstamp = self._get_statistics(node, until)
471
        tstamp = max(tstamp, mtime)
472
        if until is None:
473
            modified = tstamp
474
        else:
475
            modified = self._get_statistics(
476
                node)[2]  # Overall last modification.
477
            modified = max(modified, mtime)
478

    
479
        if user != account:
480
            meta = {'name': container}
481
        else:
482
            meta = {}
483
            if include_user_defined:
484
                meta.update(
485
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
486
            if until is not None:
487
                meta.update({'until_timestamp': tstamp})
488
            meta.update({'name': container, 'count': count, 'bytes': bytes})
489
        meta.update({'modified': modified})
490
        return meta
491

    
492
    @backend_method
493
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
494
        """Update the metadata associated with the container for the domain."""
495

    
496
        logger.debug("update_container_meta: %s %s %s %s %s %s",
497
                     user, account, container, domain, meta, replace)
498
        if user != account:
499
            raise NotAllowedError
500
        path, node = self._lookup_container(account, container)
501
        src_version_id, dest_version_id = self._put_metadata(
502
            user, node, domain, meta, replace)
503
        if src_version_id is not None:
504
            versioning = self._get_policy(
505
                node, is_account_policy=False)['versioning']
506
            if versioning != 'auto':
507
                self.node.version_remove(src_version_id)
508

    
509
    @backend_method
510
    def get_container_policy(self, user, account, container):
511
        """Return a dictionary with the container policy."""
512

    
513
        logger.debug(
514
            "get_container_policy: %s %s %s", user, account, container)
515
        if user != account:
516
            if container not in self._allowed_containers(user, account):
517
                raise NotAllowedError
518
            return {}
519
        path, node = self._lookup_container(account, container)
520
        return self._get_policy(node, is_account_policy=False)
521

    
522
    @backend_method
523
    def update_container_policy(self, user, account, container, policy, replace=False):
524
        """Update the policy associated with the container."""
525

    
526
        logger.debug("update_container_policy: %s %s %s %s %s",
527
                     user, account, container, policy, replace)
528
        if user != account:
529
            raise NotAllowedError
530
        path, node = self._lookup_container(account, container)
531
        self._check_policy(policy, is_account_policy=False)
532
        self._put_policy(node, policy, replace, is_account_policy=False)
533

    
534
    @backend_method
535
    def put_container(self, user, account, container, policy=None):
536
        """Create a new container with the given name."""
537

    
538
        logger.debug(
539
            "put_container: %s %s %s %s", user, account, container, policy)
540
        policy = policy or {}
541
        if user != account:
542
            raise NotAllowedError
543
        try:
544
            path, node = self._lookup_container(account, container)
545
        except NameError:
546
            pass
547
        else:
548
            raise ContainerExists('Container already exists')
549
        if policy:
550
            self._check_policy(policy, is_account_policy=False)
551
        path = '/'.join((account, container))
552
        node = self._put_path(
553
            user, self._lookup_account(account, True)[1], path)
554
        self._put_policy(node, policy, True, is_account_policy=False)
555

    
556
    @backend_method
557
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
558
        """Delete/purge the container with the given name."""
559

    
560
        logger.debug("delete_container: %s %s %s %s %s %s", user,
561
                     account, container, until, prefix, delimiter)
562
        if user != account:
563
            raise NotAllowedError
564
        path, node = self._lookup_container(account, container)
565

    
566
        if until is not None:
567
            hashes, size, serials = self.node.node_purge_children(
568
                node, until, CLUSTER_HISTORY)
569
            for h in hashes:
570
                self.store.map_delete(h)
571
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
572
            if not self.free_versioning:
573
                self._report_size_change(
574
                    user, account, -size, {
575
                        'action':'container purge',
576
                        'path': path,
577
                        'versions': ','.join(str(i) for i in serials)
578
                    }
579
                )
580
            return
581

    
582
        if not delimiter:
583
            if self._get_statistics(node)[0] > 0:
584
                raise ContainerNotEmpty('Container is not empty')
585
            hashes, size, serials = self.node.node_purge_children(
586
                node, inf, CLUSTER_HISTORY)
587
            for h in hashes:
588
                self.store.map_delete(h)
589
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
590
            self.node.node_remove(node)
591
            if not self.free_versioning:
592
                self._report_size_change(
593
                    user, account, -size, {
594
                        'action':'container purge',
595
                        'path': path,
596
                        'versions': ','.join(str(i) for i in serials)
597
                    }
598
                )
599
        else:
600
            # remove only contents
601
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
602
            paths = []
603
            for t in src_names:
604
                path = '/'.join((account, container, t[0]))
605
                node = t[2]
606
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
607
                del_size = self._apply_versioning(
608
                    account, container, src_version_id)
609
                self._report_size_change(
610
                        user, account, -del_size, {
611
                                'action': 'object delete',
612
                                'path': path,
613
                        'versions': ','.join([str(dest_version_id)])
614
                     }
615
                )
616
                self._report_object_change(
617
                    user, account, path, details={'action': 'object delete'})
618
                paths.append(path)
619
            self.permissions.access_clear_bulk(paths)
620

    
621
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
622
        if user != account and until:
623
            raise NotAllowedError
624
        if shared and public:
625
            # get shared first
626
            shared = self._list_object_permissions(
627
                user, account, container, prefix, shared=True, public=False)
628
            objects = set()
629
            if shared:
630
                path, node = self._lookup_container(account, container)
631
                shared = self._get_formatted_paths(shared)
632
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
633

    
634
            # get public
635
            objects |= set(self._list_public_object_properties(
636
                user, account, container, prefix, all_props))
637
            objects = list(objects)
638

    
639
            objects.sort(key=lambda x: x[0])
640
            start, limit = self._list_limits(
641
                [x[0] for x in objects], marker, limit)
642
            return objects[start:start + limit]
643
        elif public:
644
            objects = self._list_public_object_properties(
645
                user, account, container, prefix, all_props)
646
            start, limit = self._list_limits(
647
                [x[0] for x in objects], marker, limit)
648
            return objects[start:start + limit]
649

    
650
        allowed = self._list_object_permissions(
651
            user, account, container, prefix, shared, public)
652
        if shared and not allowed:
653
            return []
654
        path, node = self._lookup_container(account, container)
655
        allowed = self._get_formatted_paths(allowed)
656
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
657
        start, limit = self._list_limits(
658
            [x[0] for x in objects], marker, limit)
659
        return objects[start:start + limit]
660

    
661
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
662
        public = self._list_object_permissions(
663
            user, account, container, prefix, shared=False, public=True)
664
        paths, nodes = self._lookup_objects(public)
665
        path = '/'.join((account, container))
666
        cont_prefix = path + '/'
667
        paths = [x[len(cont_prefix):] for x in paths]
668
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
669
        objects = [(path,) + props for path, props in zip(paths, props)]
670
        return objects
671

    
672
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
673
        objects = []
674
        while True:
675
            marker = objects[-1] if objects else None
676
            limit = 10000
677
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
678
            objects.extend(l)
679
            if not l or len(l) < limit:
680
                break
681
        return objects
682

    
683
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
684
        allowed = []
685
        path = '/'.join((account, container, prefix)).rstrip('/')
686
        if user != account:
687
            allowed = self.permissions.access_list_paths(user, path)
688
            if not allowed:
689
                raise NotAllowedError
690
        else:
691
            allowed = set()
692
            if shared:
693
                allowed.update(self.permissions.access_list_shared(path))
694
            if public:
695
                allowed.update(
696
                    [x[0] for x in self.permissions.public_list(path)])
697
            allowed = sorted(allowed)
698
            if not allowed:
699
                return []
700
        return allowed
701

    
702
    @backend_method
703
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
704
        """Return a list of object (name, version_id) tuples existing under a container."""
705

    
706
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
707
        keys = keys or []
708
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
709

    
710
    @backend_method
711
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
712
        """Return a list of object metadata dicts existing under a container."""
713

    
714
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
715
        keys = keys or []
716
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
717
        objects = []
718
        for p in props:
719
            if len(p) == 2:
720
                objects.append({'subdir': p[0]})
721
            else:
722
                objects.append({'name': p[0],
723
                                'bytes': p[self.SIZE + 1],
724
                                'type': p[self.TYPE + 1],
725
                                'hash': p[self.HASH + 1],
726
                                'version': p[self.SERIAL + 1],
727
                                'version_timestamp': p[self.MTIME + 1],
728
                                'modified': p[self.MTIME + 1] if until is None else None,
729
                                'modified_by': p[self.MUSER + 1],
730
                                'uuid': p[self.UUID + 1],
731
                                'checksum': p[self.CHECKSUM + 1]})
732
        return objects
733

    
734
    @backend_method
735
    def list_object_permissions(self, user, account, container, prefix=''):
736
        """Return a list of paths that enforce permissions under a container."""
737

    
738
        logger.debug("list_object_permissions: %s %s %s %s", user,
739
                     account, container, prefix)
740
        return self._list_object_permissions(user, account, container, prefix, True, False)
741

    
742
    @backend_method
743
    def list_object_public(self, user, account, container, prefix=''):
744
        """Return a dict mapping paths to public ids for objects that are public under a container."""
745

    
746
        logger.debug("list_object_public: %s %s %s %s", user,
747
                     account, container, prefix)
748
        public = {}
749
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
750
            public[path] = p
751
        return public
752

    
753
    @backend_method
754
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
755
        """Return a dictionary with the object metadata for the domain."""
756

    
757
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
758
                     account, container, name, domain, version)
759
        self._can_read(user, account, container, name)
760
        path, node = self._lookup_object(account, container, name)
761
        props = self._get_version(node, version)
762
        if version is None:
763
            modified = props[self.MTIME]
764
        else:
765
            try:
766
                modified = self._get_version(
767
                    node)[self.MTIME]  # Overall last modification.
768
            except NameError:  # Object may be deleted.
769
                del_props = self.node.version_lookup(
770
                    node, inf, CLUSTER_DELETED)
771
                if del_props is None:
772
                    raise ItemNotExists('Object does not exist')
773
                modified = del_props[self.MTIME]
774

    
775
        meta = {}
776
        if include_user_defined:
777
            meta.update(
778
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
779
        meta.update({'name': name,
780
                     'bytes': props[self.SIZE],
781
                     'type': props[self.TYPE],
782
                     'hash': props[self.HASH],
783
                     'version': props[self.SERIAL],
784
                     'version_timestamp': props[self.MTIME],
785
                     'modified': modified,
786
                     'modified_by': props[self.MUSER],
787
                     'uuid': props[self.UUID],
788
                     'checksum': props[self.CHECKSUM]})
789
        return meta
790

    
791
    @backend_method
792
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
793
        """Update the metadata associated with the object for the domain and return the new version."""
794

    
795
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
796
                     user, account, container, name, domain, meta, replace)
797
        self._can_write(user, account, container, name)
798
        path, node = self._lookup_object(account, container, name)
799
        src_version_id, dest_version_id = self._put_metadata(
800
            user, node, domain, meta, replace)
801
        self._apply_versioning(account, container, src_version_id)
802
        return dest_version_id
803

    
804
    @backend_method
805
    def get_object_permissions(self, user, account, container, name):
806
        """Return the action allowed on the object, the path
807
        from which the object gets its permissions from,
808
        along with a dictionary containing the permissions."""
809

    
810
        logger.debug("get_object_permissions: %s %s %s %s", user,
811
                     account, container, name)
812
        allowed = 'write'
813
        permissions_path = self._get_permissions_path(account, container, name)
814
        if user != account:
815
            if self.permissions.access_check(permissions_path, self.WRITE, user):
816
                allowed = 'write'
817
            elif self.permissions.access_check(permissions_path, self.READ, user):
818
                allowed = 'read'
819
            else:
820
                raise NotAllowedError
821
        self._lookup_object(account, container, name)
822
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
823

    
824
    @backend_method
825
    def update_object_permissions(self, user, account, container, name, permissions):
826
        """Update the permissions associated with the object."""
827

    
828
        logger.debug("update_object_permissions: %s %s %s %s %s",
829
                     user, account, container, name, permissions)
830
        if user != account:
831
            raise NotAllowedError
832
        path = self._lookup_object(account, container, name)[0]
833
        self._check_permissions(path, permissions)
834
        self.permissions.access_set(path, permissions)
835
        self._report_sharing_change(user, account, path, {'members':
836
                                    self.permissions.access_members(path)})
837

    
838
    @backend_method
839
    def get_object_public(self, user, account, container, name):
840
        """Return the public id of the object if applicable."""
841

    
842
        logger.debug(
843
            "get_object_public: %s %s %s %s", user, account, container, name)
844
        self._can_read(user, account, container, name)
845
        path = self._lookup_object(account, container, name)[0]
846
        p = self.permissions.public_get(path)
847
        return p
848

    
849
    @backend_method
850
    def update_object_public(self, user, account, container, name, public):
851
        """Update the public status of the object."""
852

    
853
        logger.debug("update_object_public: %s %s %s %s %s", user,
854
                     account, container, name, public)
855
        self._can_write(user, account, container, name)
856
        path = self._lookup_object(account, container, name)[0]
857
        if not public:
858
            self.permissions.public_unset(path)
859
        else:
860
            self.permissions.public_set(
861
                path, self.public_url_security, self.public_url_alphabet
862
            )
863

    
864
    @backend_method
865
    def get_object_hashmap(self, user, account, container, name, version=None):
866
        """Return the object's size and a list with partial hashes."""
867

    
868
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
869
                     account, container, name, version)
870
        self._can_read(user, account, container, name)
871
        path, node = self._lookup_object(account, container, name)
872
        props = self._get_version(node, version)
873
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
874
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
875

    
876
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
877
        if permissions is not None and user != account:
878
            raise NotAllowedError
879
        self._can_write(user, account, container, name)
880
        if permissions is not None:
881
            path = '/'.join((account, container, name))
882
            self._check_permissions(path, permissions)
883

    
884
        account_path, account_node = self._lookup_account(account, True)
885
        container_path, container_node = self._lookup_container(
886
            account, container)
887

    
888
        path, node = self._put_object_node(
889
            container_path, container_node, name)
890
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
891

    
892
        # Handle meta.
893
        if src_version_id is None:
894
            src_version_id = pre_version_id
895
        self._put_metadata_duplicate(
896
            src_version_id, dest_version_id, domain, meta, replace_meta)
897

    
898
        del_size = self._apply_versioning(account, container, pre_version_id)
899
        size_delta = size - del_size
900
        if size_delta > 0:
901
            # Check account quota.
902
            if not self.using_external_quotaholder:
903
                account_quota = long(
904
                    self._get_policy(account_node, is_account_policy=True
905
                    )['quota']
906
                )
907
                account_usage = self._get_statistics(account_node)[1]
908
                if (account_quota > 0 and account_usage > account_quota):
909
                    raise QuotaError(
910
                        'Account quota exceeded: limit: %s, usage: %s' % (
911
                            account_quota, account_usage
912
                        )
913
                    )
914

    
915
            # Check container quota.
916
            container_quota = long(
917
                self._get_policy(container_node, is_account_policy=False
918
                )['quota']
919
            )
920
            container_usage = self._get_statistics(container_node)[1]
921
            if (container_quota > 0 and container_usage > container_quota):
922
                # This must be executed in a transaction, so the version is
923
                # never created if it fails.
924
                raise QuotaError(
925
                    'Container quota exceeded: limit: %s, usage: %s' % (
926
                        container_quota, container_usage
927
                    )
928
                )
929

    
930
        self._report_size_change(user, account, size_delta,
931
                                 {'action': 'object update', 'path': path,
932
                                  'versions': ','.join([str(dest_version_id)])})
933
        if permissions is not None:
934
            self.permissions.access_set(path, permissions)
935
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
936

    
937
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
938
        return dest_version_id
939

    
940
    @backend_method
941
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta=None, replace_meta=False, permissions=None):
942
        """Create/update an object with the specified size and partial hashes."""
943

    
944
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
945
                     account, container, name, size, type, hashmap, checksum)
946
        meta = meta or {}
947
        if size == 0:  # No such thing as an empty hashmap.
948
            hashmap = [self.put_block('')]
949
        map = HashMap(self.block_size, self.hash_algorithm)
950
        map.extend([binascii.unhexlify(x) for x in hashmap])
951
        missing = self.store.block_search(map)
952
        if missing:
953
            ie = IndexError()
954
            ie.data = [binascii.hexlify(x) for x in missing]
955
            raise ie
956

    
957
        hash = map.hash()
958
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
959
        self.store.map_put(hash, map)
960
        return dest_version_id
961

    
962
    @backend_method
963
    def update_object_checksum(self, user, account, container, name, version, checksum):
964
        """Update an object's checksum."""
965

    
966
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
967
                     user, account, container, name, version, checksum)
968
        # Update objects with greater version and same hashmap and size (fix metadata updates).
969
        self._can_write(user, account, container, name)
970
        path, node = self._lookup_object(account, container, name)
971
        props = self._get_version(node, version)
972
        versions = self.node.node_get_versions(node)
973
        for x in versions:
974
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
975
                self.node.version_put_property(
976
                    x[self.SERIAL], 'checksum', checksum)
977

    
978
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta=None, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
979
        dest_meta = dest_meta or {}
980
        dest_version_ids = []
981
        self._can_read(user, src_account, src_container, src_name)
982
        path, node = self._lookup_object(src_account, src_container, src_name)
983
        # TODO: Will do another fetch of the properties in duplicate version...
984
        props = self._get_version(
985
            node, src_version)  # Check to see if source exists.
986
        src_version_id = props[self.SERIAL]
987
        hash = props[self.HASH]
988
        size = props[self.SIZE]
989
        is_copy = not is_move and (src_account, src_container, src_name) != (
990
            dest_account, dest_container, dest_name)  # New uuid.
991
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
992
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
993
            self._delete_object(user, src_account, src_container, src_name)
994

    
995
        if delimiter:
996
            prefix = src_name + \
997
                delimiter if not src_name.endswith(delimiter) else src_name
998
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
999
            src_names.sort(key=lambda x: x[2])  # order by nodes
1000
            paths = [elem[0] for elem in src_names]
1001
            nodes = [elem[2] for elem in src_names]
1002
            # TODO: Will do another fetch of the properties in duplicate version...
1003
            props = self._get_versions(nodes)  # Check to see if source exists.
1004

    
1005
            for prop, path, node in zip(props, paths, nodes):
1006
                src_version_id = prop[self.SERIAL]
1007
                hash = prop[self.HASH]
1008
                vtype = prop[self.TYPE]
1009
                size = prop[self.SIZE]
1010
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1011
                    delimiter) else dest_name
1012
                vdest_name = path.replace(prefix, dest_prefix, 1)
1013
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
1014
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
1015
                    self._delete_object(user, src_account, src_container, path)
1016
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
1017

    
1018
    @backend_method
1019
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, src_version=None, delimiter=None):
1020
        """Copy an object's data and metadata."""
1021

    
1022
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
1023
        meta = meta or {}
1024
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
1025
        return dest_version_id
1026

    
1027
    @backend_method
1028
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, delimiter=None):
1029
        """Move an object's data and metadata."""
1030

    
1031
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
1032
        meta = meta or {}
1033
        if user != src_account:
1034
            raise NotAllowedError
1035
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
1036
        return dest_version_id
1037

    
1038
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
1039
        if user != account:
1040
            raise NotAllowedError
1041

    
1042
        if until is not None:
1043
            path = '/'.join((account, container, name))
1044
            node = self.node.node_lookup(path)
1045
            if node is None:
1046
                return
1047
            hashes = []
1048
            size = 0
1049
            serials = []
1050
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
1051
            hashes += h
1052
            size += s
1053
            serials += v
1054
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
1055
            hashes += h
1056
            if not self.free_versioning:
1057
                size += s
1058
            serials += v
1059
            for h in hashes:
1060
                self.store.map_delete(h)
1061
            self.node.node_purge(node, until, CLUSTER_DELETED)
1062
            try:
1063
                props = self._get_version(node)
1064
            except NameError:
1065
                self.permissions.access_clear(path)
1066
            self._report_size_change(
1067
                user, account, -size, {
1068
                    'action': 'object purge',
1069
                    'path': path,
1070
                    'versions': ','.join(str(i) for i in serials)
1071
                }
1072
            )
1073
            return
1074

    
1075
        path, node = self._lookup_object(account, container, name)
1076
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1077
        del_size = self._apply_versioning(account, container, src_version_id)
1078
        self._report_size_change(user, account, -del_size,
1079
                                 {'action': 'object delete', 'path': path,
1080
                                  'versions': ','.join([str(dest_version_id)])})
1081
        self._report_object_change(
1082
            user, account, path, details={'action': 'object delete'})
1083
        self.permissions.access_clear(path)
1084

    
1085
        if delimiter:
1086
            prefix = name + delimiter if not name.endswith(delimiter) else name
1087
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1088
            paths = []
1089
            for t in src_names:
1090
                path = '/'.join((account, container, t[0]))
1091
                node = t[2]
1092
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1093
                del_size = self._apply_versioning(
1094
                    account, container, src_version_id)
1095
                self._report_size_change(user, account, -del_size,
1096
                                         {'action': 'object delete',
1097
                                          'path': path,
1098
                                          'versions': ','.join([str(dest_version_id)])})
1099
                self._report_object_change(
1100
                    user, account, path, details={'action': 'object delete'})
1101
                paths.append(path)
1102
            self.permissions.access_clear_bulk(paths)
1103

    
1104
    @backend_method
1105
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1106
        """Delete/purge an object."""
1107

    
1108
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1109
                     account, container, name, until, prefix, delimiter)
1110
        self._delete_object(user, account, container, name, until, delimiter)
1111

    
1112
    @backend_method
1113
    def list_versions(self, user, account, container, name):
1114
        """Return a list of all (version, version_timestamp) tuples for an object."""
1115

    
1116
        logger.debug(
1117
            "list_versions: %s %s %s %s", user, account, container, name)
1118
        self._can_read(user, account, container, name)
1119
        path, node = self._lookup_object(account, container, name)
1120
        versions = self.node.node_get_versions(node)
1121
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1122

    
1123
    @backend_method
1124
    def get_uuid(self, user, uuid):
1125
        """Return the (account, container, name) for the UUID given."""
1126

    
1127
        logger.debug("get_uuid: %s %s", user, uuid)
1128
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1129
        if info is None:
1130
            raise NameError
1131
        path, serial = info
1132
        account, container, name = path.split('/', 2)
1133
        self._can_read(user, account, container, name)
1134
        return (account, container, name)
1135

    
1136
    @backend_method
1137
    def get_public(self, user, public):
1138
        """Return the (account, container, name) for the public id given."""
1139

    
1140
        logger.debug("get_public: %s %s", user, public)
1141
        path = self.permissions.public_path(public)
1142
        if path is None:
1143
            raise NameError
1144
        account, container, name = path.split('/', 2)
1145
        self._can_read(user, account, container, name)
1146
        return (account, container, name)
1147

    
1148
    @backend_method(autocommit=0)
1149
    def get_block(self, hash):
1150
        """Return a block's data."""
1151

    
1152
        logger.debug("get_block: %s", hash)
1153
        block = self.store.block_get(binascii.unhexlify(hash))
1154
        if not block:
1155
            raise ItemNotExists('Block does not exist')
1156
        return block
1157

    
1158
    @backend_method(autocommit=0)
1159
    def put_block(self, data):
1160
        """Store a block and return the hash."""
1161

    
1162
        logger.debug("put_block: %s", len(data))
1163
        return binascii.hexlify(self.store.block_put(data))
1164

    
1165
    @backend_method(autocommit=0)
1166
    def update_block(self, hash, data, offset=0):
1167
        """Update a known block and return the hash."""
1168

    
1169
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1170
        if offset == 0 and len(data) == self.block_size:
1171
            return self.put_block(data)
1172
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1173
        return binascii.hexlify(h)
1174

    
1175
    # Path functions.
1176

    
1177
    def _generate_uuid(self):
1178
        return str(uuidlib.uuid4())
1179

    
1180
    def _put_object_node(self, path, parent, name):
1181
        path = '/'.join((path, name))
1182
        node = self.node.node_lookup(path)
1183
        if node is None:
1184
            node = self.node.node_create(parent, path)
1185
        return path, node
1186

    
1187
    def _put_path(self, user, parent, path):
1188
        node = self.node.node_create(parent, path)
1189
        self.node.version_create(node, None, 0, '', None, user,
1190
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1191
        return node
1192

    
1193
    def _lookup_account(self, account, create=True):
1194
        node = self.node.node_lookup(account)
1195
        if node is None and create:
1196
            node = self._put_path(
1197
                account, self.ROOTNODE, account)  # User is account.
1198
        return account, node
1199

    
1200
    def _lookup_container(self, account, container):
1201
        path = '/'.join((account, container))
1202
        node = self.node.node_lookup(path)
1203
        if node is None:
1204
            raise ItemNotExists('Container does not exist')
1205
        return path, node
1206

    
1207
    def _lookup_object(self, account, container, name):
1208
        path = '/'.join((account, container, name))
1209
        node = self.node.node_lookup(path)
1210
        if node is None:
1211
            raise ItemNotExists('Object does not exist')
1212
        return path, node
1213

    
1214
    def _lookup_objects(self, paths):
1215
        nodes = self.node.node_lookup_bulk(paths)
1216
        return paths, nodes
1217

    
1218
    def _get_properties(self, node, until=None):
1219
        """Return properties until the timestamp given."""
1220

    
1221
        before = until if until is not None else inf
1222
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1223
        if props is None and until is not None:
1224
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1225
        if props is None:
1226
            raise ItemNotExists('Path does not exist')
1227
        return props
1228

    
1229
    def _get_statistics(self, node, until=None):
1230
        """Return count, sum of size and latest timestamp of everything under node."""
1231

    
1232
        if until is None:
1233
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1234
        else:
1235
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1236
        if stats is None:
1237
            stats = (0, 0, 0)
1238
        return stats
1239

    
1240
    def _get_version(self, node, version=None):
1241
        if version is None:
1242
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1243
            if props is None:
1244
                raise ItemNotExists('Object does not exist')
1245
        else:
1246
            try:
1247
                version = int(version)
1248
            except ValueError:
1249
                raise VersionNotExists('Version does not exist')
1250
            props = self.node.version_get_properties(version)
1251
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1252
                raise VersionNotExists('Version does not exist')
1253
        return props
1254

    
1255
    def _get_versions(self, nodes):
1256
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1257

    
1258
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1259
        """Create a new version of the node."""
1260

    
1261
        props = self.node.version_lookup(
1262
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1263
        if props is not None:
1264
            src_version_id = props[self.SERIAL]
1265
            src_hash = props[self.HASH]
1266
            src_size = props[self.SIZE]
1267
            src_type = props[self.TYPE]
1268
            src_checksum = props[self.CHECKSUM]
1269
        else:
1270
            src_version_id = None
1271
            src_hash = None
1272
            src_size = 0
1273
            src_type = ''
1274
            src_checksum = ''
1275
        if size is None:  # Set metadata.
1276
            hash = src_hash  # This way hash can be set to None (account or container).
1277
            size = src_size
1278
        if type is None:
1279
            type = src_type
1280
        if checksum is None:
1281
            checksum = src_checksum
1282
        uuid = self._generate_uuid(
1283
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1284

    
1285
        if src_node is None:
1286
            pre_version_id = src_version_id
1287
        else:
1288
            pre_version_id = None
1289
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1290
            if props is not None:
1291
                pre_version_id = props[self.SERIAL]
1292
        if pre_version_id is not None:
1293
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1294

    
1295
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1296
        return pre_version_id, dest_version_id
1297

    
1298
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1299
        if src_version_id is not None:
1300
            self.node.attribute_copy(src_version_id, dest_version_id)
1301
        if not replace:
1302
            self.node.attribute_del(dest_version_id, domain, (
1303
                k for k, v in meta.iteritems() if v == ''))
1304
            self.node.attribute_set(dest_version_id, domain, (
1305
                (k, v) for k, v in meta.iteritems() if v != ''))
1306
        else:
1307
            self.node.attribute_del(dest_version_id, domain)
1308
            self.node.attribute_set(dest_version_id, domain, ((
1309
                k, v) for k, v in meta.iteritems()))
1310

    
1311
    def _put_metadata(self, user, node, domain, meta, replace=False):
1312
        """Create a new version and store metadata."""
1313

    
1314
        src_version_id, dest_version_id = self._put_version_duplicate(
1315
            user, node)
1316
        self._put_metadata_duplicate(
1317
            src_version_id, dest_version_id, domain, meta, replace)
1318
        return src_version_id, dest_version_id
1319

    
1320
    def _list_limits(self, listing, marker, limit):
1321
        start = 0
1322
        if marker:
1323
            try:
1324
                start = listing.index(marker) + 1
1325
            except ValueError:
1326
                pass
1327
        if not limit or limit > 10000:
1328
            limit = 10000
1329
        return start, limit
1330

    
1331
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, until=None, size_range=None, allowed=None, all_props=False):
1332
        keys = keys or []
1333
        allowed = allowed or []
1334
        cont_prefix = path + '/'
1335
        prefix = cont_prefix + prefix
1336
        start = cont_prefix + marker if marker else None
1337
        before = until if until is not None else inf
1338
        filterq = keys if domain else []
1339
        sizeq = size_range
1340

    
1341
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1342
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1343
        objects.sort(key=lambda x: x[0])
1344
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1345
        return objects
1346

    
1347
    # Reporting functions.
1348

    
1349
    def _report_size_change(self, user, account, size, details=None):
1350
        details = details or {}
1351

    
1352
        if size == 0:
1353
            return
1354

    
1355
        account_node = self._lookup_account(account, True)[1]
1356
        total = self._get_statistics(account_node)[1]
1357
        details.update({'user': user, 'total': total})
1358
        logger.debug(
1359
            "_report_size_change: %s %s %s %s", user, account, size, details)
1360
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1361
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1362
                              float(size), details))
1363

    
1364
        if not self.using_external_quotaholder:
1365
            return
1366

    
1367
        try:
1368
            serial = self.quotaholder.issue_commission(
1369
                    context     =   {},
1370
                    target      =   account,
1371
                    key         =   '1',
1372
                    clientkey   =   'pithos',
1373
                    ownerkey    =   '',
1374
                    name        =   details['path'] if 'path' in details else '',
1375
                    provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1376
            )
1377
        except BaseException, e:
1378
            raise QuotaError(e)
1379
        else:
1380
            self.serials.append(serial)
1381

    
1382
    def _report_object_change(self, user, account, path, details=None):
1383
        details = details or {}
1384
        details.update({'user': user})
1385
        logger.debug("_report_object_change: %s %s %s %s", user,
1386
                     account, path, details)
1387
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1388
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1389

    
1390
    def _report_sharing_change(self, user, account, path, details=None):
1391
        logger.debug("_report_permissions_change: %s %s %s %s",
1392
                     user, account, path, details)
1393
        details = details or {}
1394
        details.update({'user': user})
1395
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1396
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1397

    
1398
    # Policy functions.
1399

    
1400
    def _check_policy(self, policy, is_account_policy=True):
1401
        default_policy = self.default_account_policy \
1402
            if is_account_policy else self.default_container_policy
1403
        for k in policy.keys():
1404
            if policy[k] == '':
1405
                policy[k] = default_policy.get(k)
1406
        for k, v in policy.iteritems():
1407
            if k == 'quota':
1408
                q = int(v)  # May raise ValueError.
1409
                if q < 0:
1410
                    raise ValueError
1411
            elif k == 'versioning':
1412
                if v not in ['auto', 'none']:
1413
                    raise ValueError
1414
            else:
1415
                raise ValueError
1416

    
1417
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1418
        default_policy = self.default_account_policy \
1419
            if is_account_policy else self.default_container_policy
1420
        if replace:
1421
            for k, v in default_policy.iteritems():
1422
                if k not in policy:
1423
                    policy[k] = v
1424
        self.node.policy_set(node, policy)
1425

    
1426
    def _get_policy(self, node, is_account_policy=True):
1427
        default_policy = self.default_account_policy \
1428
            if is_account_policy else self.default_container_policy
1429
        policy = default_policy.copy()
1430
        policy.update(self.node.policy_get(node))
1431
        return policy
1432

    
1433
    def _apply_versioning(self, account, container, version_id):
1434
        """Delete the provided version if such is the policy.
1435
           Return size of object removed.
1436
        """
1437

    
1438
        if version_id is None:
1439
            return 0
1440
        path, node = self._lookup_container(account, container)
1441
        versioning = self._get_policy(
1442
            node, is_account_policy=False)['versioning']
1443
        if versioning != 'auto':
1444
            hash, size = self.node.version_remove(version_id)
1445
            self.store.map_delete(hash)
1446
            return size
1447
        elif self.free_versioning:
1448
            return self.node.version_get_properties(
1449
                version_id, keys=('size',))[0]
1450
        return 0
1451

    
1452
    # Access control functions.
1453

    
1454
    def _check_groups(self, groups):
1455
        # raise ValueError('Bad characters in groups')
1456
        pass
1457

    
1458
    def _check_permissions(self, path, permissions):
1459
        # raise ValueError('Bad characters in permissions')
1460
        pass
1461

    
1462
    def _get_formatted_paths(self, paths):
1463
        formatted = []
1464
        for p in paths:
1465
            node = self.node.node_lookup(p)
1466
            props = None
1467
            if node is not None:
1468
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1469
            if props is not None:
1470
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1471
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1472
                formatted.append((p, self.MATCH_EXACT))
1473
        return formatted
1474

    
1475
    def _get_permissions_path(self, account, container, name):
1476
        path = '/'.join((account, container, name))
1477
        permission_paths = self.permissions.access_inherit(path)
1478
        permission_paths.sort()
1479
        permission_paths.reverse()
1480
        for p in permission_paths:
1481
            if p == path:
1482
                return p
1483
            else:
1484
                if p.count('/') < 2:
1485
                    continue
1486
                node = self.node.node_lookup(p)
1487
                props = None
1488
                if node is not None:
1489
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1490
                if props is not None:
1491
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1492
                        return p
1493
        return None
1494

    
1495
    def _can_read(self, user, account, container, name):
1496
        if user == account:
1497
            return True
1498
        path = '/'.join((account, container, name))
1499
        if self.permissions.public_get(path) is not None:
1500
            return True
1501
        path = self._get_permissions_path(account, container, name)
1502
        if not path:
1503
            raise NotAllowedError
1504
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1505
            raise NotAllowedError
1506

    
1507
    def _can_write(self, user, account, container, name):
1508
        if user == account:
1509
            return True
1510
        path = '/'.join((account, container, name))
1511
        path = self._get_permissions_path(account, container, name)
1512
        if not path:
1513
            raise NotAllowedError
1514
        if not self.permissions.access_check(path, self.WRITE, user):
1515
            raise NotAllowedError
1516

    
1517
    def _allowed_accounts(self, user):
1518
        allow = set()
1519
        for path in self.permissions.access_list_paths(user):
1520
            allow.add(path.split('/', 1)[0])
1521
        return sorted(allow)
1522

    
1523
    def _allowed_containers(self, user, account):
1524
        allow = set()
1525
        for path in self.permissions.access_list_paths(user, account):
1526
            allow.add(path.split('/', 2)[1])
1527
        return sorted(allow)