Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 78348987

History | View | Annotate | Download (63.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from synnefo.lib.quotaholder import QuotaholderClient
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
85
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
86
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
87

    
88
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
89
QUEUE_CLIENT_ID = 'pithos'
90
QUEUE_INSTANCE_ID = '1'
91

    
92
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
93

    
94
inf = float('inf')
95

    
96
ULTIMATE_ANSWER = 42
97

    
98

    
99
logger = logging.getLogger(__name__)
100

    
101

    
102
def backend_method(func=None, autocommit=1):
103
    if func is None:
104
        def fn(func):
105
            return backend_method(func, autocommit)
106
        return fn
107

    
108
    if not autocommit:
109
        return func
110

    
111
    def fn(self, *args, **kw):
112
        self.wrapper.execute()
113
        serials = []
114
        self.serials = serials
115
        self.messages = []
116

    
117
        try:
118
            ret = func(self, *args, **kw)
119
            for m in self.messages:
120
                self.queue.send(*m)
121
            if serials:
122
                self.quotaholder.accept_commission(
123
                            context     =   {},
124
                            clientkey   =   'pithos',
125
                            serials     =   serials)
126
            self.wrapper.commit()
127
            return ret
128
        except:
129
            if serials:
130
                self.quotaholder.reject_commission(
131
                            context     =   {},
132
                            clientkey   =   'pithos',
133
                            serials     =   serials)
134
            self.wrapper.rollback()
135
            raise
136
    return fn
137

    
138

    
139
class ModularBackend(BaseBackend):
140
    """A modular backend.
141

142
    Uses modules for SQL functions and storage.
143
    """
144

    
145
    def __init__(self, db_module=None, db_connection=None,
146
                 block_module=None, block_path=None, block_umask=None,
147
                 queue_module=None, queue_hosts=None, queue_exchange=None,
148
                 quotaholder_enabled=False,
149
                 quotaholder_url=None, quotaholder_token=None,
150
                 quotaholder_client_poolsize=None,
151
                 free_versioning=True, block_params=None):
152
        db_module = db_module or DEFAULT_DB_MODULE
153
        db_connection = db_connection or DEFAULT_DB_CONNECTION
154
        block_module = block_module or DEFAULT_BLOCK_MODULE
155
        block_path = block_path or DEFAULT_BLOCK_PATH
156
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
157
        block_params = block_params or DEFAULT_BLOCK_PARAMS
158
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
159

    
160
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
161
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
162
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
163

    
164
        self.hash_algorithm = 'sha256'
165
        self.block_size = 4 * 1024 * 1024  # 4MB
166
        self.free_versioning = free_versioning
167

    
168
        self.default_policy = {'quota': DEFAULT_QUOTA,
169
                               'versioning': DEFAULT_VERSIONING}
170

    
171
        def load_module(m):
172
            __import__(m)
173
            return sys.modules[m]
174

    
175
        self.db_module = load_module(db_module)
176
        self.wrapper = self.db_module.DBWrapper(db_connection)
177
        params = {'wrapper': self.wrapper}
178
        self.permissions = self.db_module.Permissions(**params)
179
        self.config = self.db_module.Config(**params)
180
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
181
        for x in ['READ', 'WRITE']:
182
            setattr(self, x, getattr(self.db_module, x))
183
        self.node = self.db_module.Node(**params)
184
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
185
            setattr(self, x, getattr(self.db_module, x))
186

    
187
        self.block_module = load_module(block_module)
188
        self.block_params = block_params
189
        params = {'path': block_path,
190
                  'block_size': self.block_size,
191
                  'hash_algorithm': self.hash_algorithm,
192
                  'umask': block_umask}
193
        params.update(self.block_params)
194
        self.store = self.block_module.Store(**params)
195

    
196
        if queue_module and queue_hosts:
197
            self.queue_module = load_module(queue_module)
198
            params = {'hosts': queue_hosts,
199
                      'exchange': queue_exchange,
200
                      'client_id': QUEUE_CLIENT_ID}
201
            self.queue = self.queue_module.Queue(**params)
202
        else:
203
            class NoQueue:
204
                def send(self, *args):
205
                    pass
206

    
207
                def close(self):
208
                    pass
209

    
210
            self.queue = NoQueue()
211

    
212
        self.quotaholder_enabled = quotaholder_enabled
213
        if quotaholder_enabled:
214
            self.quotaholder_url = quotaholder_url
215
            self.quotaholder_token = quotaholder_token
216
            self.quotaholder = QuotaholderClient(
217
                                    quotaholder_url,
218
                                    token=quotaholder_token,
219
                                    poolsize=quotaholder_client_poolsize)
220

    
221
        self.serials = []
222
        self.messages = []
223

    
224
    def close(self):
225
        self.wrapper.close()
226
        self.queue.close()
227

    
228
    @property
229
    def using_external_quotaholder(self):
230
        return self.quotaholder_enabled
231

    
232
    @backend_method
233
    def list_accounts(self, user, marker=None, limit=10000):
234
        """Return a list of accounts the user can access."""
235

    
236
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
237
        allowed = self._allowed_accounts(user)
238
        start, limit = self._list_limits(allowed, marker, limit)
239
        return allowed[start:start + limit]
240

    
241
    @backend_method
242
    def get_account_meta(
243
            self, user, account, domain, until=None, include_user_defined=True,
244
            external_quota=None):
245
        """Return a dictionary with the account metadata for the domain."""
246

    
247
        logger.debug(
248
            "get_account_meta: %s %s %s %s", user, account, domain, until)
249
        path, node = self._lookup_account(account, user == account)
250
        if user != account:
251
            if until or node is None or account not in self._allowed_accounts(user):
252
                raise NotAllowedError
253
        try:
254
            props = self._get_properties(node, until)
255
            mtime = props[self.MTIME]
256
        except NameError:
257
            props = None
258
            mtime = until
259
        count, bytes, tstamp = self._get_statistics(node, until)
260
        tstamp = max(tstamp, mtime)
261
        if until is None:
262
            modified = tstamp
263
        else:
264
            modified = self._get_statistics(
265
                node)[2]  # Overall last modification.
266
            modified = max(modified, mtime)
267

    
268
        if user != account:
269
            meta = {'name': account}
270
        else:
271
            meta = {}
272
            if props is not None and include_user_defined:
273
                meta.update(
274
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
275
            if until is not None:
276
                meta.update({'until_timestamp': tstamp})
277
            meta.update({'name': account, 'count': count, 'bytes': bytes})
278
            if self.using_external_quotaholder:
279
                external_quota = external_quota or {}
280
                meta['bytes'] = external_quota.get('currValue', 0)
281
        meta.update({'modified': modified})
282
        return meta
283

    
284
    @backend_method
285
    def update_account_meta(self, user, account, domain, meta, replace=False):
286
        """Update the metadata associated with the account for the domain."""
287

    
288
        logger.debug("update_account_meta: %s %s %s %s %s", user,
289
                     account, domain, meta, replace)
290
        if user != account:
291
            raise NotAllowedError
292
        path, node = self._lookup_account(account, True)
293
        self._put_metadata(user, node, domain, meta, replace)
294

    
295
    @backend_method
296
    def get_account_groups(self, user, account):
297
        """Return a dictionary with the user groups defined for this account."""
298

    
299
        logger.debug("get_account_groups: %s %s", user, account)
300
        if user != account:
301
            if account not in self._allowed_accounts(user):
302
                raise NotAllowedError
303
            return {}
304
        self._lookup_account(account, True)
305
        return self.permissions.group_dict(account)
306

    
307
    @backend_method
308
    def update_account_groups(self, user, account, groups, replace=False):
309
        """Update the groups associated with the account."""
310

    
311
        logger.debug("update_account_groups: %s %s %s %s", user,
312
                     account, groups, replace)
313
        if user != account:
314
            raise NotAllowedError
315
        self._lookup_account(account, True)
316
        self._check_groups(groups)
317
        if replace:
318
            self.permissions.group_destroy(account)
319
        for k, v in groups.iteritems():
320
            if not replace:  # If not already deleted.
321
                self.permissions.group_delete(account, k)
322
            if v:
323
                self.permissions.group_addmany(account, k, v)
324

    
325
    @backend_method
326
    def get_account_policy(self, user, account, external_quota=None):
327
        """Return a dictionary with the account policy."""
328

    
329
        logger.debug("get_account_policy: %s %s", user, account)
330
        if user != account:
331
            if account not in self._allowed_accounts(user):
332
                raise NotAllowedError
333
            return {}
334
        path, node = self._lookup_account(account, True)
335
        policy = self._get_policy(node)
336
        if self.using_external_quotaholder:
337
            external_quota = external_quota or {}
338
            policy['quota'] = external_quota.get('maxValue', 0)
339
        return policy
340

    
341
    @backend_method
342
    def update_account_policy(self, user, account, policy, replace=False):
343
        """Update the policy associated with the account."""
344

    
345
        logger.debug("update_account_policy: %s %s %s %s", user,
346
                     account, policy, replace)
347
        if user != account:
348
            raise NotAllowedError
349
        path, node = self._lookup_account(account, True)
350
        self._check_policy(policy)
351
        self._put_policy(node, policy, replace)
352

    
353
    @backend_method
354
    def put_account(self, user, account, policy=None):
355
        """Create a new account with the given name."""
356

    
357
        logger.debug("put_account: %s %s %s", user, account, policy)
358
        policy = policy or {}
359
        if user != account:
360
            raise NotAllowedError
361
        node = self.node.node_lookup(account)
362
        if node is not None:
363
            raise AccountExists('Account already exists')
364
        if policy:
365
            self._check_policy(policy)
366
        node = self._put_path(user, self.ROOTNODE, account)
367
        self._put_policy(node, policy, True)
368

    
369
    @backend_method
370
    def delete_account(self, user, account):
371
        """Delete the account with the given name."""
372

    
373
        logger.debug("delete_account: %s %s", user, account)
374
        if user != account:
375
            raise NotAllowedError
376
        node = self.node.node_lookup(account)
377
        if node is None:
378
            return
379
        if not self.node.node_remove(node):
380
            raise AccountNotEmpty('Account is not empty')
381
        self.permissions.group_destroy(account)
382

    
383
    @backend_method
384
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
385
        """Return a list of containers existing under an account."""
386

    
387
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
388
                     account, marker, limit, shared, until, public)
389
        if user != account:
390
            if until or account not in self._allowed_accounts(user):
391
                raise NotAllowedError
392
            allowed = self._allowed_containers(user, account)
393
            start, limit = self._list_limits(allowed, marker, limit)
394
            return allowed[start:start + limit]
395
        if shared or public:
396
            allowed = set()
397
            if shared:
398
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
399
            if public:
400
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
401
            allowed = sorted(allowed)
402
            start, limit = self._list_limits(allowed, marker, limit)
403
            return allowed[start:start + limit]
404
        node = self.node.node_lookup(account)
405
        containers = [x[0] for x in self._list_object_properties(
406
            node, account, '', '/', marker, limit, False, None, [], until)]
407
        start, limit = self._list_limits(
408
            [x[0] for x in containers], marker, limit)
409
        return containers[start:start + limit]
410

    
411
    @backend_method
412
    def list_container_meta(self, user, account, container, domain, until=None):
413
        """Return a list with all the container's object meta keys for the domain."""
414

    
415
        logger.debug("list_container_meta: %s %s %s %s %s", user,
416
                     account, container, domain, until)
417
        allowed = []
418
        if user != account:
419
            if until:
420
                raise NotAllowedError
421
            allowed = self.permissions.access_list_paths(
422
                user, '/'.join((account, container)))
423
            if not allowed:
424
                raise NotAllowedError
425
        path, node = self._lookup_container(account, container)
426
        before = until if until is not None else inf
427
        allowed = self._get_formatted_paths(allowed)
428
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
429

    
430
    @backend_method
431
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
432
        """Return a dictionary with the container metadata for the domain."""
433

    
434
        logger.debug("get_container_meta: %s %s %s %s %s", user,
435
                     account, container, domain, until)
436
        if user != account:
437
            if until or container not in self._allowed_containers(user, account):
438
                raise NotAllowedError
439
        path, node = self._lookup_container(account, container)
440
        props = self._get_properties(node, until)
441
        mtime = props[self.MTIME]
442
        count, bytes, tstamp = self._get_statistics(node, until)
443
        tstamp = max(tstamp, mtime)
444
        if until is None:
445
            modified = tstamp
446
        else:
447
            modified = self._get_statistics(
448
                node)[2]  # Overall last modification.
449
            modified = max(modified, mtime)
450

    
451
        if user != account:
452
            meta = {'name': container}
453
        else:
454
            meta = {}
455
            if include_user_defined:
456
                meta.update(
457
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
458
            if until is not None:
459
                meta.update({'until_timestamp': tstamp})
460
            meta.update({'name': container, 'count': count, 'bytes': bytes})
461
        meta.update({'modified': modified})
462
        return meta
463

    
464
    @backend_method
465
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
466
        """Update the metadata associated with the container for the domain."""
467

    
468
        logger.debug("update_container_meta: %s %s %s %s %s %s",
469
                     user, account, container, domain, meta, replace)
470
        if user != account:
471
            raise NotAllowedError
472
        path, node = self._lookup_container(account, container)
473
        src_version_id, dest_version_id = self._put_metadata(
474
            user, node, domain, meta, replace)
475
        if src_version_id is not None:
476
            versioning = self._get_policy(node)['versioning']
477
            if versioning != 'auto':
478
                self.node.version_remove(src_version_id)
479

    
480
    @backend_method
481
    def get_container_policy(self, user, account, container):
482
        """Return a dictionary with the container policy."""
483

    
484
        logger.debug(
485
            "get_container_policy: %s %s %s", user, account, container)
486
        if user != account:
487
            if container not in self._allowed_containers(user, account):
488
                raise NotAllowedError
489
            return {}
490
        path, node = self._lookup_container(account, container)
491
        return self._get_policy(node)
492

    
493
    @backend_method
494
    def update_container_policy(self, user, account, container, policy, replace=False):
495
        """Update the policy associated with the container."""
496

    
497
        logger.debug("update_container_policy: %s %s %s %s %s",
498
                     user, account, container, policy, replace)
499
        if user != account:
500
            raise NotAllowedError
501
        path, node = self._lookup_container(account, container)
502
        self._check_policy(policy)
503
        self._put_policy(node, policy, replace)
504

    
505
    @backend_method
506
    def put_container(self, user, account, container, policy=None):
507
        """Create a new container with the given name."""
508

    
509
        logger.debug(
510
            "put_container: %s %s %s %s", user, account, container, policy)
511
        policy = policy or {}
512
        if user != account:
513
            raise NotAllowedError
514
        try:
515
            path, node = self._lookup_container(account, container)
516
        except NameError:
517
            pass
518
        else:
519
            raise ContainerExists('Container already exists')
520
        if policy:
521
            self._check_policy(policy)
522
        path = '/'.join((account, container))
523
        node = self._put_path(
524
            user, self._lookup_account(account, True)[1], path)
525
        self._put_policy(node, policy, True)
526

    
527
    @backend_method
528
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
529
        """Delete/purge the container with the given name."""
530

    
531
        logger.debug("delete_container: %s %s %s %s %s %s", user,
532
                     account, container, until, prefix, delimiter)
533
        if user != account:
534
            raise NotAllowedError
535
        path, node = self._lookup_container(account, container)
536

    
537
        if until is not None:
538
            hashes, size, serials = self.node.node_purge_children(
539
                node, until, CLUSTER_HISTORY)
540
            for h in hashes:
541
                self.store.map_delete(h)
542
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
543
            if not self.free_versioning:
544
                self._report_size_change(
545
                    user, account, -size, {
546
                        'action':'container purge',
547
                        'path': path,
548
                        'versions': ','.join(str(i) for i in serials)
549
                    }
550
                )
551
            return
552

    
553
        if not delimiter:
554
            if self._get_statistics(node)[0] > 0:
555
                raise ContainerNotEmpty('Container is not empty')
556
            hashes, size, serials = self.node.node_purge_children(
557
                node, inf, CLUSTER_HISTORY)
558
            for h in hashes:
559
                self.store.map_delete(h)
560
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
561
            self.node.node_remove(node)
562
            if not self.free_versioning:
563
                self._report_size_change(
564
                    user, account, -size, {
565
                        'action':'container purge',
566
                        'path': path,
567
                        'versions': ','.join(str(i) for i in serials)
568
                    }
569
                )
570
        else:
571
            # remove only contents
572
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
573
            paths = []
574
            for t in src_names:
575
                path = '/'.join((account, container, t[0]))
576
                node = t[2]
577
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
578
                del_size = self._apply_versioning(
579
                    account, container, src_version_id)
580
                self._report_size_change(
581
                        user, account, -del_size, {
582
                                'action': 'object delete',
583
                                'path': path,
584
                        'versions': ','.join([str(dest_version_id)])
585
                     }
586
                )
587
                self._report_object_change(
588
                    user, account, path, details={'action': 'object delete'})
589
                paths.append(path)
590
            self.permissions.access_clear_bulk(paths)
591

    
592
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
593
        if user != account and until:
594
            raise NotAllowedError
595
        if shared and public:
596
            # get shared first
597
            shared = self._list_object_permissions(
598
                user, account, container, prefix, shared=True, public=False)
599
            objects = set()
600
            if shared:
601
                path, node = self._lookup_container(account, container)
602
                shared = self._get_formatted_paths(shared)
603
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
604

    
605
            # get public
606
            objects |= set(self._list_public_object_properties(
607
                user, account, container, prefix, all_props))
608
            objects = list(objects)
609

    
610
            objects.sort(key=lambda x: x[0])
611
            start, limit = self._list_limits(
612
                [x[0] for x in objects], marker, limit)
613
            return objects[start:start + limit]
614
        elif public:
615
            objects = self._list_public_object_properties(
616
                user, account, container, prefix, all_props)
617
            start, limit = self._list_limits(
618
                [x[0] for x in objects], marker, limit)
619
            return objects[start:start + limit]
620

    
621
        allowed = self._list_object_permissions(
622
            user, account, container, prefix, shared, public)
623
        if shared and not allowed:
624
            return []
625
        path, node = self._lookup_container(account, container)
626
        allowed = self._get_formatted_paths(allowed)
627
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
628
        start, limit = self._list_limits(
629
            [x[0] for x in objects], marker, limit)
630
        return objects[start:start + limit]
631

    
632
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
633
        public = self._list_object_permissions(
634
            user, account, container, prefix, shared=False, public=True)
635
        paths, nodes = self._lookup_objects(public)
636
        path = '/'.join((account, container))
637
        cont_prefix = path + '/'
638
        paths = [x[len(cont_prefix):] for x in paths]
639
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
640
        objects = [(path,) + props for path, props in zip(paths, props)]
641
        return objects
642

    
643
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
644
        objects = []
645
        while True:
646
            marker = objects[-1] if objects else None
647
            limit = 10000
648
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
649
            objects.extend(l)
650
            if not l or len(l) < limit:
651
                break
652
        return objects
653

    
654
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
655
        allowed = []
656
        path = '/'.join((account, container, prefix)).rstrip('/')
657
        if user != account:
658
            allowed = self.permissions.access_list_paths(user, path)
659
            if not allowed:
660
                raise NotAllowedError
661
        else:
662
            allowed = set()
663
            if shared:
664
                allowed.update(self.permissions.access_list_shared(path))
665
            if public:
666
                allowed.update(
667
                    [x[0] for x in self.permissions.public_list(path)])
668
            allowed = sorted(allowed)
669
            if not allowed:
670
                return []
671
        return allowed
672

    
673
    @backend_method
674
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
675
        """Return a list of object (name, version_id) tuples existing under a container."""
676

    
677
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
678
        keys = keys or []
679
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
680

    
681
    @backend_method
682
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
683
        """Return a list of object metadata dicts existing under a container."""
684

    
685
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
686
        keys = keys or []
687
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
688
        objects = []
689
        for p in props:
690
            if len(p) == 2:
691
                objects.append({'subdir': p[0]})
692
            else:
693
                objects.append({'name': p[0],
694
                                'bytes': p[self.SIZE + 1],
695
                                'type': p[self.TYPE + 1],
696
                                'hash': p[self.HASH + 1],
697
                                'version': p[self.SERIAL + 1],
698
                                'version_timestamp': p[self.MTIME + 1],
699
                                'modified': p[self.MTIME + 1] if until is None else None,
700
                                'modified_by': p[self.MUSER + 1],
701
                                'uuid': p[self.UUID + 1],
702
                                'checksum': p[self.CHECKSUM + 1]})
703
        return objects
704

    
705
    @backend_method
706
    def list_object_permissions(self, user, account, container, prefix=''):
707
        """Return a list of paths that enforce permissions under a container."""
708

    
709
        logger.debug("list_object_permissions: %s %s %s %s", user,
710
                     account, container, prefix)
711
        return self._list_object_permissions(user, account, container, prefix, True, False)
712

    
713
    @backend_method
714
    def list_object_public(self, user, account, container, prefix=''):
715
        """Return a dict mapping paths to public ids for objects that are public under a container."""
716

    
717
        logger.debug("list_object_public: %s %s %s %s", user,
718
                     account, container, prefix)
719
        public = {}
720
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
721
            public[path] = p + ULTIMATE_ANSWER
722
        return public
723

    
724
    @backend_method
725
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
726
        """Return a dictionary with the object metadata for the domain."""
727

    
728
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
729
                     account, container, name, domain, version)
730
        self._can_read(user, account, container, name)
731
        path, node = self._lookup_object(account, container, name)
732
        props = self._get_version(node, version)
733
        if version is None:
734
            modified = props[self.MTIME]
735
        else:
736
            try:
737
                modified = self._get_version(
738
                    node)[self.MTIME]  # Overall last modification.
739
            except NameError:  # Object may be deleted.
740
                del_props = self.node.version_lookup(
741
                    node, inf, CLUSTER_DELETED)
742
                if del_props is None:
743
                    raise ItemNotExists('Object does not exist')
744
                modified = del_props[self.MTIME]
745

    
746
        meta = {}
747
        if include_user_defined:
748
            meta.update(
749
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
750
        meta.update({'name': name,
751
                     'bytes': props[self.SIZE],
752
                     'type': props[self.TYPE],
753
                     'hash': props[self.HASH],
754
                     'version': props[self.SERIAL],
755
                     'version_timestamp': props[self.MTIME],
756
                     'modified': modified,
757
                     'modified_by': props[self.MUSER],
758
                     'uuid': props[self.UUID],
759
                     'checksum': props[self.CHECKSUM]})
760
        return meta
761

    
762
    @backend_method
763
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
764
        """Update the metadata associated with the object for the domain and return the new version."""
765

    
766
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
767
                     user, account, container, name, domain, meta, replace)
768
        self._can_write(user, account, container, name)
769
        path, node = self._lookup_object(account, container, name)
770
        src_version_id, dest_version_id = self._put_metadata(
771
            user, node, domain, meta, replace)
772
        self._apply_versioning(account, container, src_version_id)
773
        return dest_version_id
774

    
775
    @backend_method
776
    def get_object_permissions(self, user, account, container, name):
777
        """Return the action allowed on the object, the path
778
        from which the object gets its permissions from,
779
        along with a dictionary containing the permissions."""
780

    
781
        logger.debug("get_object_permissions: %s %s %s %s", user,
782
                     account, container, name)
783
        allowed = 'write'
784
        permissions_path = self._get_permissions_path(account, container, name)
785
        if user != account:
786
            if self.permissions.access_check(permissions_path, self.WRITE, user):
787
                allowed = 'write'
788
            elif self.permissions.access_check(permissions_path, self.READ, user):
789
                allowed = 'read'
790
            else:
791
                raise NotAllowedError
792
        self._lookup_object(account, container, name)
793
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
794

    
795
    @backend_method
796
    def update_object_permissions(self, user, account, container, name, permissions):
797
        """Update the permissions associated with the object."""
798

    
799
        logger.debug("update_object_permissions: %s %s %s %s %s",
800
                     user, account, container, name, permissions)
801
        if user != account:
802
            raise NotAllowedError
803
        path = self._lookup_object(account, container, name)[0]
804
        self._check_permissions(path, permissions)
805
        self.permissions.access_set(path, permissions)
806
        self._report_sharing_change(user, account, path, {'members':
807
                                    self.permissions.access_members(path)})
808

    
809
    @backend_method
810
    def get_object_public(self, user, account, container, name):
811
        """Return the public id of the object if applicable."""
812

    
813
        logger.debug(
814
            "get_object_public: %s %s %s %s", user, account, container, name)
815
        self._can_read(user, account, container, name)
816
        path = self._lookup_object(account, container, name)[0]
817
        p = self.permissions.public_get(path)
818
        if p is not None:
819
            p += ULTIMATE_ANSWER
820
        return p
821

    
822
    @backend_method
823
    def update_object_public(self, user, account, container, name, public):
824
        """Update the public status of the object."""
825

    
826
        logger.debug("update_object_public: %s %s %s %s %s", user,
827
                     account, container, name, public)
828
        self._can_write(user, account, container, name)
829
        path = self._lookup_object(account, container, name)[0]
830
        if not public:
831
            self.permissions.public_unset(path)
832
        else:
833
            self.permissions.public_set(path)
834

    
835
    @backend_method
836
    def get_object_hashmap(self, user, account, container, name, version=None):
837
        """Return the object's size and a list with partial hashes."""
838

    
839
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
840
                     account, container, name, version)
841
        self._can_read(user, account, container, name)
842
        path, node = self._lookup_object(account, container, name)
843
        props = self._get_version(node, version)
844
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
845
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
846

    
847
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
848
        if permissions is not None and user != account:
849
            raise NotAllowedError
850
        self._can_write(user, account, container, name)
851
        if permissions is not None:
852
            path = '/'.join((account, container, name))
853
            self._check_permissions(path, permissions)
854

    
855
        account_path, account_node = self._lookup_account(account, True)
856
        container_path, container_node = self._lookup_container(
857
            account, container)
858
        path, node = self._put_object_node(
859
            container_path, container_node, name)
860
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
861

    
862
        # Handle meta.
863
        if src_version_id is None:
864
            src_version_id = pre_version_id
865
        self._put_metadata_duplicate(
866
            src_version_id, dest_version_id, domain, meta, replace_meta)
867

    
868
        del_size = self._apply_versioning(account, container, pre_version_id)
869
        size_delta = size - del_size
870
        if not self.using_external_quotaholder: # Check account quota.
871
            if size_delta > 0:
872
                account_quota = long(self._get_policy(account_node)['quota'])
873
                account_usage = self._get_statistics(account_node)[1] + size_delta
874
                if (account_quota > 0 and account_usage > account_quota):
875
                    raise QuotaError('account quota exceeded: limit: %s, usage: %s' % (
876
                        account_quota, account_usage
877
                    ))
878

    
879
        # Check container quota.
880
        container_quota = long(self._get_policy(container_node)['quota'])
881
        container_usage = self._get_statistics(container_node)[1] + size_delta
882
        if (container_quota > 0 and container_usage > container_quota):
883
            # This must be executed in a transaction, so the version is
884
            # never created if it fails.
885
            raise QuotaError('container quota exceeded: limit: %s, usage: %s' % (
886
                container_quota, container_usage
887
            ))
888

    
889
        self._report_size_change(user, account, size_delta,
890
                                 {'action': 'object update', 'path': path,
891
                                  'versions': ','.join([str(dest_version_id)])})
892
        if permissions is not None:
893
            self.permissions.access_set(path, permissions)
894
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
895

    
896
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
897
        return dest_version_id
898

    
899
    @backend_method
900
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta=None, replace_meta=False, permissions=None):
901
        """Create/update an object with the specified size and partial hashes."""
902

    
903
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
904
                     account, container, name, size, type, hashmap, checksum)
905
        meta = meta or {}
906
        if size == 0:  # No such thing as an empty hashmap.
907
            hashmap = [self.put_block('')]
908
        map = HashMap(self.block_size, self.hash_algorithm)
909
        map.extend([binascii.unhexlify(x) for x in hashmap])
910
        missing = self.store.block_search(map)
911
        if missing:
912
            ie = IndexError()
913
            ie.data = [binascii.hexlify(x) for x in missing]
914
            raise ie
915

    
916
        hash = map.hash()
917
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
918
        self.store.map_put(hash, map)
919
        return dest_version_id
920

    
921
    @backend_method
922
    def update_object_checksum(self, user, account, container, name, version, checksum):
923
        """Update an object's checksum."""
924

    
925
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
926
                     user, account, container, name, version, checksum)
927
        # Update objects with greater version and same hashmap and size (fix metadata updates).
928
        self._can_write(user, account, container, name)
929
        path, node = self._lookup_object(account, container, name)
930
        props = self._get_version(node, version)
931
        versions = self.node.node_get_versions(node)
932
        for x in versions:
933
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
934
                self.node.version_put_property(
935
                    x[self.SERIAL], 'checksum', checksum)
936

    
937
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta=None, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
938
        dest_meta = dest_meta or {}
939
        dest_version_ids = []
940
        self._can_read(user, src_account, src_container, src_name)
941
        path, node = self._lookup_object(src_account, src_container, src_name)
942
        # TODO: Will do another fetch of the properties in duplicate version...
943
        props = self._get_version(
944
            node, src_version)  # Check to see if source exists.
945
        src_version_id = props[self.SERIAL]
946
        hash = props[self.HASH]
947
        size = props[self.SIZE]
948
        is_copy = not is_move and (src_account, src_container, src_name) != (
949
            dest_account, dest_container, dest_name)  # New uuid.
950
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
951
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
952
            self._delete_object(user, src_account, src_container, src_name)
953

    
954
        if delimiter:
955
            prefix = src_name + \
956
                delimiter if not src_name.endswith(delimiter) else src_name
957
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
958
            src_names.sort(key=lambda x: x[2])  # order by nodes
959
            paths = [elem[0] for elem in src_names]
960
            nodes = [elem[2] for elem in src_names]
961
            # TODO: Will do another fetch of the properties in duplicate version...
962
            props = self._get_versions(nodes)  # Check to see if source exists.
963

    
964
            for prop, path, node in zip(props, paths, nodes):
965
                src_version_id = prop[self.SERIAL]
966
                hash = prop[self.HASH]
967
                vtype = prop[self.TYPE]
968
                size = prop[self.SIZE]
969
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
970
                    delimiter) else dest_name
971
                vdest_name = path.replace(prefix, dest_prefix, 1)
972
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
973
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
974
                    self._delete_object(user, src_account, src_container, path)
975
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
976

    
977
    @backend_method
978
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, src_version=None, delimiter=None):
979
        """Copy an object's data and metadata."""
980

    
981
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
982
        meta = meta or {}
983
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
984
        return dest_version_id
985

    
986
    @backend_method
987
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, delimiter=None):
988
        """Move an object's data and metadata."""
989

    
990
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
991
        meta = meta or {}
992
        if user != src_account:
993
            raise NotAllowedError
994
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
995
        return dest_version_id
996

    
997
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
998
        if user != account:
999
            raise NotAllowedError
1000

    
1001
        if until is not None:
1002
            path = '/'.join((account, container, name))
1003
            node = self.node.node_lookup(path)
1004
            if node is None:
1005
                return
1006
            hashes = []
1007
            size = 0
1008
            serials = []
1009
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
1010
            hashes += h
1011
            size += s
1012
            serials += v
1013
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
1014
            hashes += h
1015
            if not self.free_versioning:
1016
                size += s
1017
            serials += v
1018
            for h in hashes:
1019
                self.store.map_delete(h)
1020
            self.node.node_purge(node, until, CLUSTER_DELETED)
1021
            try:
1022
                props = self._get_version(node)
1023
            except NameError:
1024
                self.permissions.access_clear(path)
1025
            self._report_size_change(
1026
                user, account, -size, {
1027
                    'action': 'object purge',
1028
                    'path': path,
1029
                    'versions': ','.join(str(i) for i in serials)
1030
                }
1031
            )
1032
            return
1033

    
1034
        path, node = self._lookup_object(account, container, name)
1035
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1036
        del_size = self._apply_versioning(account, container, src_version_id)
1037
        self._report_size_change(user, account, -del_size,
1038
                                 {'action': 'object delete', 'path': path,
1039
                                  'versions': ','.join([str(dest_version_id)])})
1040
        self._report_object_change(
1041
            user, account, path, details={'action': 'object delete'})
1042
        self.permissions.access_clear(path)
1043

    
1044
        if delimiter:
1045
            prefix = name + delimiter if not name.endswith(delimiter) else name
1046
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1047
            paths = []
1048
            for t in src_names:
1049
                path = '/'.join((account, container, t[0]))
1050
                node = t[2]
1051
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1052
                del_size = self._apply_versioning(
1053
                    account, container, src_version_id)
1054
                self._report_size_change(user, account, -del_size,
1055
                                         {'action': 'object delete',
1056
                                          'path': path,
1057
                                          'versions': ','.join([str(dest_version_id)])})
1058
                self._report_object_change(
1059
                    user, account, path, details={'action': 'object delete'})
1060
                paths.append(path)
1061
            self.permissions.access_clear_bulk(paths)
1062

    
1063
    @backend_method
1064
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1065
        """Delete/purge an object."""
1066

    
1067
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1068
                     account, container, name, until, prefix, delimiter)
1069
        self._delete_object(user, account, container, name, until, delimiter)
1070

    
1071
    @backend_method
1072
    def list_versions(self, user, account, container, name):
1073
        """Return a list of all (version, version_timestamp) tuples for an object."""
1074

    
1075
        logger.debug(
1076
            "list_versions: %s %s %s %s", user, account, container, name)
1077
        self._can_read(user, account, container, name)
1078
        path, node = self._lookup_object(account, container, name)
1079
        versions = self.node.node_get_versions(node)
1080
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1081

    
1082
    @backend_method
1083
    def get_uuid(self, user, uuid):
1084
        """Return the (account, container, name) for the UUID given."""
1085

    
1086
        logger.debug("get_uuid: %s %s", user, uuid)
1087
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1088
        if info is None:
1089
            raise NameError
1090
        path, serial = info
1091
        account, container, name = path.split('/', 2)
1092
        self._can_read(user, account, container, name)
1093
        return (account, container, name)
1094

    
1095
    @backend_method
1096
    def get_public(self, user, public):
1097
        """Return the (account, container, name) for the public id given."""
1098

    
1099
        logger.debug("get_public: %s %s", user, public)
1100
        if public is None or public < ULTIMATE_ANSWER:
1101
            raise NameError
1102
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1103
        if path is None:
1104
            raise NameError
1105
        account, container, name = path.split('/', 2)
1106
        self._can_read(user, account, container, name)
1107
        return (account, container, name)
1108

    
1109
    @backend_method(autocommit=0)
1110
    def get_block(self, hash):
1111
        """Return a block's data."""
1112

    
1113
        logger.debug("get_block: %s", hash)
1114
        block = self.store.block_get(binascii.unhexlify(hash))
1115
        if not block:
1116
            raise ItemNotExists('Block does not exist')
1117
        return block
1118

    
1119
    @backend_method(autocommit=0)
1120
    def put_block(self, data):
1121
        """Store a block and return the hash."""
1122

    
1123
        logger.debug("put_block: %s", len(data))
1124
        return binascii.hexlify(self.store.block_put(data))
1125

    
1126
    @backend_method(autocommit=0)
1127
    def update_block(self, hash, data, offset=0):
1128
        """Update a known block and return the hash."""
1129

    
1130
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1131
        if offset == 0 and len(data) == self.block_size:
1132
            return self.put_block(data)
1133
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1134
        return binascii.hexlify(h)
1135

    
1136
    # Path functions.
1137

    
1138
    def _generate_uuid(self):
1139
        return str(uuidlib.uuid4())
1140

    
1141
    def _put_object_node(self, path, parent, name):
1142
        path = '/'.join((path, name))
1143
        node = self.node.node_lookup(path)
1144
        if node is None:
1145
            node = self.node.node_create(parent, path)
1146
        return path, node
1147

    
1148
    def _put_path(self, user, parent, path):
1149
        node = self.node.node_create(parent, path)
1150
        self.node.version_create(node, None, 0, '', None, user,
1151
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1152
        return node
1153

    
1154
    def _lookup_account(self, account, create=True):
1155
        node = self.node.node_lookup(account)
1156
        if node is None and create:
1157
            node = self._put_path(
1158
                account, self.ROOTNODE, account)  # User is account.
1159
        return account, node
1160

    
1161
    def _lookup_container(self, account, container):
1162
        path = '/'.join((account, container))
1163
        node = self.node.node_lookup(path)
1164
        if node is None:
1165
            raise ItemNotExists('Container does not exist')
1166
        return path, node
1167

    
1168
    def _lookup_object(self, account, container, name):
1169
        path = '/'.join((account, container, name))
1170
        node = self.node.node_lookup(path)
1171
        if node is None:
1172
            raise ItemNotExists('Object does not exist')
1173
        return path, node
1174

    
1175
    def _lookup_objects(self, paths):
1176
        nodes = self.node.node_lookup_bulk(paths)
1177
        return paths, nodes
1178

    
1179
    def _get_properties(self, node, until=None):
1180
        """Return properties until the timestamp given."""
1181

    
1182
        before = until if until is not None else inf
1183
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1184
        if props is None and until is not None:
1185
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1186
        if props is None:
1187
            raise ItemNotExists('Path does not exist')
1188
        return props
1189

    
1190
    def _get_statistics(self, node, until=None):
1191
        """Return count, sum of size and latest timestamp of everything under node."""
1192

    
1193
        if until is None:
1194
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1195
        else:
1196
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1197
        if stats is None:
1198
            stats = (0, 0, 0)
1199
        return stats
1200

    
1201
    def _get_version(self, node, version=None):
1202
        if version is None:
1203
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1204
            if props is None:
1205
                raise ItemNotExists('Object does not exist')
1206
        else:
1207
            try:
1208
                version = int(version)
1209
            except ValueError:
1210
                raise VersionNotExists('Version does not exist')
1211
            props = self.node.version_get_properties(version)
1212
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1213
                raise VersionNotExists('Version does not exist')
1214
        return props
1215

    
1216
    def _get_versions(self, nodes):
1217
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1218

    
1219
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1220
        """Create a new version of the node."""
1221

    
1222
        props = self.node.version_lookup(
1223
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1224
        if props is not None:
1225
            src_version_id = props[self.SERIAL]
1226
            src_hash = props[self.HASH]
1227
            src_size = props[self.SIZE]
1228
            src_type = props[self.TYPE]
1229
            src_checksum = props[self.CHECKSUM]
1230
        else:
1231
            src_version_id = None
1232
            src_hash = None
1233
            src_size = 0
1234
            src_type = ''
1235
            src_checksum = ''
1236
        if size is None:  # Set metadata.
1237
            hash = src_hash  # This way hash can be set to None (account or container).
1238
            size = src_size
1239
        if type is None:
1240
            type = src_type
1241
        if checksum is None:
1242
            checksum = src_checksum
1243
        uuid = self._generate_uuid(
1244
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1245

    
1246
        if src_node is None:
1247
            pre_version_id = src_version_id
1248
        else:
1249
            pre_version_id = None
1250
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1251
            if props is not None:
1252
                pre_version_id = props[self.SERIAL]
1253
        if pre_version_id is not None:
1254
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1255

    
1256
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1257
        return pre_version_id, dest_version_id
1258

    
1259
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1260
        if src_version_id is not None:
1261
            self.node.attribute_copy(src_version_id, dest_version_id)
1262
        if not replace:
1263
            self.node.attribute_del(dest_version_id, domain, (
1264
                k for k, v in meta.iteritems() if v == ''))
1265
            self.node.attribute_set(dest_version_id, domain, (
1266
                (k, v) for k, v in meta.iteritems() if v != ''))
1267
        else:
1268
            self.node.attribute_del(dest_version_id, domain)
1269
            self.node.attribute_set(dest_version_id, domain, ((
1270
                k, v) for k, v in meta.iteritems()))
1271

    
1272
    def _put_metadata(self, user, node, domain, meta, replace=False):
1273
        """Create a new version and store metadata."""
1274

    
1275
        src_version_id, dest_version_id = self._put_version_duplicate(
1276
            user, node)
1277
        self._put_metadata_duplicate(
1278
            src_version_id, dest_version_id, domain, meta, replace)
1279
        return src_version_id, dest_version_id
1280

    
1281
    def _list_limits(self, listing, marker, limit):
1282
        start = 0
1283
        if marker:
1284
            try:
1285
                start = listing.index(marker) + 1
1286
            except ValueError:
1287
                pass
1288
        if not limit or limit > 10000:
1289
            limit = 10000
1290
        return start, limit
1291

    
1292
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, until=None, size_range=None, allowed=None, all_props=False):
1293
        keys = keys or []
1294
        allowed = allowed or []
1295
        cont_prefix = path + '/'
1296
        prefix = cont_prefix + prefix
1297
        start = cont_prefix + marker if marker else None
1298
        before = until if until is not None else inf
1299
        filterq = keys if domain else []
1300
        sizeq = size_range
1301

    
1302
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1303
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1304
        objects.sort(key=lambda x: x[0])
1305
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1306
        return objects
1307

    
1308
    # Reporting functions.
1309

    
1310
    def _report_size_change(self, user, account, size, details=None):
1311
        details = details or {}
1312

    
1313
        if size == 0:
1314
            return
1315

    
1316
        account_node = self._lookup_account(account, True)[1]
1317
        total = self._get_statistics(account_node)[1]
1318
        details.update({'user': user, 'total': total})
1319
        logger.debug(
1320
            "_report_size_change: %s %s %s %s", user, account, size, details)
1321
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1322
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1323
                              float(size), details))
1324

    
1325
        if not self.using_external_quotaholder:
1326
            return
1327

    
1328
        try:
1329
            serial = self.quotaholder.issue_commission(
1330
                    context     =   {},
1331
                    target      =   account,
1332
                    key         =   '1',
1333
                    clientkey   =   'pithos',
1334
                    ownerkey    =   '',
1335
                    name        =   details['path'] if 'path' in details else '',
1336
                    provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1337
            )
1338
        except BaseException, e:
1339
            raise QuotaError(e)
1340
        else:
1341
            self.serials.append(serial)
1342

    
1343
    def _report_object_change(self, user, account, path, details=None):
1344
        details = details or {}
1345
        details.update({'user': user})
1346
        logger.debug("_report_object_change: %s %s %s %s", user,
1347
                     account, path, details)
1348
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1349
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1350

    
1351
    def _report_sharing_change(self, user, account, path, details=None):
1352
        logger.debug("_report_permissions_change: %s %s %s %s",
1353
                     user, account, path, details)
1354
        details = details or {}
1355
        details.update({'user': user})
1356
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1357
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1358

    
1359
    # Policy functions.
1360

    
1361
    def _check_policy(self, policy):
1362
        for k in policy.keys():
1363
            if policy[k] == '':
1364
                policy[k] = self.default_policy.get(k)
1365
        for k, v in policy.iteritems():
1366
            if k == 'quota':
1367
                q = int(v)  # May raise ValueError.
1368
                if q < 0:
1369
                    raise ValueError
1370
            elif k == 'versioning':
1371
                if v not in ['auto', 'none']:
1372
                    raise ValueError
1373
            else:
1374
                raise ValueError
1375

    
1376
    def _put_policy(self, node, policy, replace):
1377
        if replace:
1378
            for k, v in self.default_policy.iteritems():
1379
                if k not in policy:
1380
                    policy[k] = v
1381
        self.node.policy_set(node, policy)
1382

    
1383
    def _get_policy(self, node):
1384
        policy = self.default_policy.copy()
1385
        policy.update(self.node.policy_get(node))
1386
        return policy
1387

    
1388
    def _apply_versioning(self, account, container, version_id):
1389
        """Delete the provided version if such is the policy.
1390
           Return size of object removed.
1391
        """
1392

    
1393
        if version_id is None:
1394
            return 0
1395
        path, node = self._lookup_container(account, container)
1396
        versioning = self._get_policy(node)['versioning']
1397
        if versioning != 'auto':
1398
            hash, size = self.node.version_remove(version_id)
1399
            self.store.map_delete(hash)
1400
            return size
1401
        elif self.free_versioning:
1402
            return self.node.version_get_properties(
1403
                version_id, keys=('size',))[0]
1404
        return 0
1405

    
1406
    # Access control functions.
1407

    
1408
    def _check_groups(self, groups):
1409
        # raise ValueError('Bad characters in groups')
1410
        pass
1411

    
1412
    def _check_permissions(self, path, permissions):
1413
        # raise ValueError('Bad characters in permissions')
1414
        pass
1415

    
1416
    def _get_formatted_paths(self, paths):
1417
        formatted = []
1418
        for p in paths:
1419
            node = self.node.node_lookup(p)
1420
            props = None
1421
            if node is not None:
1422
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1423
            if props is not None:
1424
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1425
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1426
                formatted.append((p, self.MATCH_EXACT))
1427
        return formatted
1428

    
1429
    def _get_permissions_path(self, account, container, name):
1430
        path = '/'.join((account, container, name))
1431
        permission_paths = self.permissions.access_inherit(path)
1432
        permission_paths.sort()
1433
        permission_paths.reverse()
1434
        for p in permission_paths:
1435
            if p == path:
1436
                return p
1437
            else:
1438
                if p.count('/') < 2:
1439
                    continue
1440
                node = self.node.node_lookup(p)
1441
                props = None
1442
                if node is not None:
1443
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1444
                if props is not None:
1445
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1446
                        return p
1447
        return None
1448

    
1449
    def _can_read(self, user, account, container, name):
1450
        if user == account:
1451
            return True
1452
        path = '/'.join((account, container, name))
1453
        if self.permissions.public_get(path) is not None:
1454
            return True
1455
        path = self._get_permissions_path(account, container, name)
1456
        if not path:
1457
            raise NotAllowedError
1458
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1459
            raise NotAllowedError
1460

    
1461
    def _can_write(self, user, account, container, name):
1462
        if user == account:
1463
            return True
1464
        path = '/'.join((account, container, name))
1465
        path = self._get_permissions_path(account, container, name)
1466
        if not path:
1467
            raise NotAllowedError
1468
        if not self.permissions.access_check(path, self.WRITE, user):
1469
            raise NotAllowedError
1470

    
1471
    def _allowed_accounts(self, user):
1472
        allow = set()
1473
        for path in self.permissions.access_list_paths(user):
1474
            allow.add(path.split('/', 1)[0])
1475
        return sorted(allow)
1476

    
1477
    def _allowed_containers(self, user, account):
1478
        allow = set()
1479
        for path in self.permissions.access_list_paths(user, account):
1480
            allow.add(path.split('/', 2)[1])
1481
        return sorted(allow)