Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 0a92ff85

History | View | Annotate | Download (63.3 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from synnefo.lib.quotaholder import QuotaholderClient
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
85
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
86
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
87

    
88
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
89
QUEUE_CLIENT_ID = 'pithos'
90
QUEUE_INSTANCE_ID = '1'
91

    
92
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
93

    
94
inf = float('inf')
95

    
96
ULTIMATE_ANSWER = 42
97

    
98

    
99
logger = logging.getLogger(__name__)
100

    
101

    
102
def backend_method(func=None, autocommit=1):
103
    if func is None:
104
        def fn(func):
105
            return backend_method(func, autocommit)
106
        return fn
107

    
108
    if not autocommit:
109
        return func
110

    
111
    def fn(self, *args, **kw):
112
        self.wrapper.execute()
113
        serials = []
114
        self.serials = serials
115
        self.messages = []
116

    
117
        try:
118
            ret = func(self, *args, **kw)
119
            for m in self.messages:
120
                self.queue.send(*m)
121
            if serials:
122
                self.quotaholder.accept_commission(
123
                            context     =   {},
124
                            clientkey   =   'pithos',
125
                            serials     =   serials)
126
            self.wrapper.commit()
127
            return ret
128
        except:
129
            if serials:
130
                self.quotaholder.reject_commission(
131
                            context     =   {},
132
                            clientkey   =   'pithos',
133
                            serials     =   serials)
134
            self.wrapper.rollback()
135
            raise
136
    return fn
137

    
138

    
139
class ModularBackend(BaseBackend):
140
    """A modular backend.
141

142
    Uses modules for SQL functions and storage.
143
    """
144

    
145
    def __init__(self, db_module=None, db_connection=None,
146
                 block_module=None, block_path=None, block_umask=None,
147
                 queue_module=None, queue_hosts=None, queue_exchange=None,
148
                 quotaholder_enabled=False,
149
                 quotaholder_url=None, quotaholder_token=None,
150
                 quotaholder_client_poolsize=None,
151
                 free_versioning=True, block_params=None):
152
        db_module = db_module or DEFAULT_DB_MODULE
153
        db_connection = db_connection or DEFAULT_DB_CONNECTION
154
        block_module = block_module or DEFAULT_BLOCK_MODULE
155
        block_path = block_path or DEFAULT_BLOCK_PATH
156
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
157
        block_params = block_params or DEFAULT_BLOCK_PARAMS
158
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
159

    
160
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
161
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
162
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
163

    
164
        self.hash_algorithm = 'sha256'
165
        self.block_size = 4 * 1024 * 1024  # 4MB
166
        self.free_versioning = free_versioning
167

    
168
        self.default_policy = {'quota': DEFAULT_QUOTA,
169
                               'versioning': DEFAULT_VERSIONING}
170

    
171
        def load_module(m):
172
            __import__(m)
173
            return sys.modules[m]
174

    
175
        self.db_module = load_module(db_module)
176
        self.wrapper = self.db_module.DBWrapper(db_connection)
177
        params = {'wrapper': self.wrapper}
178
        self.permissions = self.db_module.Permissions(**params)
179
        self.config = self.db_module.Config(**params)
180
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
181
        for x in ['READ', 'WRITE']:
182
            setattr(self, x, getattr(self.db_module, x))
183
        self.node = self.db_module.Node(**params)
184
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
185
            setattr(self, x, getattr(self.db_module, x))
186

    
187
        self.block_module = load_module(block_module)
188
        self.block_params = block_params
189
        params = {'path': block_path,
190
                  'block_size': self.block_size,
191
                  'hash_algorithm': self.hash_algorithm,
192
                  'umask': block_umask}
193
        params.update(self.block_params)
194
        self.store = self.block_module.Store(**params)
195

    
196
        if queue_module and queue_hosts:
197
            self.queue_module = load_module(queue_module)
198
            params = {'hosts': queue_hosts,
199
                      'exchange': queue_exchange,
200
                      'client_id': QUEUE_CLIENT_ID}
201
            self.queue = self.queue_module.Queue(**params)
202
        else:
203
            class NoQueue:
204
                def send(self, *args):
205
                    pass
206

    
207
                def close(self):
208
                    pass
209

    
210
            self.queue = NoQueue()
211

    
212
        self.quotaholder_enabled = quotaholder_enabled
213
        if quotaholder_enabled:
214
            self.quotaholder_url = quotaholder_url
215
            self.quotaholder_token = quotaholder_token
216
            self.quotaholder = QuotaholderClient(
217
                                    quotaholder_url,
218
                                    token=quotaholder_token,
219
                                    poolsize=quotaholder_client_poolsize)
220

    
221
        self.serials = []
222
        self.messages = []
223

    
224
    def close(self):
225
        self.wrapper.close()
226
        self.queue.close()
227

    
228
    @property
229
    def using_external_quotaholder(self):
230
        return self.quotaholder_enabled
231

    
232
    @backend_method
233
    def list_accounts(self, user, marker=None, limit=10000):
234
        """Return a list of accounts the user can access."""
235

    
236
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
237
        allowed = self._allowed_accounts(user)
238
        start, limit = self._list_limits(allowed, marker, limit)
239
        return allowed[start:start + limit]
240

    
241
    @backend_method
242
    def get_account_meta(
243
            self, user, account, domain, until=None, include_user_defined=True,
244
            external_quota=None):
245
        """Return a dictionary with the account metadata for the domain."""
246

    
247
        logger.debug(
248
            "get_account_meta: %s %s %s %s", user, account, domain, until)
249
        path, node = self._lookup_account(account, user == account)
250
        if user != account:
251
            if until or node is None or account not in self._allowed_accounts(user):
252
                raise NotAllowedError
253
        try:
254
            props = self._get_properties(node, until)
255
            mtime = props[self.MTIME]
256
        except NameError:
257
            props = None
258
            mtime = until
259
        count, bytes, tstamp = self._get_statistics(node, until)
260
        tstamp = max(tstamp, mtime)
261
        if until is None:
262
            modified = tstamp
263
        else:
264
            modified = self._get_statistics(
265
                node)[2]  # Overall last modification.
266
            modified = max(modified, mtime)
267

    
268
        if user != account:
269
            meta = {'name': account}
270
        else:
271
            meta = {}
272
            if props is not None and include_user_defined:
273
                meta.update(
274
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
275
            if until is not None:
276
                meta.update({'until_timestamp': tstamp})
277
            meta.update({'name': account, 'count': count, 'bytes': bytes})
278
            if self.using_external_quotaholder:
279
                external_quota = external_quota or {}
280
                meta['bytes'] = external_quota.get('currValue', 0)
281
        meta.update({'modified': modified})
282
        return meta
283

    
284
    @backend_method
285
    def update_account_meta(self, user, account, domain, meta, replace=False):
286
        """Update the metadata associated with the account for the domain."""
287

    
288
        logger.debug("update_account_meta: %s %s %s %s %s", user,
289
                     account, domain, meta, replace)
290
        if user != account:
291
            raise NotAllowedError
292
        path, node = self._lookup_account(account, True)
293
        self._put_metadata(user, node, domain, meta, replace)
294

    
295
    @backend_method
296
    def get_account_groups(self, user, account):
297
        """Return a dictionary with the user groups defined for this account."""
298

    
299
        logger.debug("get_account_groups: %s %s", user, account)
300
        if user != account:
301
            if account not in self._allowed_accounts(user):
302
                raise NotAllowedError
303
            return {}
304
        self._lookup_account(account, True)
305
        return self.permissions.group_dict(account)
306

    
307
    @backend_method
308
    def update_account_groups(self, user, account, groups, replace=False):
309
        """Update the groups associated with the account."""
310

    
311
        logger.debug("update_account_groups: %s %s %s %s", user,
312
                     account, groups, replace)
313
        if user != account:
314
            raise NotAllowedError
315
        self._lookup_account(account, True)
316
        self._check_groups(groups)
317
        if replace:
318
            self.permissions.group_destroy(account)
319
        for k, v in groups.iteritems():
320
            if not replace:  # If not already deleted.
321
                self.permissions.group_delete(account, k)
322
            if v:
323
                self.permissions.group_addmany(account, k, v)
324

    
325
    @backend_method
326
    def get_account_policy(self, user, account, external_quota=None):
327
        """Return a dictionary with the account policy."""
328

    
329
        logger.debug("get_account_policy: %s %s", user, account)
330
        if user != account:
331
            if account not in self._allowed_accounts(user):
332
                raise NotAllowedError
333
            return {}
334
        path, node = self._lookup_account(account, True)
335
        policy = self._get_policy(node)
336
        if self.using_external_quotaholder:
337
            external_quota = external_quota or {}
338
            policy['quota'] = external_quota.get('maxValue', 0)
339
        return policy
340

    
341
    @backend_method
342
    def update_account_policy(self, user, account, policy, replace=False):
343
        """Update the policy associated with the account."""
344

    
345
        logger.debug("update_account_policy: %s %s %s %s", user,
346
                     account, policy, replace)
347
        if user != account:
348
            raise NotAllowedError
349
        path, node = self._lookup_account(account, True)
350
        self._check_policy(policy)
351
        self._put_policy(node, policy, replace)
352

    
353
    @backend_method
354
    def put_account(self, user, account, policy={}):
355
        """Create a new account with the given name."""
356

    
357
        logger.debug("put_account: %s %s %s", user, account, policy)
358
        if user != account:
359
            raise NotAllowedError
360
        node = self.node.node_lookup(account)
361
        if node is not None:
362
            raise AccountExists('Account already exists')
363
        if policy:
364
            self._check_policy(policy)
365
        node = self._put_path(user, self.ROOTNODE, account)
366
        self._put_policy(node, policy, True)
367

    
368
    @backend_method
369
    def delete_account(self, user, account):
370
        """Delete the account with the given name."""
371

    
372
        logger.debug("delete_account: %s %s", user, account)
373
        if user != account:
374
            raise NotAllowedError
375
        node = self.node.node_lookup(account)
376
        if node is None:
377
            return
378
        if not self.node.node_remove(node):
379
            raise AccountNotEmpty('Account is not empty')
380
        self.permissions.group_destroy(account)
381

    
382
    @backend_method
383
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
384
        """Return a list of containers existing under an account."""
385

    
386
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
387
                     account, marker, limit, shared, until, public)
388
        if user != account:
389
            if until or account not in self._allowed_accounts(user):
390
                raise NotAllowedError
391
            allowed = self._allowed_containers(user, account)
392
            start, limit = self._list_limits(allowed, marker, limit)
393
            return allowed[start:start + limit]
394
        if shared or public:
395
            allowed = set()
396
            if shared:
397
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
398
            if public:
399
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
400
            allowed = sorted(allowed)
401
            start, limit = self._list_limits(allowed, marker, limit)
402
            return allowed[start:start + limit]
403
        node = self.node.node_lookup(account)
404
        containers = [x[0] for x in self._list_object_properties(
405
            node, account, '', '/', marker, limit, False, None, [], until)]
406
        start, limit = self._list_limits(
407
            [x[0] for x in containers], marker, limit)
408
        return containers[start:start + limit]
409

    
410
    @backend_method
411
    def list_container_meta(self, user, account, container, domain, until=None):
412
        """Return a list with all the container's object meta keys for the domain."""
413

    
414
        logger.debug("list_container_meta: %s %s %s %s %s", user,
415
                     account, container, domain, until)
416
        allowed = []
417
        if user != account:
418
            if until:
419
                raise NotAllowedError
420
            allowed = self.permissions.access_list_paths(
421
                user, '/'.join((account, container)))
422
            if not allowed:
423
                raise NotAllowedError
424
        path, node = self._lookup_container(account, container)
425
        before = until if until is not None else inf
426
        allowed = self._get_formatted_paths(allowed)
427
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
428

    
429
    @backend_method
430
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
431
        """Return a dictionary with the container metadata for the domain."""
432

    
433
        logger.debug("get_container_meta: %s %s %s %s %s", user,
434
                     account, container, domain, until)
435
        if user != account:
436
            if until or container not in self._allowed_containers(user, account):
437
                raise NotAllowedError
438
        path, node = self._lookup_container(account, container)
439
        props = self._get_properties(node, until)
440
        mtime = props[self.MTIME]
441
        count, bytes, tstamp = self._get_statistics(node, until)
442
        tstamp = max(tstamp, mtime)
443
        if until is None:
444
            modified = tstamp
445
        else:
446
            modified = self._get_statistics(
447
                node)[2]  # Overall last modification.
448
            modified = max(modified, mtime)
449

    
450
        if user != account:
451
            meta = {'name': container}
452
        else:
453
            meta = {}
454
            if include_user_defined:
455
                meta.update(
456
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
457
            if until is not None:
458
                meta.update({'until_timestamp': tstamp})
459
            meta.update({'name': container, 'count': count, 'bytes': bytes})
460
        meta.update({'modified': modified})
461
        return meta
462

    
463
    @backend_method
464
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
465
        """Update the metadata associated with the container for the domain."""
466

    
467
        logger.debug("update_container_meta: %s %s %s %s %s %s",
468
                     user, account, container, domain, meta, replace)
469
        if user != account:
470
            raise NotAllowedError
471
        path, node = self._lookup_container(account, container)
472
        src_version_id, dest_version_id = self._put_metadata(
473
            user, node, domain, meta, replace)
474
        if src_version_id is not None:
475
            versioning = self._get_policy(node)['versioning']
476
            if versioning != 'auto':
477
                self.node.version_remove(src_version_id)
478

    
479
    @backend_method
480
    def get_container_policy(self, user, account, container):
481
        """Return a dictionary with the container policy."""
482

    
483
        logger.debug(
484
            "get_container_policy: %s %s %s", user, account, container)
485
        if user != account:
486
            if container not in self._allowed_containers(user, account):
487
                raise NotAllowedError
488
            return {}
489
        path, node = self._lookup_container(account, container)
490
        return self._get_policy(node)
491

    
492
    @backend_method
493
    def update_container_policy(self, user, account, container, policy, replace=False):
494
        """Update the policy associated with the container."""
495

    
496
        logger.debug("update_container_policy: %s %s %s %s %s",
497
                     user, account, container, policy, replace)
498
        if user != account:
499
            raise NotAllowedError
500
        path, node = self._lookup_container(account, container)
501
        self._check_policy(policy)
502
        self._put_policy(node, policy, replace)
503

    
504
    @backend_method
505
    def put_container(self, user, account, container, policy={}):
506
        """Create a new container with the given name."""
507

    
508
        logger.debug(
509
            "put_container: %s %s %s %s", user, account, container, policy)
510
        if user != account:
511
            raise NotAllowedError
512
        try:
513
            path, node = self._lookup_container(account, container)
514
        except NameError:
515
            pass
516
        else:
517
            raise ContainerExists('Container already exists')
518
        if policy:
519
            self._check_policy(policy)
520
        path = '/'.join((account, container))
521
        node = self._put_path(
522
            user, self._lookup_account(account, True)[1], path)
523
        self._put_policy(node, policy, True)
524

    
525
    @backend_method
526
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
527
        """Delete/purge the container with the given name."""
528

    
529
        logger.debug("delete_container: %s %s %s %s %s %s", user,
530
                     account, container, until, prefix, delimiter)
531
        if user != account:
532
            raise NotAllowedError
533
        path, node = self._lookup_container(account, container)
534

    
535
        if until is not None:
536
            hashes, size, serials = self.node.node_purge_children(
537
                node, until, CLUSTER_HISTORY)
538
            for h in hashes:
539
                self.store.map_delete(h)
540
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
541
            if not self.free_versioning:
542
                self._report_size_change(
543
                    user, account, -size, {
544
                        'action':'container purge',
545
                        'path': path,
546
                        'versions': ','.join(str(i) for i in serials)
547
                    }
548
                )
549
            return
550

    
551
        if not delimiter:
552
            if self._get_statistics(node)[0] > 0:
553
                raise ContainerNotEmpty('Container is not empty')
554
            hashes, size, serials = self.node.node_purge_children(
555
                node, inf, CLUSTER_HISTORY)
556
            for h in hashes:
557
                self.store.map_delete(h)
558
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
559
            self.node.node_remove(node)
560
            if not self.free_versioning:
561
                self._report_size_change(
562
                    user, account, -size, {
563
                        'action':'container purge',
564
                        'path': path,
565
                        'versions': ','.join(str(i) for i in serials)
566
                    }
567
                )
568
        else:
569
            # remove only contents
570
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
571
            paths = []
572
            for t in src_names:
573
                path = '/'.join((account, container, t[0]))
574
                node = t[2]
575
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
576
                del_size = self._apply_versioning(
577
                    account, container, src_version_id)
578
                self._report_size_change(
579
                        user, account, -del_size, {
580
                                'action': 'object delete',
581
                                'path': path,
582
                        'versions': ','.join([str(dest_version_id)])
583
                     }
584
                )
585
                self._report_object_change(
586
                    user, account, path, details={'action': 'object delete'})
587
                paths.append(path)
588
            self.permissions.access_clear_bulk(paths)
589

    
590
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
591
        if user != account and until:
592
            raise NotAllowedError
593
        if shared and public:
594
            # get shared first
595
            shared = self._list_object_permissions(
596
                user, account, container, prefix, shared=True, public=False)
597
            objects = set()
598
            if shared:
599
                path, node = self._lookup_container(account, container)
600
                shared = self._get_formatted_paths(shared)
601
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
602

    
603
            # get public
604
            objects |= set(self._list_public_object_properties(
605
                user, account, container, prefix, all_props))
606
            objects = list(objects)
607

    
608
            objects.sort(key=lambda x: x[0])
609
            start, limit = self._list_limits(
610
                [x[0] for x in objects], marker, limit)
611
            return objects[start:start + limit]
612
        elif public:
613
            objects = self._list_public_object_properties(
614
                user, account, container, prefix, all_props)
615
            start, limit = self._list_limits(
616
                [x[0] for x in objects], marker, limit)
617
            return objects[start:start + limit]
618

    
619
        allowed = self._list_object_permissions(
620
            user, account, container, prefix, shared, public)
621
        if shared and not allowed:
622
            return []
623
        path, node = self._lookup_container(account, container)
624
        allowed = self._get_formatted_paths(allowed)
625
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
626
        start, limit = self._list_limits(
627
            [x[0] for x in objects], marker, limit)
628
        return objects[start:start + limit]
629

    
630
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
631
        public = self._list_object_permissions(
632
            user, account, container, prefix, shared=False, public=True)
633
        paths, nodes = self._lookup_objects(public)
634
        path = '/'.join((account, container))
635
        cont_prefix = path + '/'
636
        paths = [x[len(cont_prefix):] for x in paths]
637
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
638
        objects = [(path,) + props for path, props in zip(paths, props)]
639
        return objects
640

    
641
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
642
        objects = []
643
        while True:
644
            marker = objects[-1] if objects else None
645
            limit = 10000
646
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
647
            objects.extend(l)
648
            if not l or len(l) < limit:
649
                break
650
        return objects
651

    
652
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
653
        allowed = []
654
        path = '/'.join((account, container, prefix)).rstrip('/')
655
        if user != account:
656
            allowed = self.permissions.access_list_paths(user, path)
657
            if not allowed:
658
                raise NotAllowedError
659
        else:
660
            allowed = set()
661
            if shared:
662
                allowed.update(self.permissions.access_list_shared(path))
663
            if public:
664
                allowed.update(
665
                    [x[0] for x in self.permissions.public_list(path)])
666
            allowed = sorted(allowed)
667
            if not allowed:
668
                return []
669
        return allowed
670

    
671
    @backend_method
672
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
673
        """Return a list of object (name, version_id) tuples existing under a container."""
674

    
675
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
676
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
677

    
678
    @backend_method
679
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
680
        """Return a list of object metadata dicts existing under a container."""
681

    
682
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
683
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
684
        objects = []
685
        for p in props:
686
            if len(p) == 2:
687
                objects.append({'subdir': p[0]})
688
            else:
689
                objects.append({'name': p[0],
690
                                'bytes': p[self.SIZE + 1],
691
                                'type': p[self.TYPE + 1],
692
                                'hash': p[self.HASH + 1],
693
                                'version': p[self.SERIAL + 1],
694
                                'version_timestamp': p[self.MTIME + 1],
695
                                'modified': p[self.MTIME + 1] if until is None else None,
696
                                'modified_by': p[self.MUSER + 1],
697
                                'uuid': p[self.UUID + 1],
698
                                'checksum': p[self.CHECKSUM + 1]})
699
        return objects
700

    
701
    @backend_method
702
    def list_object_permissions(self, user, account, container, prefix=''):
703
        """Return a list of paths that enforce permissions under a container."""
704

    
705
        logger.debug("list_object_permissions: %s %s %s %s", user,
706
                     account, container, prefix)
707
        return self._list_object_permissions(user, account, container, prefix, True, False)
708

    
709
    @backend_method
710
    def list_object_public(self, user, account, container, prefix=''):
711
        """Return a dict mapping paths to public ids for objects that are public under a container."""
712

    
713
        logger.debug("list_object_public: %s %s %s %s", user,
714
                     account, container, prefix)
715
        public = {}
716
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
717
            public[path] = p + ULTIMATE_ANSWER
718
        return public
719

    
720
    @backend_method
721
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
722
        """Return a dictionary with the object metadata for the domain."""
723

    
724
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
725
                     account, container, name, domain, version)
726
        self._can_read(user, account, container, name)
727
        path, node = self._lookup_object(account, container, name)
728
        props = self._get_version(node, version)
729
        if version is None:
730
            modified = props[self.MTIME]
731
        else:
732
            try:
733
                modified = self._get_version(
734
                    node)[self.MTIME]  # Overall last modification.
735
            except NameError:  # Object may be deleted.
736
                del_props = self.node.version_lookup(
737
                    node, inf, CLUSTER_DELETED)
738
                if del_props is None:
739
                    raise ItemNotExists('Object does not exist')
740
                modified = del_props[self.MTIME]
741

    
742
        meta = {}
743
        if include_user_defined:
744
            meta.update(
745
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
746
        meta.update({'name': name,
747
                     'bytes': props[self.SIZE],
748
                     'type': props[self.TYPE],
749
                     'hash': props[self.HASH],
750
                     'version': props[self.SERIAL],
751
                     'version_timestamp': props[self.MTIME],
752
                     'modified': modified,
753
                     'modified_by': props[self.MUSER],
754
                     'uuid': props[self.UUID],
755
                     'checksum': props[self.CHECKSUM]})
756
        return meta
757

    
758
    @backend_method
759
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
760
        """Update the metadata associated with the object for the domain and return the new version."""
761

    
762
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
763
                     user, account, container, name, domain, meta, replace)
764
        self._can_write(user, account, container, name)
765
        path, node = self._lookup_object(account, container, name)
766
        src_version_id, dest_version_id = self._put_metadata(
767
            user, node, domain, meta, replace)
768
        self._apply_versioning(account, container, src_version_id)
769
        return dest_version_id
770

    
771
    @backend_method
772
    def get_object_permissions(self, user, account, container, name):
773
        """Return the action allowed on the object, the path
774
        from which the object gets its permissions from,
775
        along with a dictionary containing the permissions."""
776

    
777
        logger.debug("get_object_permissions: %s %s %s %s", user,
778
                     account, container, name)
779
        allowed = 'write'
780
        permissions_path = self._get_permissions_path(account, container, name)
781
        if user != account:
782
            if self.permissions.access_check(permissions_path, self.WRITE, user):
783
                allowed = 'write'
784
            elif self.permissions.access_check(permissions_path, self.READ, user):
785
                allowed = 'read'
786
            else:
787
                raise NotAllowedError
788
        self._lookup_object(account, container, name)
789
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
790

    
791
    @backend_method
792
    def update_object_permissions(self, user, account, container, name, permissions):
793
        """Update the permissions associated with the object."""
794

    
795
        logger.debug("update_object_permissions: %s %s %s %s %s",
796
                     user, account, container, name, permissions)
797
        if user != account:
798
            raise NotAllowedError
799
        path = self._lookup_object(account, container, name)[0]
800
        self._check_permissions(path, permissions)
801
        self.permissions.access_set(path, permissions)
802
        self._report_sharing_change(user, account, path, {'members':
803
                                    self.permissions.access_members(path)})
804

    
805
    @backend_method
806
    def get_object_public(self, user, account, container, name):
807
        """Return the public id of the object if applicable."""
808

    
809
        logger.debug(
810
            "get_object_public: %s %s %s %s", user, account, container, name)
811
        self._can_read(user, account, container, name)
812
        path = self._lookup_object(account, container, name)[0]
813
        p = self.permissions.public_get(path)
814
        if p is not None:
815
            p += ULTIMATE_ANSWER
816
        return p
817

    
818
    @backend_method
819
    def update_object_public(self, user, account, container, name, public):
820
        """Update the public status of the object."""
821

    
822
        logger.debug("update_object_public: %s %s %s %s %s", user,
823
                     account, container, name, public)
824
        self._can_write(user, account, container, name)
825
        path = self._lookup_object(account, container, name)[0]
826
        if not public:
827
            self.permissions.public_unset(path)
828
        else:
829
            self.permissions.public_set(path)
830

    
831
    @backend_method
832
    def get_object_hashmap(self, user, account, container, name, version=None):
833
        """Return the object's size and a list with partial hashes."""
834

    
835
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
836
                     account, container, name, version)
837
        self._can_read(user, account, container, name)
838
        path, node = self._lookup_object(account, container, name)
839
        props = self._get_version(node, version)
840
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
841
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
842

    
843
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
844
        if permissions is not None and user != account:
845
            raise NotAllowedError
846
        self._can_write(user, account, container, name)
847
        if permissions is not None:
848
            path = '/'.join((account, container, name))
849
            self._check_permissions(path, permissions)
850

    
851
        account_path, account_node = self._lookup_account(account, True)
852
        container_path, container_node = self._lookup_container(
853
            account, container)
854
        path, node = self._put_object_node(
855
            container_path, container_node, name)
856
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
857

    
858
        # Handle meta.
859
        if src_version_id is None:
860
            src_version_id = pre_version_id
861
        self._put_metadata_duplicate(
862
            src_version_id, dest_version_id, domain, meta, replace_meta)
863

    
864
        del_size = self._apply_versioning(account, container, pre_version_id)
865
        size_delta = size - del_size
866
        if not self.using_external_quotaholder: # Check account quota.
867
            if size_delta > 0:
868
                account_quota = long(self._get_policy(account_node)['quota'])
869
                account_usage = self._get_statistics(account_node)[1] + size_delta
870
                if (account_quota > 0 and account_usage > account_quota):
871
                    raise QuotaError('account quota exceeded: limit: %s, usage: %s' % (
872
                        account_quota, account_usage
873
                    ))
874

    
875
        # Check container quota.
876
        container_quota = long(self._get_policy(container_node)['quota'])
877
        container_usage = self._get_statistics(container_node)[1] + size_delta
878
        if (container_quota > 0 and container_usage > container_quota):
879
            # This must be executed in a transaction, so the version is
880
            # never created if it fails.
881
            raise QuotaError('container quota exceeded: limit: %s, usage: %s' % (
882
                container_quota, container_usage
883
            ))
884

    
885
        self._report_size_change(user, account, size_delta,
886
                                 {'action': 'object update', 'path': path,
887
                                  'versions': ','.join([str(dest_version_id)])})
888
        if permissions is not None:
889
            self.permissions.access_set(path, permissions)
890
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
891

    
892
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
893
        return dest_version_id
894

    
895
    @backend_method
896
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
897
        """Create/update an object with the specified size and partial hashes."""
898

    
899
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
900
                     account, container, name, size, type, hashmap, checksum)
901
        if size == 0:  # No such thing as an empty hashmap.
902
            hashmap = [self.put_block('')]
903
        map = HashMap(self.block_size, self.hash_algorithm)
904
        map.extend([binascii.unhexlify(x) for x in hashmap])
905
        missing = self.store.block_search(map)
906
        if missing:
907
            ie = IndexError()
908
            ie.data = [binascii.hexlify(x) for x in missing]
909
            raise ie
910

    
911
        hash = map.hash()
912
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
913
        self.store.map_put(hash, map)
914
        return dest_version_id
915

    
916
    @backend_method
917
    def update_object_checksum(self, user, account, container, name, version, checksum):
918
        """Update an object's checksum."""
919

    
920
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
921
                     user, account, container, name, version, checksum)
922
        # Update objects with greater version and same hashmap and size (fix metadata updates).
923
        self._can_write(user, account, container, name)
924
        path, node = self._lookup_object(account, container, name)
925
        props = self._get_version(node, version)
926
        versions = self.node.node_get_versions(node)
927
        for x in versions:
928
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
929
                self.node.version_put_property(
930
                    x[self.SERIAL], 'checksum', checksum)
931

    
932
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
933
        dest_version_ids = []
934
        self._can_read(user, src_account, src_container, src_name)
935
        path, node = self._lookup_object(src_account, src_container, src_name)
936
        # TODO: Will do another fetch of the properties in duplicate version...
937
        props = self._get_version(
938
            node, src_version)  # Check to see if source exists.
939
        src_version_id = props[self.SERIAL]
940
        hash = props[self.HASH]
941
        size = props[self.SIZE]
942
        is_copy = not is_move and (src_account, src_container, src_name) != (
943
            dest_account, dest_container, dest_name)  # New uuid.
944
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
945
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
946
            self._delete_object(user, src_account, src_container, src_name)
947

    
948
        if delimiter:
949
            prefix = src_name + \
950
                delimiter if not src_name.endswith(delimiter) else src_name
951
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
952
            src_names.sort(key=lambda x: x[2])  # order by nodes
953
            paths = [elem[0] for elem in src_names]
954
            nodes = [elem[2] for elem in src_names]
955
            # TODO: Will do another fetch of the properties in duplicate version...
956
            props = self._get_versions(nodes)  # Check to see if source exists.
957

    
958
            for prop, path, node in zip(props, paths, nodes):
959
                src_version_id = prop[self.SERIAL]
960
                hash = prop[self.HASH]
961
                vtype = prop[self.TYPE]
962
                size = prop[self.SIZE]
963
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
964
                    delimiter) else dest_name
965
                vdest_name = path.replace(prefix, dest_prefix, 1)
966
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
967
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
968
                    self._delete_object(user, src_account, src_container, path)
969
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
970

    
971
    @backend_method
972
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
973
        """Copy an object's data and metadata."""
974

    
975
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
976
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
977
        return dest_version_id
978

    
979
    @backend_method
980
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
981
        """Move an object's data and metadata."""
982

    
983
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
984
        if user != src_account:
985
            raise NotAllowedError
986
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
987
        return dest_version_id
988

    
989
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
990
        if user != account:
991
            raise NotAllowedError
992

    
993
        if until is not None:
994
            path = '/'.join((account, container, name))
995
            node = self.node.node_lookup(path)
996
            if node is None:
997
                return
998
            hashes = []
999
            size = 0
1000
            serials = []
1001
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
1002
            hashes += h
1003
            size += s
1004
            serials += v
1005
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
1006
            hashes += h
1007
            if not self.free_versioning:
1008
                size += s
1009
            serials += v
1010
            for h in hashes:
1011
                self.store.map_delete(h)
1012
            self.node.node_purge(node, until, CLUSTER_DELETED)
1013
            try:
1014
                props = self._get_version(node)
1015
            except NameError:
1016
                self.permissions.access_clear(path)
1017
            self._report_size_change(
1018
                user, account, -size, {
1019
                    'action': 'object purge',
1020
                    'path': path,
1021
                    'versions': ','.join(str(i) for i in serials)
1022
                }
1023
            )
1024
            return
1025

    
1026
        path, node = self._lookup_object(account, container, name)
1027
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1028
        del_size = self._apply_versioning(account, container, src_version_id)
1029
        self._report_size_change(user, account, -del_size,
1030
                                 {'action': 'object delete', 'path': path,
1031
                                  'versions': ','.join([str(dest_version_id)])})
1032
        self._report_object_change(
1033
            user, account, path, details={'action': 'object delete'})
1034
        self.permissions.access_clear(path)
1035

    
1036
        if delimiter:
1037
            prefix = name + delimiter if not name.endswith(delimiter) else name
1038
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1039
            paths = []
1040
            for t in src_names:
1041
                path = '/'.join((account, container, t[0]))
1042
                node = t[2]
1043
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1044
                del_size = self._apply_versioning(
1045
                    account, container, src_version_id)
1046
                self._report_size_change(user, account, -del_size,
1047
                                         {'action': 'object delete',
1048
                                          'path': path,
1049
                                          'versions': ','.join([str(dest_version_id)])})
1050
                self._report_object_change(
1051
                    user, account, path, details={'action': 'object delete'})
1052
                paths.append(path)
1053
            self.permissions.access_clear_bulk(paths)
1054

    
1055
    @backend_method
1056
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1057
        """Delete/purge an object."""
1058

    
1059
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1060
                     account, container, name, until, prefix, delimiter)
1061
        self._delete_object(user, account, container, name, until, delimiter)
1062

    
1063
    @backend_method
1064
    def list_versions(self, user, account, container, name):
1065
        """Return a list of all (version, version_timestamp) tuples for an object."""
1066

    
1067
        logger.debug(
1068
            "list_versions: %s %s %s %s", user, account, container, name)
1069
        self._can_read(user, account, container, name)
1070
        path, node = self._lookup_object(account, container, name)
1071
        versions = self.node.node_get_versions(node)
1072
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1073

    
1074
    @backend_method
1075
    def get_uuid(self, user, uuid):
1076
        """Return the (account, container, name) for the UUID given."""
1077

    
1078
        logger.debug("get_uuid: %s %s", user, uuid)
1079
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1080
        if info is None:
1081
            raise NameError
1082
        path, serial = info
1083
        account, container, name = path.split('/', 2)
1084
        self._can_read(user, account, container, name)
1085
        return (account, container, name)
1086

    
1087
    @backend_method
1088
    def get_public(self, user, public):
1089
        """Return the (account, container, name) for the public id given."""
1090

    
1091
        logger.debug("get_public: %s %s", user, public)
1092
        if public is None or public < ULTIMATE_ANSWER:
1093
            raise NameError
1094
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1095
        if path is None:
1096
            raise NameError
1097
        account, container, name = path.split('/', 2)
1098
        self._can_read(user, account, container, name)
1099
        return (account, container, name)
1100

    
1101
    @backend_method(autocommit=0)
1102
    def get_block(self, hash):
1103
        """Return a block's data."""
1104

    
1105
        logger.debug("get_block: %s", hash)
1106
        block = self.store.block_get(binascii.unhexlify(hash))
1107
        if not block:
1108
            raise ItemNotExists('Block does not exist')
1109
        return block
1110

    
1111
    @backend_method(autocommit=0)
1112
    def put_block(self, data):
1113
        """Store a block and return the hash."""
1114

    
1115
        logger.debug("put_block: %s", len(data))
1116
        return binascii.hexlify(self.store.block_put(data))
1117

    
1118
    @backend_method(autocommit=0)
1119
    def update_block(self, hash, data, offset=0):
1120
        """Update a known block and return the hash."""
1121

    
1122
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1123
        if offset == 0 and len(data) == self.block_size:
1124
            return self.put_block(data)
1125
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1126
        return binascii.hexlify(h)
1127

    
1128
    # Path functions.
1129

    
1130
    def _generate_uuid(self):
1131
        return str(uuidlib.uuid4())
1132

    
1133
    def _put_object_node(self, path, parent, name):
1134
        path = '/'.join((path, name))
1135
        node = self.node.node_lookup(path)
1136
        if node is None:
1137
            node = self.node.node_create(parent, path)
1138
        return path, node
1139

    
1140
    def _put_path(self, user, parent, path):
1141
        node = self.node.node_create(parent, path)
1142
        self.node.version_create(node, None, 0, '', None, user,
1143
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1144
        return node
1145

    
1146
    def _lookup_account(self, account, create=True):
1147
        node = self.node.node_lookup(account)
1148
        if node is None and create:
1149
            node = self._put_path(
1150
                account, self.ROOTNODE, account)  # User is account.
1151
        return account, node
1152

    
1153
    def _lookup_container(self, account, container):
1154
        path = '/'.join((account, container))
1155
        node = self.node.node_lookup(path)
1156
        if node is None:
1157
            raise ItemNotExists('Container does not exist')
1158
        return path, node
1159

    
1160
    def _lookup_object(self, account, container, name):
1161
        path = '/'.join((account, container, name))
1162
        node = self.node.node_lookup(path)
1163
        if node is None:
1164
            raise ItemNotExists('Object does not exist')
1165
        return path, node
1166

    
1167
    def _lookup_objects(self, paths):
1168
        nodes = self.node.node_lookup_bulk(paths)
1169
        return paths, nodes
1170

    
1171
    def _get_properties(self, node, until=None):
1172
        """Return properties until the timestamp given."""
1173

    
1174
        before = until if until is not None else inf
1175
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1176
        if props is None and until is not None:
1177
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1178
        if props is None:
1179
            raise ItemNotExists('Path does not exist')
1180
        return props
1181

    
1182
    def _get_statistics(self, node, until=None):
1183
        """Return count, sum of size and latest timestamp of everything under node."""
1184

    
1185
        if until is None:
1186
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1187
        else:
1188
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1189
        if stats is None:
1190
            stats = (0, 0, 0)
1191
        return stats
1192

    
1193
    def _get_version(self, node, version=None):
1194
        if version is None:
1195
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1196
            if props is None:
1197
                raise ItemNotExists('Object does not exist')
1198
        else:
1199
            try:
1200
                version = int(version)
1201
            except ValueError:
1202
                raise VersionNotExists('Version does not exist')
1203
            props = self.node.version_get_properties(version)
1204
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1205
                raise VersionNotExists('Version does not exist')
1206
        return props
1207

    
1208
    def _get_versions(self, nodes):
1209
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1210

    
1211
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1212
        """Create a new version of the node."""
1213

    
1214
        props = self.node.version_lookup(
1215
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1216
        if props is not None:
1217
            src_version_id = props[self.SERIAL]
1218
            src_hash = props[self.HASH]
1219
            src_size = props[self.SIZE]
1220
            src_type = props[self.TYPE]
1221
            src_checksum = props[self.CHECKSUM]
1222
        else:
1223
            src_version_id = None
1224
            src_hash = None
1225
            src_size = 0
1226
            src_type = ''
1227
            src_checksum = ''
1228
        if size is None:  # Set metadata.
1229
            hash = src_hash  # This way hash can be set to None (account or container).
1230
            size = src_size
1231
        if type is None:
1232
            type = src_type
1233
        if checksum is None:
1234
            checksum = src_checksum
1235
        uuid = self._generate_uuid(
1236
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1237

    
1238
        if src_node is None:
1239
            pre_version_id = src_version_id
1240
        else:
1241
            pre_version_id = None
1242
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1243
            if props is not None:
1244
                pre_version_id = props[self.SERIAL]
1245
        if pre_version_id is not None:
1246
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1247

    
1248
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1249
        return pre_version_id, dest_version_id
1250

    
1251
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1252
        if src_version_id is not None:
1253
            self.node.attribute_copy(src_version_id, dest_version_id)
1254
        if not replace:
1255
            self.node.attribute_del(dest_version_id, domain, (
1256
                k for k, v in meta.iteritems() if v == ''))
1257
            self.node.attribute_set(dest_version_id, domain, (
1258
                (k, v) for k, v in meta.iteritems() if v != ''))
1259
        else:
1260
            self.node.attribute_del(dest_version_id, domain)
1261
            self.node.attribute_set(dest_version_id, domain, ((
1262
                k, v) for k, v in meta.iteritems()))
1263

    
1264
    def _put_metadata(self, user, node, domain, meta, replace=False):
1265
        """Create a new version and store metadata."""
1266

    
1267
        src_version_id, dest_version_id = self._put_version_duplicate(
1268
            user, node)
1269
        self._put_metadata_duplicate(
1270
            src_version_id, dest_version_id, domain, meta, replace)
1271
        return src_version_id, dest_version_id
1272

    
1273
    def _list_limits(self, listing, marker, limit):
1274
        start = 0
1275
        if marker:
1276
            try:
1277
                start = listing.index(marker) + 1
1278
            except ValueError:
1279
                pass
1280
        if not limit or limit > 10000:
1281
            limit = 10000
1282
        return start, limit
1283

    
1284
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1285
        cont_prefix = path + '/'
1286
        prefix = cont_prefix + prefix
1287
        start = cont_prefix + marker if marker else None
1288
        before = until if until is not None else inf
1289
        filterq = keys if domain else []
1290
        sizeq = size_range
1291

    
1292
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1293
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1294
        objects.sort(key=lambda x: x[0])
1295
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1296
        return objects
1297

    
1298
    # Reporting functions.
1299

    
1300
    def _report_size_change(self, user, account, size, details={}):
1301
        if size == 0:
1302
            return
1303

    
1304
        account_node = self._lookup_account(account, True)[1]
1305
        total = self._get_statistics(account_node)[1]
1306
        details.update({'user': user, 'total': total})
1307
        logger.debug(
1308
            "_report_size_change: %s %s %s %s", user, account, size, details)
1309
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1310
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1311
                              float(size), details))
1312

    
1313
        if not self.using_external_quotaholder:
1314
            return
1315

    
1316
        try:
1317
            serial = self.quotaholder.issue_commission(
1318
                    context     =   {},
1319
                    target      =   account,
1320
                    key         =   '1',
1321
                    clientkey   =   'pithos',
1322
                    ownerkey    =   '',
1323
                    name        =   details['path'] if 'path' in details else '',
1324
                    provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1325
            )
1326
        except BaseException, e:
1327
            raise QuotaError(e)
1328
        else:
1329
            self.serials.append(serial)
1330

    
1331
    def _report_object_change(self, user, account, path, details={}):
1332
        details.update({'user': user})
1333
        logger.debug("_report_object_change: %s %s %s %s", user,
1334
                     account, path, details)
1335
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1336
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1337

    
1338
    def _report_sharing_change(self, user, account, path, details={}):
1339
        logger.debug("_report_permissions_change: %s %s %s %s",
1340
                     user, account, path, details)
1341
        details.update({'user': user})
1342
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1343
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1344

    
1345
    # Policy functions.
1346

    
1347
    def _check_policy(self, policy):
1348
        for k in policy.keys():
1349
            if policy[k] == '':
1350
                policy[k] = self.default_policy.get(k)
1351
        for k, v in policy.iteritems():
1352
            if k == 'quota':
1353
                q = int(v)  # May raise ValueError.
1354
                if q < 0:
1355
                    raise ValueError
1356
            elif k == 'versioning':
1357
                if v not in ['auto', 'none']:
1358
                    raise ValueError
1359
            else:
1360
                raise ValueError
1361

    
1362
    def _put_policy(self, node, policy, replace):
1363
        if replace:
1364
            for k, v in self.default_policy.iteritems():
1365
                if k not in policy:
1366
                    policy[k] = v
1367
        self.node.policy_set(node, policy)
1368

    
1369
    def _get_policy(self, node):
1370
        policy = self.default_policy.copy()
1371
        policy.update(self.node.policy_get(node))
1372
        return policy
1373

    
1374
    def _apply_versioning(self, account, container, version_id):
1375
        """Delete the provided version if such is the policy.
1376
           Return size of object removed.
1377
        """
1378

    
1379
        if version_id is None:
1380
            return 0
1381
        path, node = self._lookup_container(account, container)
1382
        versioning = self._get_policy(node)['versioning']
1383
        if versioning != 'auto':
1384
            hash, size = self.node.version_remove(version_id)
1385
            self.store.map_delete(hash)
1386
            return size
1387
        elif self.free_versioning:
1388
            return self.node.version_get_properties(
1389
                version_id, keys=('size',))[0]
1390
        return 0
1391

    
1392
    # Access control functions.
1393

    
1394
    def _check_groups(self, groups):
1395
        # raise ValueError('Bad characters in groups')
1396
        pass
1397

    
1398
    def _check_permissions(self, path, permissions):
1399
        # raise ValueError('Bad characters in permissions')
1400
        pass
1401

    
1402
    def _get_formatted_paths(self, paths):
1403
        formatted = []
1404
        for p in paths:
1405
            node = self.node.node_lookup(p)
1406
            props = None
1407
            if node is not None:
1408
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1409
            if props is not None:
1410
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1411
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1412
                formatted.append((p, self.MATCH_EXACT))
1413
        return formatted
1414

    
1415
    def _get_permissions_path(self, account, container, name):
1416
        path = '/'.join((account, container, name))
1417
        permission_paths = self.permissions.access_inherit(path)
1418
        permission_paths.sort()
1419
        permission_paths.reverse()
1420
        for p in permission_paths:
1421
            if p == path:
1422
                return p
1423
            else:
1424
                if p.count('/') < 2:
1425
                    continue
1426
                node = self.node.node_lookup(p)
1427
                props = None
1428
                if node is not None:
1429
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1430
                if props is not None:
1431
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1432
                        return p
1433
        return None
1434

    
1435
    def _can_read(self, user, account, container, name):
1436
        if user == account:
1437
            return True
1438
        path = '/'.join((account, container, name))
1439
        if self.permissions.public_get(path) is not None:
1440
            return True
1441
        path = self._get_permissions_path(account, container, name)
1442
        if not path:
1443
            raise NotAllowedError
1444
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1445
            raise NotAllowedError
1446

    
1447
    def _can_write(self, user, account, container, name):
1448
        if user == account:
1449
            return True
1450
        path = '/'.join((account, container, name))
1451
        path = self._get_permissions_path(account, container, name)
1452
        if not path:
1453
            raise NotAllowedError
1454
        if not self.permissions.access_check(path, self.WRITE, user):
1455
            raise NotAllowedError
1456

    
1457
    def _allowed_accounts(self, user):
1458
        allow = set()
1459
        for path in self.permissions.access_list_paths(user):
1460
            allow.add(path.split('/', 1)[0])
1461
        return sorted(allow)
1462

    
1463
    def _allowed_containers(self, user, account):
1464
        allow = set()
1465
        for path in self.permissions.access_list_paths(user, account):
1466
            allow.add(path.split('/', 2)[1])
1467
        return sorted(allow)