Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ b336e6fa

History | View | Annotate | Download (63 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from synnefo.lib.quotaholder import QuotaholderClient
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
85
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
86
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
87

    
88
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
89
QUEUE_CLIENT_ID = 'pithos'
90
QUEUE_INSTANCE_ID = '1'
91

    
92
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
93

    
94
inf = float('inf')
95

    
96
ULTIMATE_ANSWER = 42
97

    
98

    
99
logger = logging.getLogger(__name__)
100

    
101

    
102
def backend_method(func=None, autocommit=1):
103
    if func is None:
104
        def fn(func):
105
            return backend_method(func, autocommit)
106
        return fn
107

    
108
    if not autocommit:
109
        return func
110

    
111
    def fn(self, *args, **kw):
112
        self.wrapper.execute()
113
        serials = []
114
        self.serials = serials
115
        self.messages = []
116

    
117
        try:
118
            ret = func(self, *args, **kw)
119
            for m in self.messages:
120
                self.queue.send(*m)
121
            if serials:
122
                self.quotaholder.accept_commission(
123
                            context     =   {},
124
                            clientkey   =   'pithos',
125
                            serials     =   serials)
126
            self.wrapper.commit()
127
            return ret
128
        except:
129
            if serials:
130
                self.quotaholder.reject_commission(
131
                            context     =   {},
132
                            clientkey   =   'pithos',
133
                            serials     =   serials)
134
            self.wrapper.rollback()
135
            raise
136
    return fn
137

    
138

    
139
class ModularBackend(BaseBackend):
140
    """A modular backend.
141

142
    Uses modules for SQL functions and storage.
143
    """
144

    
145
    def __init__(self, db_module=None, db_connection=None,
146
                 block_module=None, block_path=None, block_umask=None,
147
                 queue_module=None, queue_hosts=None, queue_exchange=None,
148
                 quotaholder_enabled=False,
149
                 quotaholder_url=None, quotaholder_token=None,
150
                 quotaholder_client_poolsize=None,
151
                 free_versioning=True, block_params=None):
152
        db_module = db_module or DEFAULT_DB_MODULE
153
        db_connection = db_connection or DEFAULT_DB_CONNECTION
154
        block_module = block_module or DEFAULT_BLOCK_MODULE
155
        block_path = block_path or DEFAULT_BLOCK_PATH
156
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
157
        block_params = block_params or DEFAULT_BLOCK_PARAMS
158
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
159

    
160
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
161
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
162
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
163

    
164
        self.hash_algorithm = 'sha256'
165
        self.block_size = 4 * 1024 * 1024  # 4MB
166
        self.free_versioning = free_versioning
167

    
168
        self.default_policy = {'quota': DEFAULT_QUOTA,
169
                               'versioning': DEFAULT_VERSIONING}
170

    
171
        def load_module(m):
172
            __import__(m)
173
            return sys.modules[m]
174

    
175
        self.db_module = load_module(db_module)
176
        self.wrapper = self.db_module.DBWrapper(db_connection)
177
        params = {'wrapper': self.wrapper}
178
        self.permissions = self.db_module.Permissions(**params)
179
        self.config = self.db_module.Config(**params)
180
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
181
        for x in ['READ', 'WRITE']:
182
            setattr(self, x, getattr(self.db_module, x))
183
        self.node = self.db_module.Node(**params)
184
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
185
            setattr(self, x, getattr(self.db_module, x))
186

    
187
        self.block_module = load_module(block_module)
188
        self.block_params = block_params
189
        params = {'path': block_path,
190
                  'block_size': self.block_size,
191
                  'hash_algorithm': self.hash_algorithm,
192
                  'umask': block_umask}
193
        params.update(self.block_params)
194
        self.store = self.block_module.Store(**params)
195

    
196
        if queue_module and queue_hosts:
197
            self.queue_module = load_module(queue_module)
198
            params = {'hosts': queue_hosts,
199
                      'exchange': queue_exchange,
200
                      'client_id': QUEUE_CLIENT_ID}
201
            self.queue = self.queue_module.Queue(**params)
202
        else:
203
            class NoQueue:
204
                def send(self, *args):
205
                    pass
206

    
207
                def close(self):
208
                    pass
209

    
210
            self.queue = NoQueue()
211

    
212
        self.quotaholder_enabled = quotaholder_enabled
213
        if quotaholder_enabled:
214
            self.quotaholder_url = quotaholder_url
215
            self.quotaholder_token = quotaholder_token
216
            self.quotaholder = QuotaholderClient(
217
                                    quotaholder_url,
218
                                    token=quotaholder_token,
219
                                    poolsize=quotaholder_client_poolsize)
220

    
221
        self.serials = []
222
        self.messages = []
223

    
224
    def close(self):
225
        self.wrapper.close()
226
        self.queue.close()
227

    
228
    @property
229
    def using_external_quotaholder(self):
230
        return self.quotaholder_enabled
231

    
232
    @backend_method
233
    def list_accounts(self, user, marker=None, limit=10000):
234
        """Return a list of accounts the user can access."""
235

    
236
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
237
        allowed = self._allowed_accounts(user)
238
        start, limit = self._list_limits(allowed, marker, limit)
239
        return allowed[start:start + limit]
240

    
241
    @backend_method
242
    def get_account_meta(
243
            self, user, account, domain, until=None, include_user_defined=True,
244
            external_quota=None):
245
        """Return a dictionary with the account metadata for the domain."""
246

    
247
        logger.debug(
248
            "get_account_meta: %s %s %s %s", user, account, domain, until)
249
        path, node = self._lookup_account(account, user == account)
250
        if user != account:
251
            if until or node is None or account not in self._allowed_accounts(user):
252
                raise NotAllowedError
253
        try:
254
            props = self._get_properties(node, until)
255
            mtime = props[self.MTIME]
256
        except NameError:
257
            props = None
258
            mtime = until
259
        count, bytes, tstamp = self._get_statistics(node, until)
260
        tstamp = max(tstamp, mtime)
261
        if until is None:
262
            modified = tstamp
263
        else:
264
            modified = self._get_statistics(
265
                node)[2]  # Overall last modification.
266
            modified = max(modified, mtime)
267

    
268
        if user != account:
269
            meta = {'name': account}
270
        else:
271
            meta = {}
272
            if props is not None and include_user_defined:
273
                meta.update(
274
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
275
            if until is not None:
276
                meta.update({'until_timestamp': tstamp})
277
            meta.update({'name': account, 'count': count, 'bytes': bytes})
278
            if self.using_external_quotaholder:
279
                external_quota = external_quota or {}
280
                meta['bytes'] = external_quota.get('currValue', 0)
281
        meta.update({'modified': modified})
282
        return meta
283

    
284
    @backend_method
285
    def update_account_meta(self, user, account, domain, meta, replace=False):
286
        """Update the metadata associated with the account for the domain."""
287

    
288
        logger.debug("update_account_meta: %s %s %s %s %s", user,
289
                     account, domain, meta, replace)
290
        if user != account:
291
            raise NotAllowedError
292
        path, node = self._lookup_account(account, True)
293
        self._put_metadata(user, node, domain, meta, replace)
294

    
295
    @backend_method
296
    def get_account_groups(self, user, account):
297
        """Return a dictionary with the user groups defined for this account."""
298

    
299
        logger.debug("get_account_groups: %s %s", user, account)
300
        if user != account:
301
            if account not in self._allowed_accounts(user):
302
                raise NotAllowedError
303
            return {}
304
        self._lookup_account(account, True)
305
        return self.permissions.group_dict(account)
306

    
307
    @backend_method
308
    def update_account_groups(self, user, account, groups, replace=False):
309
        """Update the groups associated with the account."""
310

    
311
        logger.debug("update_account_groups: %s %s %s %s", user,
312
                     account, groups, replace)
313
        if user != account:
314
            raise NotAllowedError
315
        self._lookup_account(account, True)
316
        self._check_groups(groups)
317
        if replace:
318
            self.permissions.group_destroy(account)
319
        for k, v in groups.iteritems():
320
            if not replace:  # If not already deleted.
321
                self.permissions.group_delete(account, k)
322
            if v:
323
                self.permissions.group_addmany(account, k, v)
324

    
325
    @backend_method
326
    def get_account_policy(self, user, account, external_quota=None):
327
        """Return a dictionary with the account policy."""
328

    
329
        logger.debug("get_account_policy: %s %s", user, account)
330
        if user != account:
331
            if account not in self._allowed_accounts(user):
332
                raise NotAllowedError
333
            return {}
334
        path, node = self._lookup_account(account, True)
335
        policy = self._get_policy(node)
336
        if self.using_external_quotaholder:
337
            external_quota = external_quota or {}
338
            policy['quota'] = external_quota.get('maxValue', 0)
339
        return policy
340

    
341
    @backend_method
342
    def update_account_policy(self, user, account, policy, replace=False):
343
        """Update the policy associated with the account."""
344

    
345
        logger.debug("update_account_policy: %s %s %s %s", user,
346
                     account, policy, replace)
347
        if user != account:
348
            raise NotAllowedError
349
        path, node = self._lookup_account(account, True)
350
        self._check_policy(policy)
351
        self._put_policy(node, policy, replace)
352

    
353
    @backend_method
354
    def put_account(self, user, account, policy={}):
355
        """Create a new account with the given name."""
356

    
357
        logger.debug("put_account: %s %s %s", user, account, policy)
358
        if user != account:
359
            raise NotAllowedError
360
        node = self.node.node_lookup(account)
361
        if node is not None:
362
            raise AccountExists('Account already exists')
363
        if policy:
364
            self._check_policy(policy)
365
        node = self._put_path(user, self.ROOTNODE, account)
366
        self._put_policy(node, policy, True)
367

    
368
    @backend_method
369
    def delete_account(self, user, account):
370
        """Delete the account with the given name."""
371

    
372
        logger.debug("delete_account: %s %s", user, account)
373
        if user != account:
374
            raise NotAllowedError
375
        node = self.node.node_lookup(account)
376
        if node is None:
377
            return
378
        if not self.node.node_remove(node):
379
            raise AccountNotEmpty('Account is not empty')
380
        self.permissions.group_destroy(account)
381

    
382
    @backend_method
383
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
384
        """Return a list of containers existing under an account."""
385

    
386
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
387
                     account, marker, limit, shared, until, public)
388
        if user != account:
389
            if until or account not in self._allowed_accounts(user):
390
                raise NotAllowedError
391
            allowed = self._allowed_containers(user, account)
392
            start, limit = self._list_limits(allowed, marker, limit)
393
            return allowed[start:start + limit]
394
        if shared or public:
395
            allowed = set()
396
            if shared:
397
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
398
            if public:
399
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
400
            allowed = sorted(allowed)
401
            start, limit = self._list_limits(allowed, marker, limit)
402
            return allowed[start:start + limit]
403
        node = self.node.node_lookup(account)
404
        containers = [x[0] for x in self._list_object_properties(
405
            node, account, '', '/', marker, limit, False, None, [], until)]
406
        start, limit = self._list_limits(
407
            [x[0] for x in containers], marker, limit)
408
        return containers[start:start + limit]
409

    
410
    @backend_method
411
    def list_container_meta(self, user, account, container, domain, until=None):
412
        """Return a list with all the container's object meta keys for the domain."""
413

    
414
        logger.debug("list_container_meta: %s %s %s %s %s", user,
415
                     account, container, domain, until)
416
        allowed = []
417
        if user != account:
418
            if until:
419
                raise NotAllowedError
420
            allowed = self.permissions.access_list_paths(
421
                user, '/'.join((account, container)))
422
            if not allowed:
423
                raise NotAllowedError
424
        path, node = self._lookup_container(account, container)
425
        before = until if until is not None else inf
426
        allowed = self._get_formatted_paths(allowed)
427
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
428

    
429
    @backend_method
430
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
431
        """Return a dictionary with the container metadata for the domain."""
432

    
433
        logger.debug("get_container_meta: %s %s %s %s %s", user,
434
                     account, container, domain, until)
435
        if user != account:
436
            if until or container not in self._allowed_containers(user, account):
437
                raise NotAllowedError
438
        path, node = self._lookup_container(account, container)
439
        props = self._get_properties(node, until)
440
        mtime = props[self.MTIME]
441
        count, bytes, tstamp = self._get_statistics(node, until)
442
        tstamp = max(tstamp, mtime)
443
        if until is None:
444
            modified = tstamp
445
        else:
446
            modified = self._get_statistics(
447
                node)[2]  # Overall last modification.
448
            modified = max(modified, mtime)
449

    
450
        if user != account:
451
            meta = {'name': container}
452
        else:
453
            meta = {}
454
            if include_user_defined:
455
                meta.update(
456
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
457
            if until is not None:
458
                meta.update({'until_timestamp': tstamp})
459
            meta.update({'name': container, 'count': count, 'bytes': bytes})
460
        meta.update({'modified': modified})
461
        return meta
462

    
463
    @backend_method
464
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
465
        """Update the metadata associated with the container for the domain."""
466

    
467
        logger.debug("update_container_meta: %s %s %s %s %s %s",
468
                     user, account, container, domain, meta, replace)
469
        if user != account:
470
            raise NotAllowedError
471
        path, node = self._lookup_container(account, container)
472
        src_version_id, dest_version_id = self._put_metadata(
473
            user, node, domain, meta, replace)
474
        if src_version_id is not None:
475
            versioning = self._get_policy(node)['versioning']
476
            if versioning != 'auto':
477
                self.node.version_remove(src_version_id)
478

    
479
    @backend_method
480
    def get_container_policy(self, user, account, container):
481
        """Return a dictionary with the container policy."""
482

    
483
        logger.debug(
484
            "get_container_policy: %s %s %s", user, account, container)
485
        if user != account:
486
            if container not in self._allowed_containers(user, account):
487
                raise NotAllowedError
488
            return {}
489
        path, node = self._lookup_container(account, container)
490
        return self._get_policy(node)
491

    
492
    @backend_method
493
    def update_container_policy(self, user, account, container, policy, replace=False):
494
        """Update the policy associated with the container."""
495

    
496
        logger.debug("update_container_policy: %s %s %s %s %s",
497
                     user, account, container, policy, replace)
498
        if user != account:
499
            raise NotAllowedError
500
        path, node = self._lookup_container(account, container)
501
        self._check_policy(policy)
502
        self._put_policy(node, policy, replace)
503

    
504
    @backend_method
505
    def put_container(self, user, account, container, policy={}):
506
        """Create a new container with the given name."""
507

    
508
        logger.debug(
509
            "put_container: %s %s %s %s", user, account, container, policy)
510
        if user != account:
511
            raise NotAllowedError
512
        try:
513
            path, node = self._lookup_container(account, container)
514
        except NameError:
515
            pass
516
        else:
517
            raise ContainerExists('Container already exists')
518
        if policy:
519
            self._check_policy(policy)
520
        path = '/'.join((account, container))
521
        node = self._put_path(
522
            user, self._lookup_account(account, True)[1], path)
523
        self._put_policy(node, policy, True)
524

    
525
    @backend_method
526
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
527
        """Delete/purge the container with the given name."""
528

    
529
        logger.debug("delete_container: %s %s %s %s %s %s", user,
530
                     account, container, until, prefix, delimiter)
531
        if user != account:
532
            raise NotAllowedError
533
        path, node = self._lookup_container(account, container)
534

    
535
        if until is not None:
536
            hashes, size, serials = self.node.node_purge_children(
537
                node, until, CLUSTER_HISTORY)
538
            for h in hashes:
539
                self.store.map_delete(h)
540
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
541
            self._report_size_change(user, account, -size,
542
                                     {'action':'container purge', 'path': path,
543
                                      'versions': ','.join(str(i) for i in serials)})
544
            return
545

    
546
        if not delimiter:
547
            if self._get_statistics(node)[0] > 0:
548
                raise ContainerNotEmpty('Container is not empty')
549
            hashes, size, serials = self.node.node_purge_children(
550
                node, inf, CLUSTER_HISTORY)
551
            for h in hashes:
552
                self.store.map_delete(h)
553
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
554
            self.node.node_remove(node)
555
            self._report_size_change(user, account, -size,
556
                                     {'action': 'container delete',
557
                                      'path': path,
558
                                      'versions': ','.join(str(i) for i in serials)})
559
        else:
560
            # remove only contents
561
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
562
            paths = []
563
            for t in src_names:
564
                path = '/'.join((account, container, t[0]))
565
                node = t[2]
566
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
567
                del_size = self._apply_versioning(
568
                    account, container, src_version_id)
569
                self._report_size_change(
570
                        user, account, -del_size, {
571
                                'action': 'object delete',
572
                                'path': path,
573
                        'versions': ','.join([str(dest_version_id)])
574
                     }
575
                )
576
                self._report_object_change(
577
                    user, account, path, details={'action': 'object delete'})
578
                paths.append(path)
579
            self.permissions.access_clear_bulk(paths)
580

    
581
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
582
        if user != account and until:
583
            raise NotAllowedError
584
        if shared and public:
585
            # get shared first
586
            shared = self._list_object_permissions(
587
                user, account, container, prefix, shared=True, public=False)
588
            objects = set()
589
            if shared:
590
                path, node = self._lookup_container(account, container)
591
                shared = self._get_formatted_paths(shared)
592
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
593

    
594
            # get public
595
            objects |= set(self._list_public_object_properties(
596
                user, account, container, prefix, all_props))
597
            objects = list(objects)
598

    
599
            objects.sort(key=lambda x: x[0])
600
            start, limit = self._list_limits(
601
                [x[0] for x in objects], marker, limit)
602
            return objects[start:start + limit]
603
        elif public:
604
            objects = self._list_public_object_properties(
605
                user, account, container, prefix, all_props)
606
            start, limit = self._list_limits(
607
                [x[0] for x in objects], marker, limit)
608
            return objects[start:start + limit]
609

    
610
        allowed = self._list_object_permissions(
611
            user, account, container, prefix, shared, public)
612
        if shared and not allowed:
613
            return []
614
        path, node = self._lookup_container(account, container)
615
        allowed = self._get_formatted_paths(allowed)
616
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
617
        start, limit = self._list_limits(
618
            [x[0] for x in objects], marker, limit)
619
        return objects[start:start + limit]
620

    
621
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
622
        public = self._list_object_permissions(
623
            user, account, container, prefix, shared=False, public=True)
624
        paths, nodes = self._lookup_objects(public)
625
        path = '/'.join((account, container))
626
        cont_prefix = path + '/'
627
        paths = [x[len(cont_prefix):] for x in paths]
628
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
629
        objects = [(path,) + props for path, props in zip(paths, props)]
630
        return objects
631

    
632
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
633
        objects = []
634
        while True:
635
            marker = objects[-1] if objects else None
636
            limit = 10000
637
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
638
            objects.extend(l)
639
            if not l or len(l) < limit:
640
                break
641
        return objects
642

    
643
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
644
        allowed = []
645
        path = '/'.join((account, container, prefix)).rstrip('/')
646
        if user != account:
647
            allowed = self.permissions.access_list_paths(user, path)
648
            if not allowed:
649
                raise NotAllowedError
650
        else:
651
            allowed = set()
652
            if shared:
653
                allowed.update(self.permissions.access_list_shared(path))
654
            if public:
655
                allowed.update(
656
                    [x[0] for x in self.permissions.public_list(path)])
657
            allowed = sorted(allowed)
658
            if not allowed:
659
                return []
660
        return allowed
661

    
662
    @backend_method
663
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
664
        """Return a list of object (name, version_id) tuples existing under a container."""
665

    
666
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
667
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
668

    
669
    @backend_method
670
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
671
        """Return a list of object metadata dicts existing under a container."""
672

    
673
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
674
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
675
        objects = []
676
        for p in props:
677
            if len(p) == 2:
678
                objects.append({'subdir': p[0]})
679
            else:
680
                objects.append({'name': p[0],
681
                                'bytes': p[self.SIZE + 1],
682
                                'type': p[self.TYPE + 1],
683
                                'hash': p[self.HASH + 1],
684
                                'version': p[self.SERIAL + 1],
685
                                'version_timestamp': p[self.MTIME + 1],
686
                                'modified': p[self.MTIME + 1] if until is None else None,
687
                                'modified_by': p[self.MUSER + 1],
688
                                'uuid': p[self.UUID + 1],
689
                                'checksum': p[self.CHECKSUM + 1]})
690
        return objects
691

    
692
    @backend_method
693
    def list_object_permissions(self, user, account, container, prefix=''):
694
        """Return a list of paths that enforce permissions under a container."""
695

    
696
        logger.debug("list_object_permissions: %s %s %s %s", user,
697
                     account, container, prefix)
698
        return self._list_object_permissions(user, account, container, prefix, True, False)
699

    
700
    @backend_method
701
    def list_object_public(self, user, account, container, prefix=''):
702
        """Return a dict mapping paths to public ids for objects that are public under a container."""
703

    
704
        logger.debug("list_object_public: %s %s %s %s", user,
705
                     account, container, prefix)
706
        public = {}
707
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
708
            public[path] = p + ULTIMATE_ANSWER
709
        return public
710

    
711
    @backend_method
712
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
713
        """Return a dictionary with the object metadata for the domain."""
714

    
715
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
716
                     account, container, name, domain, version)
717
        self._can_read(user, account, container, name)
718
        path, node = self._lookup_object(account, container, name)
719
        props = self._get_version(node, version)
720
        if version is None:
721
            modified = props[self.MTIME]
722
        else:
723
            try:
724
                modified = self._get_version(
725
                    node)[self.MTIME]  # Overall last modification.
726
            except NameError:  # Object may be deleted.
727
                del_props = self.node.version_lookup(
728
                    node, inf, CLUSTER_DELETED)
729
                if del_props is None:
730
                    raise ItemNotExists('Object does not exist')
731
                modified = del_props[self.MTIME]
732

    
733
        meta = {}
734
        if include_user_defined:
735
            meta.update(
736
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
737
        meta.update({'name': name,
738
                     'bytes': props[self.SIZE],
739
                     'type': props[self.TYPE],
740
                     'hash': props[self.HASH],
741
                     'version': props[self.SERIAL],
742
                     'version_timestamp': props[self.MTIME],
743
                     'modified': modified,
744
                     'modified_by': props[self.MUSER],
745
                     'uuid': props[self.UUID],
746
                     'checksum': props[self.CHECKSUM]})
747
        return meta
748

    
749
    @backend_method
750
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
751
        """Update the metadata associated with the object for the domain and return the new version."""
752

    
753
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
754
                     user, account, container, name, domain, meta, replace)
755
        self._can_write(user, account, container, name)
756
        path, node = self._lookup_object(account, container, name)
757
        src_version_id, dest_version_id = self._put_metadata(
758
            user, node, domain, meta, replace)
759
        self._apply_versioning(account, container, src_version_id)
760
        return dest_version_id
761

    
762
    @backend_method
763
    def get_object_permissions(self, user, account, container, name):
764
        """Return the action allowed on the object, the path
765
        from which the object gets its permissions from,
766
        along with a dictionary containing the permissions."""
767

    
768
        logger.debug("get_object_permissions: %s %s %s %s", user,
769
                     account, container, name)
770
        allowed = 'write'
771
        permissions_path = self._get_permissions_path(account, container, name)
772
        if user != account:
773
            if self.permissions.access_check(permissions_path, self.WRITE, user):
774
                allowed = 'write'
775
            elif self.permissions.access_check(permissions_path, self.READ, user):
776
                allowed = 'read'
777
            else:
778
                raise NotAllowedError
779
        self._lookup_object(account, container, name)
780
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
781

    
782
    @backend_method
783
    def update_object_permissions(self, user, account, container, name, permissions):
784
        """Update the permissions associated with the object."""
785

    
786
        logger.debug("update_object_permissions: %s %s %s %s %s",
787
                     user, account, container, name, permissions)
788
        if user != account:
789
            raise NotAllowedError
790
        path = self._lookup_object(account, container, name)[0]
791
        self._check_permissions(path, permissions)
792
        self.permissions.access_set(path, permissions)
793
        self._report_sharing_change(user, account, path, {'members':
794
                                    self.permissions.access_members(path)})
795

    
796
    @backend_method
797
    def get_object_public(self, user, account, container, name):
798
        """Return the public id of the object if applicable."""
799

    
800
        logger.debug(
801
            "get_object_public: %s %s %s %s", user, account, container, name)
802
        self._can_read(user, account, container, name)
803
        path = self._lookup_object(account, container, name)[0]
804
        p = self.permissions.public_get(path)
805
        if p is not None:
806
            p += ULTIMATE_ANSWER
807
        return p
808

    
809
    @backend_method
810
    def update_object_public(self, user, account, container, name, public):
811
        """Update the public status of the object."""
812

    
813
        logger.debug("update_object_public: %s %s %s %s %s", user,
814
                     account, container, name, public)
815
        self._can_write(user, account, container, name)
816
        path = self._lookup_object(account, container, name)[0]
817
        if not public:
818
            self.permissions.public_unset(path)
819
        else:
820
            self.permissions.public_set(path)
821

    
822
    @backend_method
823
    def get_object_hashmap(self, user, account, container, name, version=None):
824
        """Return the object's size and a list with partial hashes."""
825

    
826
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
827
                     account, container, name, version)
828
        self._can_read(user, account, container, name)
829
        path, node = self._lookup_object(account, container, name)
830
        props = self._get_version(node, version)
831
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
832
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
833

    
834
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
835
        if permissions is not None and user != account:
836
            raise NotAllowedError
837
        self._can_write(user, account, container, name)
838
        if permissions is not None:
839
            path = '/'.join((account, container, name))
840
            self._check_permissions(path, permissions)
841

    
842
        account_path, account_node = self._lookup_account(account, True)
843
        container_path, container_node = self._lookup_container(
844
            account, container)
845
        path, node = self._put_object_node(
846
            container_path, container_node, name)
847
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
848

    
849
        # Handle meta.
850
        if src_version_id is None:
851
            src_version_id = pre_version_id
852
        self._put_metadata_duplicate(
853
            src_version_id, dest_version_id, domain, meta, replace_meta)
854

    
855
        del_size = self._apply_versioning(account, container, pre_version_id)
856
        size_delta = size - del_size
857
        if not self.using_external_quotaholder: # Check account quota.
858
            if size_delta > 0:
859
                account_quota = long(self._get_policy(account_node)['quota'])
860
                account_usage = self._get_statistics(account_node)[1] + size_delta
861
                if (account_quota > 0 and account_usage > account_quota):
862
                    raise QuotaError('account quota exceeded: limit: %s, usage: %s' % (
863
                        account_quota, account_usage
864
                    ))
865

    
866
        # Check container quota.
867
        container_quota = long(self._get_policy(container_node)['quota'])
868
        container_usage = self._get_statistics(container_node)[1] + size_delta
869
        if (container_quota > 0 and container_usage > container_quota):
870
            # This must be executed in a transaction, so the version is
871
            # never created if it fails.
872
            raise QuotaError('container quota exceeded: limit: %s, usage: %s' % (
873
                container_quota, container_usage
874
            ))
875

    
876
        self._report_size_change(user, account, size_delta,
877
                                 {'action': 'object update', 'path': path,
878
                                  'versions': ','.join([str(dest_version_id)])})
879
        if permissions is not None:
880
            self.permissions.access_set(path, permissions)
881
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
882

    
883
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
884
        return dest_version_id
885

    
886
    @backend_method
887
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
888
        """Create/update an object with the specified size and partial hashes."""
889

    
890
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
891
                     account, container, name, size, type, hashmap, checksum)
892
        if size == 0:  # No such thing as an empty hashmap.
893
            hashmap = [self.put_block('')]
894
        map = HashMap(self.block_size, self.hash_algorithm)
895
        map.extend([binascii.unhexlify(x) for x in hashmap])
896
        missing = self.store.block_search(map)
897
        if missing:
898
            ie = IndexError()
899
            ie.data = [binascii.hexlify(x) for x in missing]
900
            raise ie
901

    
902
        hash = map.hash()
903
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
904
        self.store.map_put(hash, map)
905
        return dest_version_id
906

    
907
    @backend_method
908
    def update_object_checksum(self, user, account, container, name, version, checksum):
909
        """Update an object's checksum."""
910

    
911
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
912
                     user, account, container, name, version, checksum)
913
        # Update objects with greater version and same hashmap and size (fix metadata updates).
914
        self._can_write(user, account, container, name)
915
        path, node = self._lookup_object(account, container, name)
916
        props = self._get_version(node, version)
917
        versions = self.node.node_get_versions(node)
918
        for x in versions:
919
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
920
                self.node.version_put_property(
921
                    x[self.SERIAL], 'checksum', checksum)
922

    
923
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
924
        dest_version_ids = []
925
        self._can_read(user, src_account, src_container, src_name)
926
        path, node = self._lookup_object(src_account, src_container, src_name)
927
        # TODO: Will do another fetch of the properties in duplicate version...
928
        props = self._get_version(
929
            node, src_version)  # Check to see if source exists.
930
        src_version_id = props[self.SERIAL]
931
        hash = props[self.HASH]
932
        size = props[self.SIZE]
933
        is_copy = not is_move and (src_account, src_container, src_name) != (
934
            dest_account, dest_container, dest_name)  # New uuid.
935
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
936
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
937
            self._delete_object(user, src_account, src_container, src_name)
938

    
939
        if delimiter:
940
            prefix = src_name + \
941
                delimiter if not src_name.endswith(delimiter) else src_name
942
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
943
            src_names.sort(key=lambda x: x[2])  # order by nodes
944
            paths = [elem[0] for elem in src_names]
945
            nodes = [elem[2] for elem in src_names]
946
            # TODO: Will do another fetch of the properties in duplicate version...
947
            props = self._get_versions(nodes)  # Check to see if source exists.
948

    
949
            for prop, path, node in zip(props, paths, nodes):
950
                src_version_id = prop[self.SERIAL]
951
                hash = prop[self.HASH]
952
                vtype = prop[self.TYPE]
953
                size = prop[self.SIZE]
954
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
955
                    delimiter) else dest_name
956
                vdest_name = path.replace(prefix, dest_prefix, 1)
957
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
958
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
959
                    self._delete_object(user, src_account, src_container, path)
960
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
961

    
962
    @backend_method
963
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
964
        """Copy an object's data and metadata."""
965

    
966
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
967
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
968
        return dest_version_id
969

    
970
    @backend_method
971
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
972
        """Move an object's data and metadata."""
973

    
974
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
975
        if user != src_account:
976
            raise NotAllowedError
977
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
978
        return dest_version_id
979

    
980
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
981
        if user != account:
982
            raise NotAllowedError
983

    
984
        if until is not None:
985
            path = '/'.join((account, container, name))
986
            node = self.node.node_lookup(path)
987
            if node is None:
988
                return
989
            hashes = []
990
            size = 0
991
            serials = []
992
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
993
            hashes += h
994
            size += s
995
            serials += v
996
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
997
            hashes += h
998
            size += s
999
            serials += v
1000
            for h in hashes:
1001
                self.store.map_delete(h)
1002
            self.node.node_purge(node, until, CLUSTER_DELETED)
1003
            try:
1004
                props = self._get_version(node)
1005
            except NameError:
1006
                self.permissions.access_clear(path)
1007
            self._report_size_change(user, account, -size,
1008
                                    {'action': 'object purge', 'path': path,
1009
                                     'versions': ','.join(str(i) for i in serials)})
1010
            return
1011

    
1012
        path, node = self._lookup_object(account, container, name)
1013
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1014
        del_size = self._apply_versioning(account, container, src_version_id)
1015
        self._report_size_change(user, account, -del_size,
1016
                                 {'action': 'object delete', 'path': path,
1017
                                  'versions': ','.join([str(dest_version_id)])})
1018
        self._report_object_change(
1019
            user, account, path, details={'action': 'object delete'})
1020
        self.permissions.access_clear(path)
1021

    
1022
        if delimiter:
1023
            prefix = name + delimiter if not name.endswith(delimiter) else name
1024
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1025
            paths = []
1026
            for t in src_names:
1027
                path = '/'.join((account, container, t[0]))
1028
                node = t[2]
1029
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1030
                del_size = self._apply_versioning(
1031
                    account, container, src_version_id)
1032
                self._report_size_change(user, account, -del_size,
1033
                                         {'action': 'object delete',
1034
                                          'path': path,
1035
                                          'versions': ','.join([str(dest_version_id)])})
1036
                self._report_object_change(
1037
                    user, account, path, details={'action': 'object delete'})
1038
                paths.append(path)
1039
            self.permissions.access_clear_bulk(paths)
1040

    
1041
    @backend_method
1042
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1043
        """Delete/purge an object."""
1044

    
1045
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1046
                     account, container, name, until, prefix, delimiter)
1047
        self._delete_object(user, account, container, name, until, delimiter)
1048

    
1049
    @backend_method
1050
    def list_versions(self, user, account, container, name):
1051
        """Return a list of all (version, version_timestamp) tuples for an object."""
1052

    
1053
        logger.debug(
1054
            "list_versions: %s %s %s %s", user, account, container, name)
1055
        self._can_read(user, account, container, name)
1056
        path, node = self._lookup_object(account, container, name)
1057
        versions = self.node.node_get_versions(node)
1058
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1059

    
1060
    @backend_method
1061
    def get_uuid(self, user, uuid):
1062
        """Return the (account, container, name) for the UUID given."""
1063

    
1064
        logger.debug("get_uuid: %s %s", user, uuid)
1065
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1066
        if info is None:
1067
            raise NameError
1068
        path, serial = info
1069
        account, container, name = path.split('/', 2)
1070
        self._can_read(user, account, container, name)
1071
        return (account, container, name)
1072

    
1073
    @backend_method
1074
    def get_public(self, user, public):
1075
        """Return the (account, container, name) for the public id given."""
1076

    
1077
        logger.debug("get_public: %s %s", user, public)
1078
        if public is None or public < ULTIMATE_ANSWER:
1079
            raise NameError
1080
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1081
        if path is None:
1082
            raise NameError
1083
        account, container, name = path.split('/', 2)
1084
        self._can_read(user, account, container, name)
1085
        return (account, container, name)
1086

    
1087
    @backend_method(autocommit=0)
1088
    def get_block(self, hash):
1089
        """Return a block's data."""
1090

    
1091
        logger.debug("get_block: %s", hash)
1092
        block = self.store.block_get(binascii.unhexlify(hash))
1093
        if not block:
1094
            raise ItemNotExists('Block does not exist')
1095
        return block
1096

    
1097
    @backend_method(autocommit=0)
1098
    def put_block(self, data):
1099
        """Store a block and return the hash."""
1100

    
1101
        logger.debug("put_block: %s", len(data))
1102
        return binascii.hexlify(self.store.block_put(data))
1103

    
1104
    @backend_method(autocommit=0)
1105
    def update_block(self, hash, data, offset=0):
1106
        """Update a known block and return the hash."""
1107

    
1108
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1109
        if offset == 0 and len(data) == self.block_size:
1110
            return self.put_block(data)
1111
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1112
        return binascii.hexlify(h)
1113

    
1114
    # Path functions.
1115

    
1116
    def _generate_uuid(self):
1117
        return str(uuidlib.uuid4())
1118

    
1119
    def _put_object_node(self, path, parent, name):
1120
        path = '/'.join((path, name))
1121
        node = self.node.node_lookup(path)
1122
        if node is None:
1123
            node = self.node.node_create(parent, path)
1124
        return path, node
1125

    
1126
    def _put_path(self, user, parent, path):
1127
        node = self.node.node_create(parent, path)
1128
        self.node.version_create(node, None, 0, '', None, user,
1129
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1130
        return node
1131

    
1132
    def _lookup_account(self, account, create=True):
1133
        node = self.node.node_lookup(account)
1134
        if node is None and create:
1135
            node = self._put_path(
1136
                account, self.ROOTNODE, account)  # User is account.
1137
        return account, node
1138

    
1139
    def _lookup_container(self, account, container):
1140
        path = '/'.join((account, container))
1141
        node = self.node.node_lookup(path)
1142
        if node is None:
1143
            raise ItemNotExists('Container does not exist')
1144
        return path, node
1145

    
1146
    def _lookup_object(self, account, container, name):
1147
        path = '/'.join((account, container, name))
1148
        node = self.node.node_lookup(path)
1149
        if node is None:
1150
            raise ItemNotExists('Object does not exist')
1151
        return path, node
1152

    
1153
    def _lookup_objects(self, paths):
1154
        nodes = self.node.node_lookup_bulk(paths)
1155
        return paths, nodes
1156

    
1157
    def _get_properties(self, node, until=None):
1158
        """Return properties until the timestamp given."""
1159

    
1160
        before = until if until is not None else inf
1161
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1162
        if props is None and until is not None:
1163
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1164
        if props is None:
1165
            raise ItemNotExists('Path does not exist')
1166
        return props
1167

    
1168
    def _get_statistics(self, node, until=None):
1169
        """Return count, sum of size and latest timestamp of everything under node."""
1170

    
1171
        if until is None:
1172
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1173
        else:
1174
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1175
        if stats is None:
1176
            stats = (0, 0, 0)
1177
        return stats
1178

    
1179
    def _get_version(self, node, version=None):
1180
        if version is None:
1181
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1182
            if props is None:
1183
                raise ItemNotExists('Object does not exist')
1184
        else:
1185
            try:
1186
                version = int(version)
1187
            except ValueError:
1188
                raise VersionNotExists('Version does not exist')
1189
            props = self.node.version_get_properties(version)
1190
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1191
                raise VersionNotExists('Version does not exist')
1192
        return props
1193

    
1194
    def _get_versions(self, nodes):
1195
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1196

    
1197
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1198
        """Create a new version of the node."""
1199

    
1200
        props = self.node.version_lookup(
1201
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1202
        if props is not None:
1203
            src_version_id = props[self.SERIAL]
1204
            src_hash = props[self.HASH]
1205
            src_size = props[self.SIZE]
1206
            src_type = props[self.TYPE]
1207
            src_checksum = props[self.CHECKSUM]
1208
        else:
1209
            src_version_id = None
1210
            src_hash = None
1211
            src_size = 0
1212
            src_type = ''
1213
            src_checksum = ''
1214
        if size is None:  # Set metadata.
1215
            hash = src_hash  # This way hash can be set to None (account or container).
1216
            size = src_size
1217
        if type is None:
1218
            type = src_type
1219
        if checksum is None:
1220
            checksum = src_checksum
1221
        uuid = self._generate_uuid(
1222
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1223

    
1224
        if src_node is None:
1225
            pre_version_id = src_version_id
1226
        else:
1227
            pre_version_id = None
1228
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1229
            if props is not None:
1230
                pre_version_id = props[self.SERIAL]
1231
        if pre_version_id is not None:
1232
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1233

    
1234
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1235
        return pre_version_id, dest_version_id
1236

    
1237
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1238
        if src_version_id is not None:
1239
            self.node.attribute_copy(src_version_id, dest_version_id)
1240
        if not replace:
1241
            self.node.attribute_del(dest_version_id, domain, (
1242
                k for k, v in meta.iteritems() if v == ''))
1243
            self.node.attribute_set(dest_version_id, domain, (
1244
                (k, v) for k, v in meta.iteritems() if v != ''))
1245
        else:
1246
            self.node.attribute_del(dest_version_id, domain)
1247
            self.node.attribute_set(dest_version_id, domain, ((
1248
                k, v) for k, v in meta.iteritems()))
1249

    
1250
    def _put_metadata(self, user, node, domain, meta, replace=False):
1251
        """Create a new version and store metadata."""
1252

    
1253
        src_version_id, dest_version_id = self._put_version_duplicate(
1254
            user, node)
1255
        self._put_metadata_duplicate(
1256
            src_version_id, dest_version_id, domain, meta, replace)
1257
        return src_version_id, dest_version_id
1258

    
1259
    def _list_limits(self, listing, marker, limit):
1260
        start = 0
1261
        if marker:
1262
            try:
1263
                start = listing.index(marker) + 1
1264
            except ValueError:
1265
                pass
1266
        if not limit or limit > 10000:
1267
            limit = 10000
1268
        return start, limit
1269

    
1270
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1271
        cont_prefix = path + '/'
1272
        prefix = cont_prefix + prefix
1273
        start = cont_prefix + marker if marker else None
1274
        before = until if until is not None else inf
1275
        filterq = keys if domain else []
1276
        sizeq = size_range
1277

    
1278
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1279
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1280
        objects.sort(key=lambda x: x[0])
1281
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1282
        return objects
1283

    
1284
    # Reporting functions.
1285

    
1286
    def _report_size_change(self, user, account, size, details={}):
1287
        if size == 0:
1288
            return
1289

    
1290
        account_node = self._lookup_account(account, True)[1]
1291
        total = self._get_statistics(account_node)[1]
1292
        details.update({'user': user, 'total': total})
1293
        logger.debug(
1294
            "_report_size_change: %s %s %s %s", user, account, size, details)
1295
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1296
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1297
                              float(size), details))
1298

    
1299
        if not self.using_external_quotaholder:
1300
            return
1301

    
1302
        try:
1303
            serial = self.quotaholder.issue_commission(
1304
                    context     =   {},
1305
                    target      =   account,
1306
                    key         =   '1',
1307
                    clientkey   =   'pithos',
1308
                    ownerkey    =   '',
1309
                    name        =   details['path'] if 'path' in details else '',
1310
                    provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1311
            )
1312
        except BaseException, e:
1313
            raise QuotaError(e)
1314
        else:
1315
            self.serials.append(serial)
1316

    
1317
    def _report_object_change(self, user, account, path, details={}):
1318
        details.update({'user': user})
1319
        logger.debug("_report_object_change: %s %s %s %s", user,
1320
                     account, path, details)
1321
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1322
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1323

    
1324
    def _report_sharing_change(self, user, account, path, details={}):
1325
        logger.debug("_report_permissions_change: %s %s %s %s",
1326
                     user, account, path, details)
1327
        details.update({'user': user})
1328
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1329
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1330

    
1331
    # Policy functions.
1332

    
1333
    def _check_policy(self, policy):
1334
        for k in policy.keys():
1335
            if policy[k] == '':
1336
                policy[k] = self.default_policy.get(k)
1337
        for k, v in policy.iteritems():
1338
            if k == 'quota':
1339
                q = int(v)  # May raise ValueError.
1340
                if q < 0:
1341
                    raise ValueError
1342
            elif k == 'versioning':
1343
                if v not in ['auto', 'none']:
1344
                    raise ValueError
1345
            else:
1346
                raise ValueError
1347

    
1348
    def _put_policy(self, node, policy, replace):
1349
        if replace:
1350
            for k, v in self.default_policy.iteritems():
1351
                if k not in policy:
1352
                    policy[k] = v
1353
        self.node.policy_set(node, policy)
1354

    
1355
    def _get_policy(self, node):
1356
        policy = self.default_policy.copy()
1357
        policy.update(self.node.policy_get(node))
1358
        return policy
1359

    
1360
    def _apply_versioning(self, account, container, version_id):
1361
        """Delete the provided version if such is the policy.
1362
           Return size of object removed.
1363
        """
1364

    
1365
        if version_id is None:
1366
            return 0
1367
        path, node = self._lookup_container(account, container)
1368
        versioning = self._get_policy(node)['versioning']
1369
        if versioning != 'auto':
1370
            hash, size = self.node.version_remove(version_id)
1371
            self.store.map_delete(hash)
1372
            return size
1373
        elif self.free_versioning:
1374
            return self.node.version_get_properties(
1375
                version_id, keys=('size',))[0]
1376
        return 0
1377

    
1378
    # Access control functions.
1379

    
1380
    def _check_groups(self, groups):
1381
        # raise ValueError('Bad characters in groups')
1382
        pass
1383

    
1384
    def _check_permissions(self, path, permissions):
1385
        # raise ValueError('Bad characters in permissions')
1386
        pass
1387

    
1388
    def _get_formatted_paths(self, paths):
1389
        formatted = []
1390
        for p in paths:
1391
            node = self.node.node_lookup(p)
1392
            props = None
1393
            if node is not None:
1394
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1395
            if props is not None:
1396
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1397
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1398
                formatted.append((p, self.MATCH_EXACT))
1399
        return formatted
1400

    
1401
    def _get_permissions_path(self, account, container, name):
1402
        path = '/'.join((account, container, name))
1403
        permission_paths = self.permissions.access_inherit(path)
1404
        permission_paths.sort()
1405
        permission_paths.reverse()
1406
        for p in permission_paths:
1407
            if p == path:
1408
                return p
1409
            else:
1410
                if p.count('/') < 2:
1411
                    continue
1412
                node = self.node.node_lookup(p)
1413
                props = None
1414
                if node is not None:
1415
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1416
                if props is not None:
1417
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1418
                        return p
1419
        return None
1420

    
1421
    def _can_read(self, user, account, container, name):
1422
        if user == account:
1423
            return True
1424
        path = '/'.join((account, container, name))
1425
        if self.permissions.public_get(path) is not None:
1426
            return True
1427
        path = self._get_permissions_path(account, container, name)
1428
        if not path:
1429
            raise NotAllowedError
1430
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1431
            raise NotAllowedError
1432

    
1433
    def _can_write(self, user, account, container, name):
1434
        if user == account:
1435
            return True
1436
        path = '/'.join((account, container, name))
1437
        path = self._get_permissions_path(account, container, name)
1438
        if not path:
1439
            raise NotAllowedError
1440
        if not self.permissions.access_check(path, self.WRITE, user):
1441
            raise NotAllowedError
1442

    
1443
    def _allowed_accounts(self, user):
1444
        allow = set()
1445
        for path in self.permissions.access_list_paths(user):
1446
            allow.add(path.split('/', 1)[0])
1447
        return sorted(allow)
1448

    
1449
    def _allowed_containers(self, user, account):
1450
        allow = set()
1451
        for path in self.permissions.access_list_paths(user, account):
1452
            allow.add(path.split('/', 2)[1])
1453
        return sorted(allow)