Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 4cfccdd2

History | View | Annotate | Download (58.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
44

    
45
# Stripped-down version of the HashMap class found in tools.
46

    
47

    
48
class HashMap(list):
49

    
50
    def __init__(self, blocksize, blockhash):
51
        super(HashMap, self).__init__()
52
        self.blocksize = blocksize
53
        self.blockhash = blockhash
54

    
55
    def _hash_raw(self, v):
56
        h = hashlib.new(self.blockhash)
57
        h.update(v)
58
        return h.digest()
59

    
60
    def hash(self):
61
        if len(self) == 0:
62
            return self._hash_raw('')
63
        if len(self) == 1:
64
            return self.__getitem__(0)
65

    
66
        h = list(self)
67
        s = 2
68
        while s < len(h):
69
            s = s * 2
70
        h += [('\x00' * len(h[0]))] * (s - len(h))
71
        while len(h) > 1:
72
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
73
        return h[0]
74

    
75
# Default modules and settings.
76
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
77
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
78
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
79
DEFAULT_BLOCK_PATH = 'data/'
80
DEFAULT_BLOCK_UMASK = 0o022
81
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
82
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
83
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
84

    
85
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
86
QUEUE_CLIENT_ID = 'pithos'
87
QUEUE_INSTANCE_ID = '1'
88

    
89
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
90

    
91
inf = float('inf')
92

    
93
ULTIMATE_ANSWER = 42
94

    
95

    
96
logger = logging.getLogger(__name__)
97

    
98

    
99
def backend_method(func=None, autocommit=1):
100
    if func is None:
101
        def fn(func):
102
            return backend_method(func, autocommit)
103
        return fn
104

    
105
    if not autocommit:
106
        return func
107

    
108
    def fn(self, *args, **kw):
109
        self.wrapper.execute()
110
        try:
111
            self.messages = []
112
            ret = func(self, *args, **kw)
113
            for m in self.messages:
114
                self.queue.send(*m)
115
            self.wrapper.commit()
116
            return ret
117
        except:
118
            self.wrapper.rollback()
119
            raise
120
    return fn
121

    
122

    
123
class ModularBackend(BaseBackend):
124
    """A modular backend.
125

126
    Uses modules for SQL functions and storage.
127
    """
128

    
129
    def __init__(self, db_module=None, db_connection=None,
130
                 block_module=None, block_path=None, block_umask=None,
131
                 queue_module=None, queue_hosts=None,
132
                 queue_exchange=None):
133
        db_module = db_module or DEFAULT_DB_MODULE
134
        db_connection = db_connection or DEFAULT_DB_CONNECTION
135
        block_module = block_module or DEFAULT_BLOCK_MODULE
136
        block_path = block_path or DEFAULT_BLOCK_PATH
137
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
138
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
139
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
140
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
141
                
142
        self.hash_algorithm = 'sha256'
143
        self.block_size = 4 * 1024 * 1024  # 4MB
144

    
145
        self.default_policy = {'quota': DEFAULT_QUOTA,
146
                               'versioning': DEFAULT_VERSIONING}
147

    
148
        def load_module(m):
149
            __import__(m)
150
            return sys.modules[m]
151

    
152
        self.db_module = load_module(db_module)
153
        self.wrapper = self.db_module.DBWrapper(db_connection)
154
        params = {'wrapper': self.wrapper}
155
        self.permissions = self.db_module.Permissions(**params)
156
        self.config = self.db_module.Config(**params)
157
        self.quotaholder_sync = self.db_module.QuotaholderSync(**params)
158
        for x in ['READ', 'WRITE']:
159
            setattr(self, x, getattr(self.db_module, x))
160
        self.node = self.db_module.Node(**params)
161
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
162
            setattr(self, x, getattr(self.db_module, x))
163

    
164
        self.block_module = load_module(block_module)
165
        params = {'path': block_path,
166
                  'block_size': self.block_size,
167
                  'hash_algorithm': self.hash_algorithm,
168
                  'umask': block_umask}
169
        self.store = self.block_module.Store(**params)
170

    
171
        if queue_module and queue_hosts:
172
            self.queue_module = load_module(queue_module)
173
            params = {'hosts': queue_hosts,
174
                              'exchange': queue_exchange,
175
                      'client_id': QUEUE_CLIENT_ID}
176
            self.queue = self.queue_module.Queue(**params)
177
        else:
178
            class NoQueue:
179
                def send(self, *args):
180
                    pass
181

    
182
                def close(self):
183
                    pass
184

    
185
            self.queue = NoQueue()
186

    
187
    def close(self):
188
        self.wrapper.close()
189
        self.queue.close()
190

    
191
    @backend_method
192
    def list_accounts(self, user, marker=None, limit=10000):
193
        """Return a list of accounts the user can access."""
194

    
195
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
196
        allowed = self._allowed_accounts(user)
197
        start, limit = self._list_limits(allowed, marker, limit)
198
        return allowed[start:start + limit]
199

    
200
    @backend_method
201
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
202
        """Return a dictionary with the account metadata for the domain."""
203

    
204
        logger.debug(
205
            "get_account_meta: %s %s %s %s", user, account, domain, until)
206
        path, node = self._lookup_account(account, user == account)
207
        if user != account:
208
            if until or node is None or account not in self._allowed_accounts(user):
209
                raise NotAllowedError
210
        try:
211
            props = self._get_properties(node, until)
212
            mtime = props[self.MTIME]
213
        except NameError:
214
            props = None
215
            mtime = until
216
        count, bytes, tstamp = self._get_statistics(node, until)
217
        tstamp = max(tstamp, mtime)
218
        if until is None:
219
            modified = tstamp
220
        else:
221
            modified = self._get_statistics(
222
                node)[2]  # Overall last modification.
223
            modified = max(modified, mtime)
224

    
225
        if user != account:
226
            meta = {'name': account}
227
        else:
228
            meta = {}
229
            if props is not None and include_user_defined:
230
                meta.update(
231
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
232
            if until is not None:
233
                meta.update({'until_timestamp': tstamp})
234
            meta.update({'name': account, 'count': count, 'bytes': bytes})
235
        meta.update({'modified': modified})
236
        return meta
237

    
238
    @backend_method
239
    def update_account_meta(self, user, account, domain, meta, replace=False):
240
        """Update the metadata associated with the account for the domain."""
241

    
242
        logger.debug("update_account_meta: %s %s %s %s %s", user,
243
                     account, domain, meta, replace)
244
        if user != account:
245
            raise NotAllowedError
246
        path, node = self._lookup_account(account, True)
247
        self._put_metadata(user, node, domain, meta, replace)
248

    
249
    @backend_method
250
    def get_account_groups(self, user, account):
251
        """Return a dictionary with the user groups defined for this account."""
252

    
253
        logger.debug("get_account_groups: %s %s", user, account)
254
        if user != account:
255
            if account not in self._allowed_accounts(user):
256
                raise NotAllowedError
257
            return {}
258
        self._lookup_account(account, True)
259
        return self.permissions.group_dict(account)
260

    
261
    @backend_method
262
    def update_account_groups(self, user, account, groups, replace=False):
263
        """Update the groups associated with the account."""
264

    
265
        logger.debug("update_account_groups: %s %s %s %s", user,
266
                     account, groups, replace)
267
        if user != account:
268
            raise NotAllowedError
269
        self._lookup_account(account, True)
270
        self._check_groups(groups)
271
        if replace:
272
            self.permissions.group_destroy(account)
273
        for k, v in groups.iteritems():
274
            if not replace:  # If not already deleted.
275
                self.permissions.group_delete(account, k)
276
            if v:
277
                self.permissions.group_addmany(account, k, v)
278

    
279
    @backend_method
280
    def get_account_policy(self, user, account):
281
        """Return a dictionary with the account policy."""
282

    
283
        logger.debug("get_account_policy: %s %s", user, account)
284
        if user != account:
285
            if account not in self._allowed_accounts(user):
286
                raise NotAllowedError
287
            return {}
288
        path, node = self._lookup_account(account, True)
289
        return self._get_policy(node)
290

    
291
    @backend_method
292
    def update_account_policy(self, user, account, policy, replace=False):
293
        """Update the policy associated with the account."""
294

    
295
        logger.debug("update_account_policy: %s %s %s %s", user,
296
                     account, policy, replace)
297
        if user != account:
298
            raise NotAllowedError
299
        path, node = self._lookup_account(account, True)
300
        self._check_policy(policy)
301
        self._put_policy(node, policy, replace)
302

    
303
    @backend_method
304
    def put_account(self, user, account, policy={}):
305
        """Create a new account with the given name."""
306

    
307
        logger.debug("put_account: %s %s %s", user, account, policy)
308
        if user != account:
309
            raise NotAllowedError
310
        node = self.node.node_lookup(account)
311
        if node is not None:
312
            raise AccountExists('Account already exists')
313
        if policy:
314
            self._check_policy(policy)
315
        node = self._put_path(user, self.ROOTNODE, account)
316
        self._put_policy(node, policy, True)
317

    
318
    @backend_method
319
    def delete_account(self, user, account):
320
        """Delete the account with the given name."""
321

    
322
        logger.debug("delete_account: %s %s", user, account)
323
        if user != account:
324
            raise NotAllowedError
325
        node = self.node.node_lookup(account)
326
        if node is None:
327
            return
328
        if not self.node.node_remove(node):
329
            raise AccountNotEmpty('Account is not empty')
330
        self.permissions.group_destroy(account)
331

    
332
    @backend_method
333
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
334
        """Return a list of containers existing under an account."""
335

    
336
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
337
                     account, marker, limit, shared, until, public)
338
        if user != account:
339
            if until or account not in self._allowed_accounts(user):
340
                raise NotAllowedError
341
            allowed = self._allowed_containers(user, account)
342
            start, limit = self._list_limits(allowed, marker, limit)
343
            return allowed[start:start + limit]
344
        if shared or public:
345
            allowed = set()
346
            if shared:
347
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
348
            if public:
349
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
350
            allowed = sorted(allowed)
351
            start, limit = self._list_limits(allowed, marker, limit)
352
            return allowed[start:start + limit]
353
        node = self.node.node_lookup(account)
354
        containers = [x[0] for x in self._list_object_properties(
355
            node, account, '', '/', marker, limit, False, None, [], until)]
356
        start, limit = self._list_limits(
357
            [x[0] for x in containers], marker, limit)
358
        return containers[start:start + limit]
359

    
360
    @backend_method
361
    def list_container_meta(self, user, account, container, domain, until=None):
362
        """Return a list with all the container's object meta keys for the domain."""
363

    
364
        logger.debug("list_container_meta: %s %s %s %s %s", user,
365
                     account, container, domain, until)
366
        allowed = []
367
        if user != account:
368
            if until:
369
                raise NotAllowedError
370
            allowed = self.permissions.access_list_paths(
371
                user, '/'.join((account, container)))
372
            if not allowed:
373
                raise NotAllowedError
374
        path, node = self._lookup_container(account, container)
375
        before = until if until is not None else inf
376
        allowed = self._get_formatted_paths(allowed)
377
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
378

    
379
    @backend_method
380
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
381
        """Return a dictionary with the container metadata for the domain."""
382

    
383
        logger.debug("get_container_meta: %s %s %s %s %s", user,
384
                     account, container, domain, until)
385
        if user != account:
386
            if until or container not in self._allowed_containers(user, account):
387
                raise NotAllowedError
388
        path, node = self._lookup_container(account, container)
389
        props = self._get_properties(node, until)
390
        mtime = props[self.MTIME]
391
        count, bytes, tstamp = self._get_statistics(node, until)
392
        tstamp = max(tstamp, mtime)
393
        if until is None:
394
            modified = tstamp
395
        else:
396
            modified = self._get_statistics(
397
                node)[2]  # Overall last modification.
398
            modified = max(modified, mtime)
399

    
400
        if user != account:
401
            meta = {'name': container}
402
        else:
403
            meta = {}
404
            if include_user_defined:
405
                meta.update(
406
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
407
            if until is not None:
408
                meta.update({'until_timestamp': tstamp})
409
            meta.update({'name': container, 'count': count, 'bytes': bytes})
410
        meta.update({'modified': modified})
411
        return meta
412

    
413
    @backend_method
414
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
415
        """Update the metadata associated with the container for the domain."""
416

    
417
        logger.debug("update_container_meta: %s %s %s %s %s %s",
418
                     user, account, container, domain, meta, replace)
419
        if user != account:
420
            raise NotAllowedError
421
        path, node = self._lookup_container(account, container)
422
        src_version_id, dest_version_id = self._put_metadata(
423
            user, node, domain, meta, replace)
424
        if src_version_id is not None:
425
            versioning = self._get_policy(node)['versioning']
426
            if versioning != 'auto':
427
                self.node.version_remove(src_version_id)
428

    
429
    @backend_method
430
    def get_container_policy(self, user, account, container):
431
        """Return a dictionary with the container policy."""
432

    
433
        logger.debug(
434
            "get_container_policy: %s %s %s", user, account, container)
435
        if user != account:
436
            if container not in self._allowed_containers(user, account):
437
                raise NotAllowedError
438
            return {}
439
        path, node = self._lookup_container(account, container)
440
        return self._get_policy(node)
441

    
442
    @backend_method
443
    def update_container_policy(self, user, account, container, policy, replace=False):
444
        """Update the policy associated with the container."""
445

    
446
        logger.debug("update_container_policy: %s %s %s %s %s",
447
                     user, account, container, policy, replace)
448
        if user != account:
449
            raise NotAllowedError
450
        path, node = self._lookup_container(account, container)
451
        self._check_policy(policy)
452
        self._put_policy(node, policy, replace)
453

    
454
    @backend_method
455
    def put_container(self, user, account, container, policy={}):
456
        """Create a new container with the given name."""
457

    
458
        logger.debug(
459
            "put_container: %s %s %s %s", user, account, container, policy)
460
        if user != account:
461
            raise NotAllowedError
462
        try:
463
            path, node = self._lookup_container(account, container)
464
        except NameError:
465
            pass
466
        else:
467
            raise ContainerExists('Container already exists')
468
        if policy:
469
            self._check_policy(policy)
470
        path = '/'.join((account, container))
471
        node = self._put_path(
472
            user, self._lookup_account(account, True)[1], path)
473
        self._put_policy(node, policy, True)
474

    
475
    @backend_method
476
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
477
        """Delete/purge the container with the given name."""
478

    
479
        logger.debug("delete_container: %s %s %s %s %s %s", user,
480
                     account, container, until, prefix, delimiter)
481
        if user != account:
482
            raise NotAllowedError
483
        path, node = self._lookup_container(account, container)
484

    
485
        if until is not None:
486
            hashes, size = self.node.node_purge_children(
487
                node, until, CLUSTER_HISTORY)
488
            for h in hashes:
489
                self.store.map_delete(h)
490
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
491
            self._report_size_change(user, account, -size, {'action':
492
                                     'container purge', 'path': path})
493
            return
494

    
495
        if not delimiter:
496
            if self._get_statistics(node)[0] > 0:
497
                raise ContainerNotEmpty('Container is not empty')
498
            hashes, size = self.node.node_purge_children(
499
                node, inf, CLUSTER_HISTORY)
500
            for h in hashes:
501
                self.store.map_delete(h)
502
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
503
            self.node.node_remove(node)
504
            self._report_size_change(user, account, -size, {'action':
505
                                     'container delete', 'path': path})
506
        else:
507
                # remove only contents
508
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
509
            paths = []
510
            for t in src_names:
511
                path = '/'.join((account, container, t[0]))
512
                node = t[2]
513
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
514
                del_size = self._apply_versioning(
515
                    account, container, src_version_id)
516
                if del_size:
517
                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
518
                self._report_object_change(
519
                    user, account, path, details={'action': 'object delete'})
520
                paths.append(path)
521
            self.permissions.access_clear_bulk(paths)
522

    
523
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
524
        if user != account and until:
525
            raise NotAllowedError
526
        if shared and public:
527
            # get shared first
528
            shared = self._list_object_permissions(
529
                user, account, container, prefix, shared=True, public=False)
530
            objects = set()
531
            if shared:
532
                path, node = self._lookup_container(account, container)
533
                shared = self._get_formatted_paths(shared)
534
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
535

    
536
            # get public
537
            objects |= set(self._list_public_object_properties(
538
                user, account, container, prefix, all_props))
539
            objects = list(objects)
540

    
541
            objects.sort(key=lambda x: x[0])
542
            start, limit = self._list_limits(
543
                [x[0] for x in objects], marker, limit)
544
            return objects[start:start + limit]
545
        elif public:
546
            objects = self._list_public_object_properties(
547
                user, account, container, prefix, all_props)
548
            start, limit = self._list_limits(
549
                [x[0] for x in objects], marker, limit)
550
            return objects[start:start + limit]
551

    
552
        allowed = self._list_object_permissions(
553
            user, account, container, prefix, shared, public)
554
        if shared and not allowed:
555
            return []
556
        path, node = self._lookup_container(account, container)
557
        allowed = self._get_formatted_paths(allowed)
558
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
559
        start, limit = self._list_limits(
560
            [x[0] for x in objects], marker, limit)
561
        return objects[start:start + limit]
562

    
563
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
564
        public = self._list_object_permissions(
565
            user, account, container, prefix, shared=False, public=True)
566
        paths, nodes = self._lookup_objects(public)
567
        path = '/'.join((account, container))
568
        cont_prefix = path + '/'
569
        paths = [x[len(cont_prefix):] for x in paths]
570
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
571
        objects = [(path,) + props for path, props in zip(paths, props)]
572
        return objects
573

    
574
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
575
        objects = []
576
        while True:
577
            marker = objects[-1] if objects else None
578
            limit = 10000
579
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
580
            objects.extend(l)
581
            if not l or len(l) < limit:
582
                break
583
        return objects
584

    
585
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
586
        allowed = []
587
        path = '/'.join((account, container, prefix)).rstrip('/')
588
        if user != account:
589
            allowed = self.permissions.access_list_paths(user, path)
590
            if not allowed:
591
                raise NotAllowedError
592
        else:
593
            allowed = set()
594
            if shared:
595
                allowed.update(self.permissions.access_list_shared(path))
596
            if public:
597
                allowed.update(
598
                    [x[0] for x in self.permissions.public_list(path)])
599
            allowed = sorted(allowed)
600
            if not allowed:
601
                return []
602
        return allowed
603

    
604
    @backend_method
605
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
606
        """Return a list of object (name, version_id) tuples existing under a container."""
607

    
608
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
609
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
610

    
611
    @backend_method
612
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
613
        """Return a list of object metadata dicts existing under a container."""
614

    
615
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
616
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
617
        objects = []
618
        for p in props:
619
            if len(p) == 2:
620
                objects.append({'subdir': p[0]})
621
            else:
622
                objects.append({'name': p[0],
623
                                'bytes': p[self.SIZE + 1],
624
                                'type': p[self.TYPE + 1],
625
                                'hash': p[self.HASH + 1],
626
                                'version': p[self.SERIAL + 1],
627
                                'version_timestamp': p[self.MTIME + 1],
628
                                'modified': p[self.MTIME + 1] if until is None else None,
629
                                'modified_by': p[self.MUSER + 1],
630
                                'uuid': p[self.UUID + 1],
631
                                'checksum': p[self.CHECKSUM + 1]})
632
        return objects
633

    
634
    @backend_method
635
    def list_object_permissions(self, user, account, container, prefix=''):
636
        """Return a list of paths that enforce permissions under a container."""
637

    
638
        logger.debug("list_object_permissions: %s %s %s %s", user,
639
                     account, container, prefix)
640
        return self._list_object_permissions(user, account, container, prefix, True, False)
641

    
642
    @backend_method
643
    def list_object_public(self, user, account, container, prefix=''):
644
        """Return a dict mapping paths to public ids for objects that are public under a container."""
645

    
646
        logger.debug("list_object_public: %s %s %s %s", user,
647
                     account, container, prefix)
648
        public = {}
649
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
650
            public[path] = p + ULTIMATE_ANSWER
651
        return public
652

    
653
    @backend_method
654
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
655
        """Return a dictionary with the object metadata for the domain."""
656

    
657
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
658
                     account, container, name, domain, version)
659
        self._can_read(user, account, container, name)
660
        path, node = self._lookup_object(account, container, name)
661
        props = self._get_version(node, version)
662
        if version is None:
663
            modified = props[self.MTIME]
664
        else:
665
            try:
666
                modified = self._get_version(
667
                    node)[self.MTIME]  # Overall last modification.
668
            except NameError:  # Object may be deleted.
669
                del_props = self.node.version_lookup(
670
                    node, inf, CLUSTER_DELETED)
671
                if del_props is None:
672
                    raise ItemNotExists('Object does not exist')
673
                modified = del_props[self.MTIME]
674

    
675
        meta = {}
676
        if include_user_defined:
677
            meta.update(
678
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
679
        meta.update({'name': name,
680
                     'bytes': props[self.SIZE],
681
                     'type': props[self.TYPE],
682
                     'hash': props[self.HASH],
683
                     'version': props[self.SERIAL],
684
                     'version_timestamp': props[self.MTIME],
685
                     'modified': modified,
686
                     'modified_by': props[self.MUSER],
687
                     'uuid': props[self.UUID],
688
                     'checksum': props[self.CHECKSUM]})
689
        return meta
690

    
691
    @backend_method
692
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
693
        """Update the metadata associated with the object for the domain and return the new version."""
694

    
695
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
696
                     user, account, container, name, domain, meta, replace)
697
        self._can_write(user, account, container, name)
698
        path, node = self._lookup_object(account, container, name)
699
        src_version_id, dest_version_id = self._put_metadata(
700
            user, node, domain, meta, replace)
701
        self._apply_versioning(account, container, src_version_id)
702
        return dest_version_id
703

    
704
    @backend_method
705
    def get_object_permissions(self, user, account, container, name):
706
        """Return the action allowed on the object, the path
707
        from which the object gets its permissions from,
708
        along with a dictionary containing the permissions."""
709

    
710
        logger.debug("get_object_permissions: %s %s %s %s", user,
711
                     account, container, name)
712
        allowed = 'write'
713
        permissions_path = self._get_permissions_path(account, container, name)
714
        if user != account:
715
            if self.permissions.access_check(permissions_path, self.WRITE, user):
716
                allowed = 'write'
717
            elif self.permissions.access_check(permissions_path, self.READ, user):
718
                allowed = 'read'
719
            else:
720
                raise NotAllowedError
721
        self._lookup_object(account, container, name)
722
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
723

    
724
    @backend_method
725
    def update_object_permissions(self, user, account, container, name, permissions):
726
        """Update the permissions associated with the object."""
727

    
728
        logger.debug("update_object_permissions: %s %s %s %s %s",
729
                     user, account, container, name, permissions)
730
        if user != account:
731
            raise NotAllowedError
732
        path = self._lookup_object(account, container, name)[0]
733
        self._check_permissions(path, permissions)
734
        self.permissions.access_set(path, permissions)
735
        self._report_sharing_change(user, account, path, {'members':
736
                                    self.permissions.access_members(path)})
737

    
738
    @backend_method
739
    def get_object_public(self, user, account, container, name):
740
        """Return the public id of the object if applicable."""
741

    
742
        logger.debug(
743
            "get_object_public: %s %s %s %s", user, account, container, name)
744
        self._can_read(user, account, container, name)
745
        path = self._lookup_object(account, container, name)[0]
746
        p = self.permissions.public_get(path)
747
        if p is not None:
748
            p += ULTIMATE_ANSWER
749
        return p
750

    
751
    @backend_method
752
    def update_object_public(self, user, account, container, name, public):
753
        """Update the public status of the object."""
754

    
755
        logger.debug("update_object_public: %s %s %s %s %s", user,
756
                     account, container, name, public)
757
        self._can_write(user, account, container, name)
758
        path = self._lookup_object(account, container, name)[0]
759
        if not public:
760
            self.permissions.public_unset(path)
761
        else:
762
            self.permissions.public_set(path)
763

    
764
    @backend_method
765
    def get_object_hashmap(self, user, account, container, name, version=None):
766
        """Return the object's size and a list with partial hashes."""
767

    
768
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
769
                     account, container, name, version)
770
        self._can_read(user, account, container, name)
771
        path, node = self._lookup_object(account, container, name)
772
        props = self._get_version(node, version)
773
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
774
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
775

    
776
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
777
        if permissions is not None and user != account:
778
            raise NotAllowedError
779
        self._can_write(user, account, container, name)
780
        if permissions is not None:
781
            path = '/'.join((account, container, name))
782
            self._check_permissions(path, permissions)
783

    
784
        account_path, account_node = self._lookup_account(account, True)
785
        container_path, container_node = self._lookup_container(
786
            account, container)
787
        path, node = self._put_object_node(
788
            container_path, container_node, name)
789
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
790

    
791
        # Handle meta.
792
        if src_version_id is None:
793
            src_version_id = pre_version_id
794
        self._put_metadata_duplicate(
795
            src_version_id, dest_version_id, domain, meta, replace_meta)
796

    
797
        # Check quota.
798
        del_size = self._apply_versioning(account, container, pre_version_id)
799
        size_delta = size - del_size
800
        if size_delta > 0:
801
            account_quota = long(self._get_policy(account_node)['quota'])
802
            container_quota = long(self._get_policy(container_node)['quota'])
803
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
804
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
805
                # This must be executed in a transaction, so the version is never created if it fails.
806
                raise QuotaError
807
        self._report_size_change(user, account, size_delta, {
808
                                 'action': 'object update', 'path': path})
809

    
810
        if permissions is not None:
811
            self.permissions.access_set(path, permissions)
812
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
813

    
814
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
815
        return dest_version_id
816

    
817
    @backend_method
818
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
819
        """Create/update an object with the specified size and partial hashes."""
820

    
821
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
822
                     account, container, name, size, type, hashmap, checksum)
823
        if size == 0:  # No such thing as an empty hashmap.
824
            hashmap = [self.put_block('')]
825
        map = HashMap(self.block_size, self.hash_algorithm)
826
        map.extend([binascii.unhexlify(x) for x in hashmap])
827
        missing = self.store.block_search(map)
828
        if missing:
829
            ie = IndexError()
830
            ie.data = [binascii.hexlify(x) for x in missing]
831
            raise ie
832

    
833
        hash = map.hash()
834
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
835
        self.store.map_put(hash, map)
836
        return dest_version_id
837

    
838
    @backend_method
839
    def update_object_checksum(self, user, account, container, name, version, checksum):
840
        """Update an object's checksum."""
841

    
842
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
843
                     user, account, container, name, version, checksum)
844
        # Update objects with greater version and same hashmap and size (fix metadata updates).
845
        self._can_write(user, account, container, name)
846
        path, node = self._lookup_object(account, container, name)
847
        props = self._get_version(node, version)
848
        versions = self.node.node_get_versions(node)
849
        for x in versions:
850
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
851
                self.node.version_put_property(
852
                    x[self.SERIAL], 'checksum', checksum)
853

    
854
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
855
        dest_version_ids = []
856
        self._can_read(user, src_account, src_container, src_name)
857
        path, node = self._lookup_object(src_account, src_container, src_name)
858
        # TODO: Will do another fetch of the properties in duplicate version...
859
        props = self._get_version(
860
            node, src_version)  # Check to see if source exists.
861
        src_version_id = props[self.SERIAL]
862
        hash = props[self.HASH]
863
        size = props[self.SIZE]
864
        is_copy = not is_move and (src_account, src_container, src_name) != (
865
            dest_account, dest_container, dest_name)  # New uuid.
866
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
867
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
868
            self._delete_object(user, src_account, src_container, src_name)
869

    
870
        if delimiter:
871
            prefix = src_name + \
872
                delimiter if not src_name.endswith(delimiter) else src_name
873
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
874
            src_names.sort(key=lambda x: x[2])  # order by nodes
875
            paths = [elem[0] for elem in src_names]
876
            nodes = [elem[2] for elem in src_names]
877
            # TODO: Will do another fetch of the properties in duplicate version...
878
            props = self._get_versions(nodes)  # Check to see if source exists.
879

    
880
            for prop, path, node in zip(props, paths, nodes):
881
                src_version_id = prop[self.SERIAL]
882
                hash = prop[self.HASH]
883
                vtype = prop[self.TYPE]
884
                size = prop[self.SIZE]
885
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
886
                    delimiter) else dest_name
887
                vdest_name = path.replace(prefix, dest_prefix, 1)
888
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
889
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
890
                    self._delete_object(user, src_account, src_container, path)
891
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
892

    
893
    @backend_method
894
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
895
        """Copy an object's data and metadata."""
896

    
897
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
898
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
899
        return dest_version_id
900

    
901
    @backend_method
902
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
903
        """Move an object's data and metadata."""
904

    
905
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
906
        if user != src_account:
907
            raise NotAllowedError
908
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
909
        return dest_version_id
910

    
911
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
912
        if user != account:
913
            raise NotAllowedError
914

    
915
        if until is not None:
916
            path = '/'.join((account, container, name))
917
            node = self.node.node_lookup(path)
918
            if node is None:
919
                return
920
            hashes = []
921
            size = 0
922
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
923
            hashes += h
924
            size += s
925
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
926
            hashes += h
927
            size += s
928
            for h in hashes:
929
                self.store.map_delete(h)
930
            self.node.node_purge(node, until, CLUSTER_DELETED)
931
            try:
932
                props = self._get_version(node)
933
            except NameError:
934
                self.permissions.access_clear(path)
935
            self._report_size_change(user, account, -size, {
936
                                     'action': 'object purge', 'path': path})
937
            return
938

    
939
        path, node = self._lookup_object(account, container, name)
940
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
941
        del_size = self._apply_versioning(account, container, src_version_id)
942
        if del_size:
943
            self._report_size_change(user, account, -del_size, {
944
                                     'action': 'object delete', 'path': path})
945
        self._report_object_change(
946
            user, account, path, details={'action': 'object delete'})
947
        self.permissions.access_clear(path)
948

    
949
        if delimiter:
950
            prefix = name + delimiter if not name.endswith(delimiter) else name
951
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
952
            paths = []
953
            for t in src_names:
954
                path = '/'.join((account, container, t[0]))
955
                node = t[2]
956
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
957
                del_size = self._apply_versioning(
958
                    account, container, src_version_id)
959
                if del_size:
960
                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
961
                self._report_object_change(
962
                    user, account, path, details={'action': 'object delete'})
963
                paths.append(path)
964
            self.permissions.access_clear_bulk(paths)
965

    
966
    @backend_method
967
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
968
        """Delete/purge an object."""
969

    
970
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
971
                     account, container, name, until, prefix, delimiter)
972
        self._delete_object(user, account, container, name, until, delimiter)
973

    
974
    @backend_method
975
    def list_versions(self, user, account, container, name):
976
        """Return a list of all (version, version_timestamp) tuples for an object."""
977

    
978
        logger.debug(
979
            "list_versions: %s %s %s %s", user, account, container, name)
980
        self._can_read(user, account, container, name)
981
        path, node = self._lookup_object(account, container, name)
982
        versions = self.node.node_get_versions(node)
983
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
984

    
985
    @backend_method
986
    def get_uuid(self, user, uuid):
987
        """Return the (account, container, name) for the UUID given."""
988

    
989
        logger.debug("get_uuid: %s %s", user, uuid)
990
        info = self.node.latest_uuid(uuid)
991
        if info is None:
992
            raise NameError
993
        path, serial = info
994
        account, container, name = path.split('/', 2)
995
        self._can_read(user, account, container, name)
996
        return (account, container, name)
997

    
998
    @backend_method
999
    def get_public(self, user, public):
1000
        """Return the (account, container, name) for the public id given."""
1001

    
1002
        logger.debug("get_public: %s %s", user, public)
1003
        if public is None or public < ULTIMATE_ANSWER:
1004
            raise NameError
1005
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1006
        if path is None:
1007
            raise NameError
1008
        account, container, name = path.split('/', 2)
1009
        self._can_read(user, account, container, name)
1010
        return (account, container, name)
1011

    
1012
    @backend_method(autocommit=0)
1013
    def get_block(self, hash):
1014
        """Return a block's data."""
1015

    
1016
        logger.debug("get_block: %s", hash)
1017
        block = self.store.block_get(binascii.unhexlify(hash))
1018
        if not block:
1019
            raise ItemNotExists('Block does not exist')
1020
        return block
1021

    
1022
    @backend_method(autocommit=0)
1023
    def put_block(self, data):
1024
        """Store a block and return the hash."""
1025

    
1026
        logger.debug("put_block: %s", len(data))
1027
        return binascii.hexlify(self.store.block_put(data))
1028

    
1029
    @backend_method(autocommit=0)
1030
    def update_block(self, hash, data, offset=0):
1031
        """Update a known block and return the hash."""
1032

    
1033
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1034
        if offset == 0 and len(data) == self.block_size:
1035
            return self.put_block(data)
1036
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1037
        return binascii.hexlify(h)
1038

    
1039
    # Path functions.
1040

    
1041
    def _generate_uuid(self):
1042
        return str(uuidlib.uuid4())
1043

    
1044
    def _put_object_node(self, path, parent, name):
1045
        path = '/'.join((path, name))
1046
        node = self.node.node_lookup(path)
1047
        if node is None:
1048
            node = self.node.node_create(parent, path)
1049
        return path, node
1050

    
1051
    def _put_path(self, user, parent, path):
1052
        node = self.node.node_create(parent, path)
1053
        self.node.version_create(node, None, 0, '', None, user,
1054
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1055
        return node
1056

    
1057
    def _lookup_account(self, account, create=True):
1058
        node = self.node.node_lookup(account)
1059
        if node is None and create:
1060
            node = self._put_path(
1061
                account, self.ROOTNODE, account)  # User is account.
1062
        return account, node
1063

    
1064
    def _lookup_container(self, account, container):
1065
        path = '/'.join((account, container))
1066
        node = self.node.node_lookup(path)
1067
        if node is None:
1068
            raise ItemNotExists('Container does not exist')
1069
        return path, node
1070

    
1071
    def _lookup_object(self, account, container, name):
1072
        path = '/'.join((account, container, name))
1073
        node = self.node.node_lookup(path)
1074
        if node is None:
1075
            raise ItemNotExists('Object does not exist')
1076
        return path, node
1077

    
1078
    def _lookup_objects(self, paths):
1079
        nodes = self.node.node_lookup_bulk(paths)
1080
        return paths, nodes
1081

    
1082
    def _get_properties(self, node, until=None):
1083
        """Return properties until the timestamp given."""
1084

    
1085
        before = until if until is not None else inf
1086
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1087
        if props is None and until is not None:
1088
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1089
        if props is None:
1090
            raise ItemNotExists('Path does not exist')
1091
        return props
1092

    
1093
    def _get_statistics(self, node, until=None):
1094
        """Return count, sum of size and latest timestamp of everything under node."""
1095

    
1096
        if until is None:
1097
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1098
        else:
1099
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1100
        if stats is None:
1101
            stats = (0, 0, 0)
1102
        return stats
1103

    
1104
    def _get_version(self, node, version=None):
1105
        if version is None:
1106
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1107
            if props is None:
1108
                raise ItemNotExists('Object does not exist')
1109
        else:
1110
            try:
1111
                version = int(version)
1112
            except ValueError:
1113
                raise VersionNotExists('Version does not exist')
1114
            props = self.node.version_get_properties(version)
1115
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1116
                raise VersionNotExists('Version does not exist')
1117
        return props
1118

    
1119
    def _get_versions(self, nodes):
1120
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1121

    
1122
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1123
        """Create a new version of the node."""
1124

    
1125
        props = self.node.version_lookup(
1126
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1127
        if props is not None:
1128
            src_version_id = props[self.SERIAL]
1129
            src_hash = props[self.HASH]
1130
            src_size = props[self.SIZE]
1131
            src_type = props[self.TYPE]
1132
            src_checksum = props[self.CHECKSUM]
1133
        else:
1134
            src_version_id = None
1135
            src_hash = None
1136
            src_size = 0
1137
            src_type = ''
1138
            src_checksum = ''
1139
        if size is None:  # Set metadata.
1140
            hash = src_hash  # This way hash can be set to None (account or container).
1141
            size = src_size
1142
        if type is None:
1143
            type = src_type
1144
        if checksum is None:
1145
            checksum = src_checksum
1146
        uuid = self._generate_uuid(
1147
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1148

    
1149
        if src_node is None:
1150
            pre_version_id = src_version_id
1151
        else:
1152
            pre_version_id = None
1153
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1154
            if props is not None:
1155
                pre_version_id = props[self.SERIAL]
1156
        if pre_version_id is not None:
1157
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1158

    
1159
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1160
        return pre_version_id, dest_version_id
1161

    
1162
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1163
        if src_version_id is not None:
1164
            self.node.attribute_copy(src_version_id, dest_version_id)
1165
        if not replace:
1166
            self.node.attribute_del(dest_version_id, domain, (
1167
                k for k, v in meta.iteritems() if v == ''))
1168
            self.node.attribute_set(dest_version_id, domain, (
1169
                (k, v) for k, v in meta.iteritems() if v != ''))
1170
        else:
1171
            self.node.attribute_del(dest_version_id, domain)
1172
            self.node.attribute_set(dest_version_id, domain, ((
1173
                k, v) for k, v in meta.iteritems()))
1174

    
1175
    def _put_metadata(self, user, node, domain, meta, replace=False):
1176
        """Create a new version and store metadata."""
1177

    
1178
        src_version_id, dest_version_id = self._put_version_duplicate(
1179
            user, node)
1180
        self._put_metadata_duplicate(
1181
            src_version_id, dest_version_id, domain, meta, replace)
1182
        return src_version_id, dest_version_id
1183

    
1184
    def _list_limits(self, listing, marker, limit):
1185
        start = 0
1186
        if marker:
1187
            try:
1188
                start = listing.index(marker) + 1
1189
            except ValueError:
1190
                pass
1191
        if not limit or limit > 10000:
1192
            limit = 10000
1193
        return start, limit
1194

    
1195
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1196
        cont_prefix = path + '/'
1197
        prefix = cont_prefix + prefix
1198
        start = cont_prefix + marker if marker else None
1199
        before = until if until is not None else inf
1200
        filterq = keys if domain else []
1201
        sizeq = size_range
1202

    
1203
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1204
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1205
        objects.sort(key=lambda x: x[0])
1206
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1207
        return objects
1208

    
1209
    # Reporting functions.
1210

    
1211
    def _report_size_change(self, user, account, size, details={}):
1212
        account_node = self._lookup_account(account, True)[1]
1213
        total = self._get_statistics(account_node)[1]
1214
        details.update({'user': user, 'total': total})
1215
        logger.debug(
1216
            "_report_size_change: %s %s %s %s", user, account, size, details)
1217
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1218
                                                  account,
1219
                                                  QUEUE_INSTANCE_ID,
1220
                                                  'diskspace',
1221
                                                  float(size),
1222
                                                  details))
1223

    
1224
    def _report_object_change(self, user, account, path, details={}):
1225
        details.update({'user': user})
1226
        logger.debug("_report_object_change: %s %s %s %s", user,
1227
                     account, path, details)
1228
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1229
                                                  account, QUEUE_INSTANCE_ID, 'object', path, details))
1230

    
1231
    def _report_sharing_change(self, user, account, path, details={}):
1232
        logger.debug("_report_permissions_change: %s %s %s %s",
1233
                     user, account, path, details)
1234
        details.update({'user': user})
1235
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1236
                                                  account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1237

    
1238
    # Policy functions.
1239

    
1240
    def _check_policy(self, policy):
1241
        for k in policy.keys():
1242
            if policy[k] == '':
1243
                policy[k] = self.default_policy.get(k)
1244
        for k, v in policy.iteritems():
1245
            if k == 'quota':
1246
                q = int(v)  # May raise ValueError.
1247
                if q < 0:
1248
                    raise ValueError
1249
            elif k == 'versioning':
1250
                if v not in ['auto', 'none']:
1251
                    raise ValueError
1252
            else:
1253
                raise ValueError
1254

    
1255
    def _put_policy(self, node, policy, replace):
1256
        if replace:
1257
            for k, v in self.default_policy.iteritems():
1258
                if k not in policy:
1259
                    policy[k] = v
1260
        self.node.policy_set(node, policy)
1261

    
1262
    def _get_policy(self, node):
1263
        policy = self.default_policy.copy()
1264
        policy.update(self.node.policy_get(node))
1265
        return policy
1266

    
1267
    def _apply_versioning(self, account, container, version_id):
1268
        """Delete the provided version if such is the policy.
1269
           Return size of object removed.
1270
        """
1271

    
1272
        if version_id is None:
1273
            return 0
1274
        path, node = self._lookup_container(account, container)
1275
        versioning = self._get_policy(node)['versioning']
1276
        if versioning != 'auto':
1277
            hash, size = self.node.version_remove(version_id)
1278
            self.store.map_delete(hash)
1279
            return size
1280
        return 0
1281

    
1282
    # Access control functions.
1283

    
1284
    def _check_groups(self, groups):
1285
        # raise ValueError('Bad characters in groups')
1286
        pass
1287

    
1288
    def _check_permissions(self, path, permissions):
1289
        # raise ValueError('Bad characters in permissions')
1290
        pass
1291

    
1292
    def _get_formatted_paths(self, paths):
1293
        formatted = []
1294
        for p in paths:
1295
            node = self.node.node_lookup(p)
1296
            props = None
1297
            if node is not None:
1298
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1299
            if props is not None:
1300
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1301
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1302
                formatted.append((p, self.MATCH_EXACT))
1303
        return formatted
1304

    
1305
    def _get_permissions_path(self, account, container, name):
1306
        path = '/'.join((account, container, name))
1307
        permission_paths = self.permissions.access_inherit(path)
1308
        permission_paths.sort()
1309
        permission_paths.reverse()
1310
        for p in permission_paths:
1311
            if p == path:
1312
                return p
1313
            else:
1314
                if p.count('/') < 2:
1315
                    continue
1316
                node = self.node.node_lookup(p)
1317
                if node is not None:
1318
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1319
                if props is not None:
1320
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1321
                        return p
1322
        return None
1323

    
1324
    def _can_read(self, user, account, container, name):
1325
        if user == account:
1326
            return True
1327
        path = '/'.join((account, container, name))
1328
        if self.permissions.public_get(path) is not None:
1329
            return True
1330
        path = self._get_permissions_path(account, container, name)
1331
        if not path:
1332
            raise NotAllowedError
1333
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1334
            raise NotAllowedError
1335

    
1336
    def _can_write(self, user, account, container, name):
1337
        if user == account:
1338
            return True
1339
        path = '/'.join((account, container, name))
1340
        path = self._get_permissions_path(account, container, name)
1341
        if not path:
1342
            raise NotAllowedError
1343
        if not self.permissions.access_check(path, self.WRITE, user):
1344
            raise NotAllowedError
1345

    
1346
    def _allowed_accounts(self, user):
1347
        allow = set()
1348
        for path in self.permissions.access_list_paths(user):
1349
            allow.add(path.split('/', 1)[0])
1350
        return sorted(allow)
1351

    
1352
    def _allowed_containers(self, user, account):
1353
        allow = set()
1354
        for path in self.permissions.access_list_paths(user, account):
1355
            allow.add(path.split('/', 2)[1])
1356
        return sorted(allow)