Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 1f3f907f

History | View | Annotate | Download (62.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from kamaki.clients.quotaholder import QuotaholderClient
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
85
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
86
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
87

    
88
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
89
QUEUE_CLIENT_ID = 'pithos'
90
QUEUE_INSTANCE_ID = '1'
91

    
92
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
93

    
94
inf = float('inf')
95

    
96
ULTIMATE_ANSWER = 42
97

    
98

    
99
logger = logging.getLogger(__name__)
100

    
101

    
102
def backend_method(func=None, autocommit=1):
103
    if func is None:
104
        def fn(func):
105
            return backend_method(func, autocommit)
106
        return fn
107

    
108
    if not autocommit:
109
        return func
110

    
111
    def fn(self, *args, **kw):
112
        self.wrapper.execute()
113
        serials = []
114
        self.serials = serials
115
        self.messages = []
116

    
117
        try:
118
            ret = func(self, *args, **kw)
119
            for m in self.messages:
120
                self.queue.send(*m)
121
            if serials:
122
                self.quotaholder.accept_commission(
123
                            context     =   {},
124
                            clientkey   =   'pithos',
125
                            serials     =   serials)
126
            self.wrapper.commit()
127
            return ret
128
        except:
129
            if serials:
130
                self.quotaholder.reject_commission(
131
                            context     =   {},
132
                            clientkey   =   'pithos',
133
                            serials     =   serials)
134
            self.wrapper.rollback()
135
            raise
136
    return fn
137

    
138

    
139
class ModularBackend(BaseBackend):
140
    """A modular backend.
141

142
    Uses modules for SQL functions and storage.
143
    """
144

    
145
    def __init__(self, db_module=None, db_connection=None,
146
                 block_module=None, block_path=None, block_umask=None,
147
                 queue_module=None, queue_hosts=None, queue_exchange=None,
148
                 quotaholder_enabled=True,
149
                 quotaholder_url=None, quotaholder_token=None,
150
                 free_versioning=True, block_params=None):
151
        db_module = db_module or DEFAULT_DB_MODULE
152
        db_connection = db_connection or DEFAULT_DB_CONNECTION
153
        block_module = block_module or DEFAULT_BLOCK_MODULE
154
        block_path = block_path or DEFAULT_BLOCK_PATH
155
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
156
        block_params = block_params or DEFAULT_BLOCK_PARAMS
157
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
158

    
159
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
160
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
161
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
162

    
163
        self.hash_algorithm = 'sha256'
164
        self.block_size = 4 * 1024 * 1024  # 4MB
165
        self.free_versioning = free_versioning
166

    
167
        self.default_policy = {'quota': DEFAULT_QUOTA,
168
                               'versioning': DEFAULT_VERSIONING}
169

    
170
        def load_module(m):
171
            __import__(m)
172
            return sys.modules[m]
173

    
174
        self.db_module = load_module(db_module)
175
        self.wrapper = self.db_module.DBWrapper(db_connection)
176
        params = {'wrapper': self.wrapper}
177
        self.permissions = self.db_module.Permissions(**params)
178
        self.config = self.db_module.Config(**params)
179
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
180
        for x in ['READ', 'WRITE']:
181
            setattr(self, x, getattr(self.db_module, x))
182
        self.node = self.db_module.Node(**params)
183
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
184
            setattr(self, x, getattr(self.db_module, x))
185

    
186
        self.block_module = load_module(block_module)
187
        self.block_params = block_params
188
        params = {'path': block_path,
189
                  'block_size': self.block_size,
190
                  'hash_algorithm': self.hash_algorithm,
191
                  'umask': block_umask}
192
        params.update(self.block_params)
193
        self.store = self.block_module.Store(**params)
194

    
195
        if queue_module and queue_hosts:
196
            self.queue_module = load_module(queue_module)
197
            params = {'hosts': queue_hosts,
198
                      'exchange': queue_exchange,
199
                      'client_id': QUEUE_CLIENT_ID}
200
            self.queue = self.queue_module.Queue(**params)
201
        else:
202
            class NoQueue:
203
                def send(self, *args):
204
                    pass
205

    
206
                def close(self):
207
                    pass
208

    
209
            self.queue = NoQueue()
210

    
211
        self.quotaholder_enabled = quotaholder_enabled
212
        self.quotaholder_url = quotaholder_url
213
        self.quotaholder_token = quotaholder_token
214
        self.quotaholder = QuotaholderClient(quotaholder_url, quotaholder_token)
215
        self.serials = []
216
        self.messages = []
217

    
218
    def close(self):
219
        self.wrapper.close()
220
        self.queue.close()
221

    
222
    @property
223
    def using_external_quotaholder(self):
224
        return self.quotaholder_enabled
225

    
226
    @backend_method
227
    def list_accounts(self, user, marker=None, limit=10000):
228
        """Return a list of accounts the user can access."""
229

    
230
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
231
        allowed = self._allowed_accounts(user)
232
        start, limit = self._list_limits(allowed, marker, limit)
233
        return allowed[start:start + limit]
234

    
235
    @backend_method
236
    def get_account_meta(
237
            self, user, account, domain, until=None, include_user_defined=True,
238
            external_quota=None):
239
        """Return a dictionary with the account metadata for the domain."""
240

    
241
        logger.debug(
242
            "get_account_meta: %s %s %s %s", user, account, domain, until)
243
        path, node = self._lookup_account(account, user == account)
244
        if user != account:
245
            if until or node is None or account not in self._allowed_accounts(user):
246
                raise NotAllowedError
247
        try:
248
            props = self._get_properties(node, until)
249
            mtime = props[self.MTIME]
250
        except NameError:
251
            props = None
252
            mtime = until
253
        count, bytes, tstamp = self._get_statistics(node, until)
254
        tstamp = max(tstamp, mtime)
255
        if until is None:
256
            modified = tstamp
257
        else:
258
            modified = self._get_statistics(
259
                node)[2]  # Overall last modification.
260
            modified = max(modified, mtime)
261

    
262
        if user != account:
263
            meta = {'name': account}
264
        else:
265
            meta = {}
266
            if props is not None and include_user_defined:
267
                meta.update(
268
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
269
            if until is not None:
270
                meta.update({'until_timestamp': tstamp})
271
            meta.update({'name': account, 'count': count, 'bytes': bytes})
272
            if self.using_external_quotaholder:
273
                external_quota = external_quota or {}
274
                meta['bytes'] = external_quota.get('currValue', 0)
275
        meta.update({'modified': modified})
276
        return meta
277

    
278
    @backend_method
279
    def update_account_meta(self, user, account, domain, meta, replace=False):
280
        """Update the metadata associated with the account for the domain."""
281

    
282
        logger.debug("update_account_meta: %s %s %s %s %s", user,
283
                     account, domain, meta, replace)
284
        if user != account:
285
            raise NotAllowedError
286
        path, node = self._lookup_account(account, True)
287
        self._put_metadata(user, node, domain, meta, replace)
288

    
289
    @backend_method
290
    def get_account_groups(self, user, account):
291
        """Return a dictionary with the user groups defined for this account."""
292

    
293
        logger.debug("get_account_groups: %s %s", user, account)
294
        if user != account:
295
            if account not in self._allowed_accounts(user):
296
                raise NotAllowedError
297
            return {}
298
        self._lookup_account(account, True)
299
        return self.permissions.group_dict(account)
300

    
301
    @backend_method
302
    def update_account_groups(self, user, account, groups, replace=False):
303
        """Update the groups associated with the account."""
304

    
305
        logger.debug("update_account_groups: %s %s %s %s", user,
306
                     account, groups, replace)
307
        if user != account:
308
            raise NotAllowedError
309
        self._lookup_account(account, True)
310
        self._check_groups(groups)
311
        if replace:
312
            self.permissions.group_destroy(account)
313
        for k, v in groups.iteritems():
314
            if not replace:  # If not already deleted.
315
                self.permissions.group_delete(account, k)
316
            if v:
317
                self.permissions.group_addmany(account, k, v)
318

    
319
    @backend_method
320
    def get_account_policy(self, user, account, external_quota=None):
321
        """Return a dictionary with the account policy."""
322

    
323
        logger.debug("get_account_policy: %s %s", user, account)
324
        if user != account:
325
            if account not in self._allowed_accounts(user):
326
                raise NotAllowedError
327
            return {}
328
        path, node = self._lookup_account(account, True)
329
        policy = self._get_policy(node)
330
        if self.using_external_quotaholder:
331
            external_quota = external_quota or {}
332
            policy['quota'] = external_quota.get('maxValue', 0)
333
        return policy
334

    
335
    @backend_method
336
    def update_account_policy(self, user, account, policy, replace=False):
337
        """Update the policy associated with the account."""
338

    
339
        logger.debug("update_account_policy: %s %s %s %s", user,
340
                     account, policy, replace)
341
        if user != account:
342
            raise NotAllowedError
343
        path, node = self._lookup_account(account, True)
344
        self._check_policy(policy)
345
        self._put_policy(node, policy, replace)
346

    
347
    @backend_method
348
    def put_account(self, user, account, policy={}):
349
        """Create a new account with the given name."""
350

    
351
        logger.debug("put_account: %s %s %s", user, account, policy)
352
        if user != account:
353
            raise NotAllowedError
354
        node = self.node.node_lookup(account)
355
        if node is not None:
356
            raise AccountExists('Account already exists')
357
        if policy:
358
            self._check_policy(policy)
359
        node = self._put_path(user, self.ROOTNODE, account)
360
        self._put_policy(node, policy, True)
361

    
362
    @backend_method
363
    def delete_account(self, user, account):
364
        """Delete the account with the given name."""
365

    
366
        logger.debug("delete_account: %s %s", user, account)
367
        if user != account:
368
            raise NotAllowedError
369
        node = self.node.node_lookup(account)
370
        if node is None:
371
            return
372
        if not self.node.node_remove(node):
373
            raise AccountNotEmpty('Account is not empty')
374
        self.permissions.group_destroy(account)
375

    
376
    @backend_method
377
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
378
        """Return a list of containers existing under an account."""
379

    
380
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
381
                     account, marker, limit, shared, until, public)
382
        if user != account:
383
            if until or account not in self._allowed_accounts(user):
384
                raise NotAllowedError
385
            allowed = self._allowed_containers(user, account)
386
            start, limit = self._list_limits(allowed, marker, limit)
387
            return allowed[start:start + limit]
388
        if shared or public:
389
            allowed = set()
390
            if shared:
391
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
392
            if public:
393
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
394
            allowed = sorted(allowed)
395
            start, limit = self._list_limits(allowed, marker, limit)
396
            return allowed[start:start + limit]
397
        node = self.node.node_lookup(account)
398
        containers = [x[0] for x in self._list_object_properties(
399
            node, account, '', '/', marker, limit, False, None, [], until)]
400
        start, limit = self._list_limits(
401
            [x[0] for x in containers], marker, limit)
402
        return containers[start:start + limit]
403

    
404
    @backend_method
405
    def list_container_meta(self, user, account, container, domain, until=None):
406
        """Return a list with all the container's object meta keys for the domain."""
407

    
408
        logger.debug("list_container_meta: %s %s %s %s %s", user,
409
                     account, container, domain, until)
410
        allowed = []
411
        if user != account:
412
            if until:
413
                raise NotAllowedError
414
            allowed = self.permissions.access_list_paths(
415
                user, '/'.join((account, container)))
416
            if not allowed:
417
                raise NotAllowedError
418
        path, node = self._lookup_container(account, container)
419
        before = until if until is not None else inf
420
        allowed = self._get_formatted_paths(allowed)
421
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
422

    
423
    @backend_method
424
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
425
        """Return a dictionary with the container metadata for the domain."""
426

    
427
        logger.debug("get_container_meta: %s %s %s %s %s", user,
428
                     account, container, domain, until)
429
        if user != account:
430
            if until or container not in self._allowed_containers(user, account):
431
                raise NotAllowedError
432
        path, node = self._lookup_container(account, container)
433
        props = self._get_properties(node, until)
434
        mtime = props[self.MTIME]
435
        count, bytes, tstamp = self._get_statistics(node, until)
436
        tstamp = max(tstamp, mtime)
437
        if until is None:
438
            modified = tstamp
439
        else:
440
            modified = self._get_statistics(
441
                node)[2]  # Overall last modification.
442
            modified = max(modified, mtime)
443

    
444
        if user != account:
445
            meta = {'name': container}
446
        else:
447
            meta = {}
448
            if include_user_defined:
449
                meta.update(
450
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
451
            if until is not None:
452
                meta.update({'until_timestamp': tstamp})
453
            meta.update({'name': container, 'count': count, 'bytes': bytes})
454
        meta.update({'modified': modified})
455
        return meta
456

    
457
    @backend_method
458
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
459
        """Update the metadata associated with the container for the domain."""
460

    
461
        logger.debug("update_container_meta: %s %s %s %s %s %s",
462
                     user, account, container, domain, meta, replace)
463
        if user != account:
464
            raise NotAllowedError
465
        path, node = self._lookup_container(account, container)
466
        src_version_id, dest_version_id = self._put_metadata(
467
            user, node, domain, meta, replace)
468
        if src_version_id is not None:
469
            versioning = self._get_policy(node)['versioning']
470
            if versioning != 'auto':
471
                self.node.version_remove(src_version_id)
472

    
473
    @backend_method
474
    def get_container_policy(self, user, account, container):
475
        """Return a dictionary with the container policy."""
476

    
477
        logger.debug(
478
            "get_container_policy: %s %s %s", user, account, container)
479
        if user != account:
480
            if container not in self._allowed_containers(user, account):
481
                raise NotAllowedError
482
            return {}
483
        path, node = self._lookup_container(account, container)
484
        return self._get_policy(node)
485

    
486
    @backend_method
487
    def update_container_policy(self, user, account, container, policy, replace=False):
488
        """Update the policy associated with the container."""
489

    
490
        logger.debug("update_container_policy: %s %s %s %s %s",
491
                     user, account, container, policy, replace)
492
        if user != account:
493
            raise NotAllowedError
494
        path, node = self._lookup_container(account, container)
495
        self._check_policy(policy)
496
        self._put_policy(node, policy, replace)
497

    
498
    @backend_method
499
    def put_container(self, user, account, container, policy={}):
500
        """Create a new container with the given name."""
501

    
502
        logger.debug(
503
            "put_container: %s %s %s %s", user, account, container, policy)
504
        if user != account:
505
            raise NotAllowedError
506
        try:
507
            path, node = self._lookup_container(account, container)
508
        except NameError:
509
            pass
510
        else:
511
            raise ContainerExists('Container already exists')
512
        if policy:
513
            self._check_policy(policy)
514
        path = '/'.join((account, container))
515
        node = self._put_path(
516
            user, self._lookup_account(account, True)[1], path)
517
        self._put_policy(node, policy, True)
518

    
519
    @backend_method
520
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
521
        """Delete/purge the container with the given name."""
522

    
523
        logger.debug("delete_container: %s %s %s %s %s %s", user,
524
                     account, container, until, prefix, delimiter)
525
        if user != account:
526
            raise NotAllowedError
527
        path, node = self._lookup_container(account, container)
528

    
529
        if until is not None:
530
            hashes, size, serials = self.node.node_purge_children(
531
                node, until, CLUSTER_HISTORY)
532
            for h in hashes:
533
                self.store.map_delete(h)
534
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
535
            self._report_size_change(user, account, -size,
536
                                     {'action':'container purge', 'path': path,
537
                                      'versions': ','.join(str(i) for i in serials)})
538
            return
539

    
540
        if not delimiter:
541
            if self._get_statistics(node)[0] > 0:
542
                raise ContainerNotEmpty('Container is not empty')
543
            hashes, size, serials = self.node.node_purge_children(
544
                node, inf, CLUSTER_HISTORY)
545
            for h in hashes:
546
                self.store.map_delete(h)
547
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
548
            self.node.node_remove(node)
549
            self._report_size_change(user, account, -size,
550
                                     {'action': 'container delete',
551
                                      'path': path,
552
                                      'versions': ','.join(str(i) for i in serials)})
553
        else:
554
            # remove only contents
555
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
556
            paths = []
557
            for t in src_names:
558
                path = '/'.join((account, container, t[0]))
559
                node = t[2]
560
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
561
                del_size = self._apply_versioning(
562
                    account, container, src_version_id)
563
                self._report_size_change(
564
                        user, account, -del_size, {
565
                                'action': 'object delete',
566
                                'path': path,
567
                        'versions': ','.join([str(dest_version_id)])
568
                     }
569
                )
570
                self._report_object_change(
571
                    user, account, path, details={'action': 'object delete'})
572
                paths.append(path)
573
            self.permissions.access_clear_bulk(paths)
574

    
575
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
576
        if user != account and until:
577
            raise NotAllowedError
578
        if shared and public:
579
            # get shared first
580
            shared = self._list_object_permissions(
581
                user, account, container, prefix, shared=True, public=False)
582
            objects = set()
583
            if shared:
584
                path, node = self._lookup_container(account, container)
585
                shared = self._get_formatted_paths(shared)
586
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
587

    
588
            # get public
589
            objects |= set(self._list_public_object_properties(
590
                user, account, container, prefix, all_props))
591
            objects = list(objects)
592

    
593
            objects.sort(key=lambda x: x[0])
594
            start, limit = self._list_limits(
595
                [x[0] for x in objects], marker, limit)
596
            return objects[start:start + limit]
597
        elif public:
598
            objects = self._list_public_object_properties(
599
                user, account, container, prefix, all_props)
600
            start, limit = self._list_limits(
601
                [x[0] for x in objects], marker, limit)
602
            return objects[start:start + limit]
603

    
604
        allowed = self._list_object_permissions(
605
            user, account, container, prefix, shared, public)
606
        if shared and not allowed:
607
            return []
608
        path, node = self._lookup_container(account, container)
609
        allowed = self._get_formatted_paths(allowed)
610
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
611
        start, limit = self._list_limits(
612
            [x[0] for x in objects], marker, limit)
613
        return objects[start:start + limit]
614

    
615
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
616
        public = self._list_object_permissions(
617
            user, account, container, prefix, shared=False, public=True)
618
        paths, nodes = self._lookup_objects(public)
619
        path = '/'.join((account, container))
620
        cont_prefix = path + '/'
621
        paths = [x[len(cont_prefix):] for x in paths]
622
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
623
        objects = [(path,) + props for path, props in zip(paths, props)]
624
        return objects
625

    
626
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
627
        objects = []
628
        while True:
629
            marker = objects[-1] if objects else None
630
            limit = 10000
631
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
632
            objects.extend(l)
633
            if not l or len(l) < limit:
634
                break
635
        return objects
636

    
637
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
638
        allowed = []
639
        path = '/'.join((account, container, prefix)).rstrip('/')
640
        if user != account:
641
            allowed = self.permissions.access_list_paths(user, path)
642
            if not allowed:
643
                raise NotAllowedError
644
        else:
645
            allowed = set()
646
            if shared:
647
                allowed.update(self.permissions.access_list_shared(path))
648
            if public:
649
                allowed.update(
650
                    [x[0] for x in self.permissions.public_list(path)])
651
            allowed = sorted(allowed)
652
            if not allowed:
653
                return []
654
        return allowed
655

    
656
    @backend_method
657
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
658
        """Return a list of object (name, version_id) tuples existing under a container."""
659

    
660
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
661
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
662

    
663
    @backend_method
664
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
665
        """Return a list of object metadata dicts existing under a container."""
666

    
667
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
668
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
669
        objects = []
670
        for p in props:
671
            if len(p) == 2:
672
                objects.append({'subdir': p[0]})
673
            else:
674
                objects.append({'name': p[0],
675
                                'bytes': p[self.SIZE + 1],
676
                                'type': p[self.TYPE + 1],
677
                                'hash': p[self.HASH + 1],
678
                                'version': p[self.SERIAL + 1],
679
                                'version_timestamp': p[self.MTIME + 1],
680
                                'modified': p[self.MTIME + 1] if until is None else None,
681
                                'modified_by': p[self.MUSER + 1],
682
                                'uuid': p[self.UUID + 1],
683
                                'checksum': p[self.CHECKSUM + 1]})
684
        return objects
685

    
686
    @backend_method
687
    def list_object_permissions(self, user, account, container, prefix=''):
688
        """Return a list of paths that enforce permissions under a container."""
689

    
690
        logger.debug("list_object_permissions: %s %s %s %s", user,
691
                     account, container, prefix)
692
        return self._list_object_permissions(user, account, container, prefix, True, False)
693

    
694
    @backend_method
695
    def list_object_public(self, user, account, container, prefix=''):
696
        """Return a dict mapping paths to public ids for objects that are public under a container."""
697

    
698
        logger.debug("list_object_public: %s %s %s %s", user,
699
                     account, container, prefix)
700
        public = {}
701
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
702
            public[path] = p + ULTIMATE_ANSWER
703
        return public
704

    
705
    @backend_method
706
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
707
        """Return a dictionary with the object metadata for the domain."""
708

    
709
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
710
                     account, container, name, domain, version)
711
        self._can_read(user, account, container, name)
712
        path, node = self._lookup_object(account, container, name)
713
        props = self._get_version(node, version)
714
        if version is None:
715
            modified = props[self.MTIME]
716
        else:
717
            try:
718
                modified = self._get_version(
719
                    node)[self.MTIME]  # Overall last modification.
720
            except NameError:  # Object may be deleted.
721
                del_props = self.node.version_lookup(
722
                    node, inf, CLUSTER_DELETED)
723
                if del_props is None:
724
                    raise ItemNotExists('Object does not exist')
725
                modified = del_props[self.MTIME]
726

    
727
        meta = {}
728
        if include_user_defined:
729
            meta.update(
730
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
731
        meta.update({'name': name,
732
                     'bytes': props[self.SIZE],
733
                     'type': props[self.TYPE],
734
                     'hash': props[self.HASH],
735
                     'version': props[self.SERIAL],
736
                     'version_timestamp': props[self.MTIME],
737
                     'modified': modified,
738
                     'modified_by': props[self.MUSER],
739
                     'uuid': props[self.UUID],
740
                     'checksum': props[self.CHECKSUM]})
741
        return meta
742

    
743
    @backend_method
744
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
745
        """Update the metadata associated with the object for the domain and return the new version."""
746

    
747
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
748
                     user, account, container, name, domain, meta, replace)
749
        self._can_write(user, account, container, name)
750
        path, node = self._lookup_object(account, container, name)
751
        src_version_id, dest_version_id = self._put_metadata(
752
            user, node, domain, meta, replace)
753
        self._apply_versioning(account, container, src_version_id)
754
        return dest_version_id
755

    
756
    @backend_method
757
    def get_object_permissions(self, user, account, container, name):
758
        """Return the action allowed on the object, the path
759
        from which the object gets its permissions from,
760
        along with a dictionary containing the permissions."""
761

    
762
        logger.debug("get_object_permissions: %s %s %s %s", user,
763
                     account, container, name)
764
        allowed = 'write'
765
        permissions_path = self._get_permissions_path(account, container, name)
766
        if user != account:
767
            if self.permissions.access_check(permissions_path, self.WRITE, user):
768
                allowed = 'write'
769
            elif self.permissions.access_check(permissions_path, self.READ, user):
770
                allowed = 'read'
771
            else:
772
                raise NotAllowedError
773
        self._lookup_object(account, container, name)
774
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
775

    
776
    @backend_method
777
    def update_object_permissions(self, user, account, container, name, permissions):
778
        """Update the permissions associated with the object."""
779

    
780
        logger.debug("update_object_permissions: %s %s %s %s %s",
781
                     user, account, container, name, permissions)
782
        if user != account:
783
            raise NotAllowedError
784
        path = self._lookup_object(account, container, name)[0]
785
        self._check_permissions(path, permissions)
786
        self.permissions.access_set(path, permissions)
787
        self._report_sharing_change(user, account, path, {'members':
788
                                    self.permissions.access_members(path)})
789

    
790
    @backend_method
791
    def get_object_public(self, user, account, container, name):
792
        """Return the public id of the object if applicable."""
793

    
794
        logger.debug(
795
            "get_object_public: %s %s %s %s", user, account, container, name)
796
        self._can_read(user, account, container, name)
797
        path = self._lookup_object(account, container, name)[0]
798
        p = self.permissions.public_get(path)
799
        if p is not None:
800
            p += ULTIMATE_ANSWER
801
        return p
802

    
803
    @backend_method
804
    def update_object_public(self, user, account, container, name, public):
805
        """Update the public status of the object."""
806

    
807
        logger.debug("update_object_public: %s %s %s %s %s", user,
808
                     account, container, name, public)
809
        self._can_write(user, account, container, name)
810
        path = self._lookup_object(account, container, name)[0]
811
        if not public:
812
            self.permissions.public_unset(path)
813
        else:
814
            self.permissions.public_set(path)
815

    
816
    @backend_method
817
    def get_object_hashmap(self, user, account, container, name, version=None):
818
        """Return the object's size and a list with partial hashes."""
819

    
820
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
821
                     account, container, name, version)
822
        self._can_read(user, account, container, name)
823
        path, node = self._lookup_object(account, container, name)
824
        props = self._get_version(node, version)
825
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
826
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
827

    
828
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
829
        if permissions is not None and user != account:
830
            raise NotAllowedError
831
        self._can_write(user, account, container, name)
832
        if permissions is not None:
833
            path = '/'.join((account, container, name))
834
            self._check_permissions(path, permissions)
835

    
836
        account_path, account_node = self._lookup_account(account, True)
837
        container_path, container_node = self._lookup_container(
838
            account, container)
839
        path, node = self._put_object_node(
840
            container_path, container_node, name)
841
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
842

    
843
        # Handle meta.
844
        if src_version_id is None:
845
            src_version_id = pre_version_id
846
        self._put_metadata_duplicate(
847
            src_version_id, dest_version_id, domain, meta, replace_meta)
848

    
849
        del_size = self._apply_versioning(account, container, pre_version_id)
850
        size_delta = size - del_size
851
        if not self.using_external_quotaholder: # Check account quota.
852
            if size_delta > 0:
853
                account_quota = long(self._get_policy(account_node)['quota'])
854
                account_usage = self._get_statistics(account_node)[1] + size_delta
855
                if (account_quota > 0 and account_usage > account_quota):
856
                    raise QuotaError('account quota exceeded: limit: %s, usage: %s' % (
857
                        account_quota, account_usage
858
                    ))
859

    
860
        # Check container quota.
861
        container_quota = long(self._get_policy(container_node)['quota'])
862
        container_usage = self._get_statistics(container_node)[1] + size_delta
863
        if (container_quota > 0 and container_usage > container_quota):
864
            # This must be executed in a transaction, so the version is
865
            # never created if it fails.
866
            raise QuotaError('container quota exceeded: limit: %s, usage: %s' % (
867
                container_quota, container_usage
868
            ))
869

    
870
        self._report_size_change(user, account, size_delta,
871
                                 {'action': 'object update', 'path': path,
872
                                  'versions': ','.join([str(dest_version_id)])})
873
        if permissions is not None:
874
            self.permissions.access_set(path, permissions)
875
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
876

    
877
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
878
        return dest_version_id
879

    
880
    @backend_method
881
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
882
        """Create/update an object with the specified size and partial hashes."""
883

    
884
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
885
                     account, container, name, size, type, hashmap, checksum)
886
        if size == 0:  # No such thing as an empty hashmap.
887
            hashmap = [self.put_block('')]
888
        map = HashMap(self.block_size, self.hash_algorithm)
889
        map.extend([binascii.unhexlify(x) for x in hashmap])
890
        missing = self.store.block_search(map)
891
        if missing:
892
            ie = IndexError()
893
            ie.data = [binascii.hexlify(x) for x in missing]
894
            raise ie
895

    
896
        hash = map.hash()
897
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
898
        self.store.map_put(hash, map)
899
        return dest_version_id
900

    
901
    @backend_method
902
    def update_object_checksum(self, user, account, container, name, version, checksum):
903
        """Update an object's checksum."""
904

    
905
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
906
                     user, account, container, name, version, checksum)
907
        # Update objects with greater version and same hashmap and size (fix metadata updates).
908
        self._can_write(user, account, container, name)
909
        path, node = self._lookup_object(account, container, name)
910
        props = self._get_version(node, version)
911
        versions = self.node.node_get_versions(node)
912
        for x in versions:
913
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
914
                self.node.version_put_property(
915
                    x[self.SERIAL], 'checksum', checksum)
916

    
917
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
918
        dest_version_ids = []
919
        self._can_read(user, src_account, src_container, src_name)
920
        path, node = self._lookup_object(src_account, src_container, src_name)
921
        # TODO: Will do another fetch of the properties in duplicate version...
922
        props = self._get_version(
923
            node, src_version)  # Check to see if source exists.
924
        src_version_id = props[self.SERIAL]
925
        hash = props[self.HASH]
926
        size = props[self.SIZE]
927
        is_copy = not is_move and (src_account, src_container, src_name) != (
928
            dest_account, dest_container, dest_name)  # New uuid.
929
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
930
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
931
            self._delete_object(user, src_account, src_container, src_name)
932

    
933
        if delimiter:
934
            prefix = src_name + \
935
                delimiter if not src_name.endswith(delimiter) else src_name
936
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
937
            src_names.sort(key=lambda x: x[2])  # order by nodes
938
            paths = [elem[0] for elem in src_names]
939
            nodes = [elem[2] for elem in src_names]
940
            # TODO: Will do another fetch of the properties in duplicate version...
941
            props = self._get_versions(nodes)  # Check to see if source exists.
942

    
943
            for prop, path, node in zip(props, paths, nodes):
944
                src_version_id = prop[self.SERIAL]
945
                hash = prop[self.HASH]
946
                vtype = prop[self.TYPE]
947
                size = prop[self.SIZE]
948
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
949
                    delimiter) else dest_name
950
                vdest_name = path.replace(prefix, dest_prefix, 1)
951
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
952
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
953
                    self._delete_object(user, src_account, src_container, path)
954
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
955

    
956
    @backend_method
957
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
958
        """Copy an object's data and metadata."""
959

    
960
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
961
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
962
        return dest_version_id
963

    
964
    @backend_method
965
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
966
        """Move an object's data and metadata."""
967

    
968
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
969
        if user != src_account:
970
            raise NotAllowedError
971
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
972
        return dest_version_id
973

    
974
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
975
        if user != account:
976
            raise NotAllowedError
977

    
978
        if until is not None:
979
            path = '/'.join((account, container, name))
980
            node = self.node.node_lookup(path)
981
            if node is None:
982
                return
983
            hashes = []
984
            size = 0
985
            serials = []
986
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
987
            hashes += h
988
            size += s
989
            serials += v
990
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
991
            hashes += h
992
            size += s
993
            serials += v
994
            for h in hashes:
995
                self.store.map_delete(h)
996
            self.node.node_purge(node, until, CLUSTER_DELETED)
997
            try:
998
                props = self._get_version(node)
999
            except NameError:
1000
                self.permissions.access_clear(path)
1001
            self._report_size_change(user, account, -size,
1002
                                    {'action': 'object purge', 'path': path,
1003
                                     'versions': ','.join(str(i) for i in serials)})
1004
            return
1005

    
1006
        path, node = self._lookup_object(account, container, name)
1007
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1008
        del_size = self._apply_versioning(account, container, src_version_id)
1009
        self._report_size_change(user, account, -del_size,
1010
                                 {'action': 'object delete', 'path': path,
1011
                                  'versions': ','.join([str(dest_version_id)])})
1012
        self._report_object_change(
1013
            user, account, path, details={'action': 'object delete'})
1014
        self.permissions.access_clear(path)
1015

    
1016
        if delimiter:
1017
            prefix = name + delimiter if not name.endswith(delimiter) else name
1018
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1019
            paths = []
1020
            for t in src_names:
1021
                path = '/'.join((account, container, t[0]))
1022
                node = t[2]
1023
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1024
                del_size = self._apply_versioning(
1025
                    account, container, src_version_id)
1026
                self._report_size_change(user, account, -del_size,
1027
                                         {'action': 'object delete',
1028
                                          'path': path,
1029
                                          'versions': ','.join([str(dest_version_id)])})
1030
                self._report_object_change(
1031
                    user, account, path, details={'action': 'object delete'})
1032
                paths.append(path)
1033
            self.permissions.access_clear_bulk(paths)
1034

    
1035
    @backend_method
1036
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1037
        """Delete/purge an object."""
1038

    
1039
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1040
                     account, container, name, until, prefix, delimiter)
1041
        self._delete_object(user, account, container, name, until, delimiter)
1042

    
1043
    @backend_method
1044
    def list_versions(self, user, account, container, name):
1045
        """Return a list of all (version, version_timestamp) tuples for an object."""
1046

    
1047
        logger.debug(
1048
            "list_versions: %s %s %s %s", user, account, container, name)
1049
        self._can_read(user, account, container, name)
1050
        path, node = self._lookup_object(account, container, name)
1051
        versions = self.node.node_get_versions(node)
1052
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1053

    
1054
    @backend_method
1055
    def get_uuid(self, user, uuid):
1056
        """Return the (account, container, name) for the UUID given."""
1057

    
1058
        logger.debug("get_uuid: %s %s", user, uuid)
1059
        info = self.node.latest_uuid(uuid)
1060
        if info is None:
1061
            raise NameError
1062
        path, serial = info
1063
        account, container, name = path.split('/', 2)
1064
        self._can_read(user, account, container, name)
1065
        return (account, container, name)
1066

    
1067
    @backend_method
1068
    def get_public(self, user, public):
1069
        """Return the (account, container, name) for the public id given."""
1070

    
1071
        logger.debug("get_public: %s %s", user, public)
1072
        if public is None or public < ULTIMATE_ANSWER:
1073
            raise NameError
1074
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1075
        if path is None:
1076
            raise NameError
1077
        account, container, name = path.split('/', 2)
1078
        self._can_read(user, account, container, name)
1079
        return (account, container, name)
1080

    
1081
    @backend_method(autocommit=0)
1082
    def get_block(self, hash):
1083
        """Return a block's data."""
1084

    
1085
        logger.debug("get_block: %s", hash)
1086
        block = self.store.block_get(binascii.unhexlify(hash))
1087
        if not block:
1088
            raise ItemNotExists('Block does not exist')
1089
        return block
1090

    
1091
    @backend_method(autocommit=0)
1092
    def put_block(self, data):
1093
        """Store a block and return the hash."""
1094

    
1095
        logger.debug("put_block: %s", len(data))
1096
        return binascii.hexlify(self.store.block_put(data))
1097

    
1098
    @backend_method(autocommit=0)
1099
    def update_block(self, hash, data, offset=0):
1100
        """Update a known block and return the hash."""
1101

    
1102
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1103
        if offset == 0 and len(data) == self.block_size:
1104
            return self.put_block(data)
1105
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1106
        return binascii.hexlify(h)
1107

    
1108
    # Path functions.
1109

    
1110
    def _generate_uuid(self):
1111
        return str(uuidlib.uuid4())
1112

    
1113
    def _put_object_node(self, path, parent, name):
1114
        path = '/'.join((path, name))
1115
        node = self.node.node_lookup(path)
1116
        if node is None:
1117
            node = self.node.node_create(parent, path)
1118
        return path, node
1119

    
1120
    def _put_path(self, user, parent, path):
1121
        node = self.node.node_create(parent, path)
1122
        self.node.version_create(node, None, 0, '', None, user,
1123
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1124
        return node
1125

    
1126
    def _lookup_account(self, account, create=True):
1127
        node = self.node.node_lookup(account)
1128
        if node is None and create:
1129
            node = self._put_path(
1130
                account, self.ROOTNODE, account)  # User is account.
1131
        return account, node
1132

    
1133
    def _lookup_container(self, account, container):
1134
        path = '/'.join((account, container))
1135
        node = self.node.node_lookup(path)
1136
        if node is None:
1137
            raise ItemNotExists('Container does not exist')
1138
        return path, node
1139

    
1140
    def _lookup_object(self, account, container, name):
1141
        path = '/'.join((account, container, name))
1142
        node = self.node.node_lookup(path)
1143
        if node is None:
1144
            raise ItemNotExists('Object does not exist')
1145
        return path, node
1146

    
1147
    def _lookup_objects(self, paths):
1148
        nodes = self.node.node_lookup_bulk(paths)
1149
        return paths, nodes
1150

    
1151
    def _get_properties(self, node, until=None):
1152
        """Return properties until the timestamp given."""
1153

    
1154
        before = until if until is not None else inf
1155
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1156
        if props is None and until is not None:
1157
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1158
        if props is None:
1159
            raise ItemNotExists('Path does not exist')
1160
        return props
1161

    
1162
    def _get_statistics(self, node, until=None):
1163
        """Return count, sum of size and latest timestamp of everything under node."""
1164

    
1165
        if until is None:
1166
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1167
        else:
1168
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1169
        if stats is None:
1170
            stats = (0, 0, 0)
1171
        return stats
1172

    
1173
    def _get_version(self, node, version=None):
1174
        if version is None:
1175
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1176
            if props is None:
1177
                raise ItemNotExists('Object does not exist')
1178
        else:
1179
            try:
1180
                version = int(version)
1181
            except ValueError:
1182
                raise VersionNotExists('Version does not exist')
1183
            props = self.node.version_get_properties(version)
1184
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1185
                raise VersionNotExists('Version does not exist')
1186
        return props
1187

    
1188
    def _get_versions(self, nodes):
1189
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1190

    
1191
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1192
        """Create a new version of the node."""
1193

    
1194
        props = self.node.version_lookup(
1195
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1196
        if props is not None:
1197
            src_version_id = props[self.SERIAL]
1198
            src_hash = props[self.HASH]
1199
            src_size = props[self.SIZE]
1200
            src_type = props[self.TYPE]
1201
            src_checksum = props[self.CHECKSUM]
1202
        else:
1203
            src_version_id = None
1204
            src_hash = None
1205
            src_size = 0
1206
            src_type = ''
1207
            src_checksum = ''
1208
        if size is None:  # Set metadata.
1209
            hash = src_hash  # This way hash can be set to None (account or container).
1210
            size = src_size
1211
        if type is None:
1212
            type = src_type
1213
        if checksum is None:
1214
            checksum = src_checksum
1215
        uuid = self._generate_uuid(
1216
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1217

    
1218
        if src_node is None:
1219
            pre_version_id = src_version_id
1220
        else:
1221
            pre_version_id = None
1222
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1223
            if props is not None:
1224
                pre_version_id = props[self.SERIAL]
1225
        if pre_version_id is not None:
1226
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1227

    
1228
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1229
        return pre_version_id, dest_version_id
1230

    
1231
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1232
        if src_version_id is not None:
1233
            self.node.attribute_copy(src_version_id, dest_version_id)
1234
        if not replace:
1235
            self.node.attribute_del(dest_version_id, domain, (
1236
                k for k, v in meta.iteritems() if v == ''))
1237
            self.node.attribute_set(dest_version_id, domain, (
1238
                (k, v) for k, v in meta.iteritems() if v != ''))
1239
        else:
1240
            self.node.attribute_del(dest_version_id, domain)
1241
            self.node.attribute_set(dest_version_id, domain, ((
1242
                k, v) for k, v in meta.iteritems()))
1243

    
1244
    def _put_metadata(self, user, node, domain, meta, replace=False):
1245
        """Create a new version and store metadata."""
1246

    
1247
        src_version_id, dest_version_id = self._put_version_duplicate(
1248
            user, node)
1249
        self._put_metadata_duplicate(
1250
            src_version_id, dest_version_id, domain, meta, replace)
1251
        return src_version_id, dest_version_id
1252

    
1253
    def _list_limits(self, listing, marker, limit):
1254
        start = 0
1255
        if marker:
1256
            try:
1257
                start = listing.index(marker) + 1
1258
            except ValueError:
1259
                pass
1260
        if not limit or limit > 10000:
1261
            limit = 10000
1262
        return start, limit
1263

    
1264
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1265
        cont_prefix = path + '/'
1266
        prefix = cont_prefix + prefix
1267
        start = cont_prefix + marker if marker else None
1268
        before = until if until is not None else inf
1269
        filterq = keys if domain else []
1270
        sizeq = size_range
1271

    
1272
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1273
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1274
        objects.sort(key=lambda x: x[0])
1275
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1276
        return objects
1277

    
1278
    # Reporting functions.
1279

    
1280
    def _report_size_change(self, user, account, size, details={}):
1281
        if size == 0:
1282
            return
1283

    
1284
        account_node = self._lookup_account(account, True)[1]
1285
        total = self._get_statistics(account_node)[1]
1286
        details.update({'user': user, 'total': total})
1287
        logger.debug(
1288
            "_report_size_change: %s %s %s %s", user, account, size, details)
1289
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1290
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1291
                              float(size), details))
1292

    
1293
        if not self.using_external_quotaholder:
1294
            return
1295

    
1296
        try:
1297
            serial = self.quotaholder.issue_commission(
1298
                    context     =   {},
1299
                    target      =   account,
1300
                    key         =   '1',
1301
                    clientkey   =   'pithos',
1302
                    ownerkey    =   '',
1303
                    name        =   details['path'] if 'path' in details else '',
1304
                    provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1305
            )
1306
        except BaseException, e:
1307
            raise QuotaError(e)
1308
        else:
1309
            self.serials.append(serial)
1310

    
1311
    def _report_object_change(self, user, account, path, details={}):
1312
        details.update({'user': user})
1313
        logger.debug("_report_object_change: %s %s %s %s", user,
1314
                     account, path, details)
1315
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1316
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1317

    
1318
    def _report_sharing_change(self, user, account, path, details={}):
1319
        logger.debug("_report_permissions_change: %s %s %s %s",
1320
                     user, account, path, details)
1321
        details.update({'user': user})
1322
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1323
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1324

    
1325
    # Policy functions.
1326

    
1327
    def _check_policy(self, policy):
1328
        for k in policy.keys():
1329
            if policy[k] == '':
1330
                policy[k] = self.default_policy.get(k)
1331
        for k, v in policy.iteritems():
1332
            if k == 'quota':
1333
                q = int(v)  # May raise ValueError.
1334
                if q < 0:
1335
                    raise ValueError
1336
            elif k == 'versioning':
1337
                if v not in ['auto', 'none']:
1338
                    raise ValueError
1339
            else:
1340
                raise ValueError
1341

    
1342
    def _put_policy(self, node, policy, replace):
1343
        if replace:
1344
            for k, v in self.default_policy.iteritems():
1345
                if k not in policy:
1346
                    policy[k] = v
1347
        self.node.policy_set(node, policy)
1348

    
1349
    def _get_policy(self, node):
1350
        policy = self.default_policy.copy()
1351
        policy.update(self.node.policy_get(node))
1352
        return policy
1353

    
1354
    def _apply_versioning(self, account, container, version_id):
1355
        """Delete the provided version if such is the policy.
1356
           Return size of object removed.
1357
        """
1358

    
1359
        if version_id is None:
1360
            return 0
1361
        path, node = self._lookup_container(account, container)
1362
        versioning = self._get_policy(node)['versioning']
1363
        if versioning != 'auto':
1364
            hash, size = self.node.version_remove(version_id)
1365
            self.store.map_delete(hash)
1366
            return size
1367
        elif self.free_versioning:
1368
            return self.node.version_get_properties(
1369
                version_id, keys=('size',))[0]
1370
        return 0
1371

    
1372
    # Access control functions.
1373

    
1374
    def _check_groups(self, groups):
1375
        # raise ValueError('Bad characters in groups')
1376
        pass
1377

    
1378
    def _check_permissions(self, path, permissions):
1379
        # raise ValueError('Bad characters in permissions')
1380
        pass
1381

    
1382
    def _get_formatted_paths(self, paths):
1383
        formatted = []
1384
        for p in paths:
1385
            node = self.node.node_lookup(p)
1386
            props = None
1387
            if node is not None:
1388
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1389
            if props is not None:
1390
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1391
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1392
                formatted.append((p, self.MATCH_EXACT))
1393
        return formatted
1394

    
1395
    def _get_permissions_path(self, account, container, name):
1396
        path = '/'.join((account, container, name))
1397
        permission_paths = self.permissions.access_inherit(path)
1398
        permission_paths.sort()
1399
        permission_paths.reverse()
1400
        for p in permission_paths:
1401
            if p == path:
1402
                return p
1403
            else:
1404
                if p.count('/') < 2:
1405
                    continue
1406
                node = self.node.node_lookup(p)
1407
                props = None
1408
                if node is not None:
1409
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1410
                if props is not None:
1411
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1412
                        return p
1413
        return None
1414

    
1415
    def _can_read(self, user, account, container, name):
1416
        if user == account:
1417
            return True
1418
        path = '/'.join((account, container, name))
1419
        if self.permissions.public_get(path) is not None:
1420
            return True
1421
        path = self._get_permissions_path(account, container, name)
1422
        if not path:
1423
            raise NotAllowedError
1424
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1425
            raise NotAllowedError
1426

    
1427
    def _can_write(self, user, account, container, name):
1428
        if user == account:
1429
            return True
1430
        path = '/'.join((account, container, name))
1431
        path = self._get_permissions_path(account, container, name)
1432
        if not path:
1433
            raise NotAllowedError
1434
        if not self.permissions.access_check(path, self.WRITE, user):
1435
            raise NotAllowedError
1436

    
1437
    def _allowed_accounts(self, user):
1438
        allow = set()
1439
        for path in self.permissions.access_list_paths(user):
1440
            allow.add(path.split('/', 1)[0])
1441
        return sorted(allow)
1442

    
1443
    def _allowed_containers(self, user, account):
1444
        allow = set()
1445
        for path in self.permissions.access_list_paths(user, account):
1446
            allow.add(path.split('/', 2)[1])
1447
        return sorted(allow)