Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ d50ed8d4

History | View | Annotate | Download (58.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
44

    
45
# Stripped-down version of the HashMap class found in tools.
46

    
47

    
48
class HashMap(list):
49

    
50
    def __init__(self, blocksize, blockhash):
51
        super(HashMap, self).__init__()
52
        self.blocksize = blocksize
53
        self.blockhash = blockhash
54

    
55
    def _hash_raw(self, v):
56
        h = hashlib.new(self.blockhash)
57
        h.update(v)
58
        return h.digest()
59

    
60
    def hash(self):
61
        if len(self) == 0:
62
            return self._hash_raw('')
63
        if len(self) == 1:
64
            return self.__getitem__(0)
65

    
66
        h = list(self)
67
        s = 2
68
        while s < len(h):
69
            s = s * 2
70
        h += [('\x00' * len(h[0]))] * (s - len(h))
71
        while len(h) > 1:
72
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
73
        return h[0]
74

    
75
# Default modules and settings.
76
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
77
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
78
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
79
DEFAULT_BLOCK_PATH = 'data/'
80
DEFAULT_BLOCK_UMASK = 0o022
81
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
82
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
83

    
84
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
85
QUEUE_CLIENT_ID = 'pithos'
86
QUEUE_INSTANCE_ID = '1'
87

    
88
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
89

    
90
inf = float('inf')
91

    
92
ULTIMATE_ANSWER = 42
93

    
94

    
95
logger = logging.getLogger(__name__)
96

    
97

    
98
def backend_method(func=None, autocommit=1):
99
    if func is None:
100
        def fn(func):
101
            return backend_method(func, autocommit)
102
        return fn
103

    
104
    if not autocommit:
105
        return func
106

    
107
    def fn(self, *args, **kw):
108
        self.wrapper.execute()
109
        try:
110
            self.messages = []
111
            ret = func(self, *args, **kw)
112
            for m in self.messages:
113
                self.queue.send(*m)
114
            self.wrapper.commit()
115
            return ret
116
        except:
117
            self.wrapper.rollback()
118
            raise
119
    return fn
120

    
121

    
122
class ModularBackend(BaseBackend):
123
    """A modular backend.
124

125
    Uses modules for SQL functions and storage.
126
    """
127

    
128
    def __init__(self, db_module=None, db_connection=None,
129
                 block_module=None, block_path=None, block_umask=None,
130
                 queue_module=None, queue_connection=None):
131
        db_module = db_module or DEFAULT_DB_MODULE
132
        db_connection = db_connection or DEFAULT_DB_CONNECTION
133
        block_module = block_module or DEFAULT_BLOCK_MODULE
134
        block_path = block_path or DEFAULT_BLOCK_PATH
135
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
136
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
137
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
138

    
139
        self.hash_algorithm = 'sha256'
140
        self.block_size = 4 * 1024 * 1024  # 4MB
141

    
142
        self.default_policy = {'quota': DEFAULT_QUOTA,
143
                               'versioning': DEFAULT_VERSIONING}
144

    
145
        def load_module(m):
146
            __import__(m)
147
            return sys.modules[m]
148

    
149
        self.db_module = load_module(db_module)
150
        self.wrapper = self.db_module.DBWrapper(db_connection)
151
        params = {'wrapper': self.wrapper}
152
        self.permissions = self.db_module.Permissions(**params)
153
        for x in ['READ', 'WRITE']:
154
            setattr(self, x, getattr(self.db_module, x))
155
        self.node = self.db_module.Node(**params)
156
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
157
            setattr(self, x, getattr(self.db_module, x))
158

    
159
        self.block_module = load_module(block_module)
160
        params = {'path': block_path,
161
                  'block_size': self.block_size,
162
                  'hash_algorithm': self.hash_algorithm,
163
                  'umask': block_umask}
164
        self.store = self.block_module.Store(**params)
165

    
166
        if queue_module and queue_connection:
167
            self.queue_module = load_module(queue_module)
168
            params = {'exchange': queue_connection,
169
                      'client_id': QUEUE_CLIENT_ID}
170
            self.queue = self.queue_module.Queue(**params)
171
        else:
172
            class NoQueue:
173
                def send(self, *args):
174
                    pass
175

    
176
                def close(self):
177
                    pass
178

    
179
            self.queue = NoQueue()
180

    
181
    def close(self):
182
        self.wrapper.close()
183
        self.queue.close()
184

    
185
    @backend_method
186
    def list_accounts(self, user, marker=None, limit=10000):
187
        """Return a list of accounts the user can access."""
188

    
189
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
190
        allowed = self._allowed_accounts(user)
191
        start, limit = self._list_limits(allowed, marker, limit)
192
        return allowed[start:start + limit]
193

    
194
    @backend_method
195
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
196
        """Return a dictionary with the account metadata for the domain."""
197

    
198
        logger.debug(
199
            "get_account_meta: %s %s %s %s", user, account, domain, until)
200
        path, node = self._lookup_account(account, user == account)
201
        if user != account:
202
            if until or node is None or account not in self._allowed_accounts(user):
203
                raise NotAllowedError
204
        try:
205
            props = self._get_properties(node, until)
206
            mtime = props[self.MTIME]
207
        except NameError:
208
            props = None
209
            mtime = until
210
        count, bytes, tstamp = self._get_statistics(node, until)
211
        tstamp = max(tstamp, mtime)
212
        if until is None:
213
            modified = tstamp
214
        else:
215
            modified = self._get_statistics(
216
                node)[2]  # Overall last modification.
217
            modified = max(modified, mtime)
218

    
219
        if user != account:
220
            meta = {'name': account}
221
        else:
222
            meta = {}
223
            if props is not None and include_user_defined:
224
                meta.update(
225
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
226
            if until is not None:
227
                meta.update({'until_timestamp': tstamp})
228
            meta.update({'name': account, 'count': count, 'bytes': bytes})
229
        meta.update({'modified': modified})
230
        return meta
231

    
232
    @backend_method
233
    def update_account_meta(self, user, account, domain, meta, replace=False):
234
        """Update the metadata associated with the account for the domain."""
235

    
236
        logger.debug("update_account_meta: %s %s %s %s %s", user,
237
                     account, domain, meta, replace)
238
        if user != account:
239
            raise NotAllowedError
240
        path, node = self._lookup_account(account, True)
241
        self._put_metadata(user, node, domain, meta, replace)
242

    
243
    @backend_method
244
    def get_account_groups(self, user, account):
245
        """Return a dictionary with the user groups defined for this account."""
246

    
247
        logger.debug("get_account_groups: %s %s", user, account)
248
        if user != account:
249
            if account not in self._allowed_accounts(user):
250
                raise NotAllowedError
251
            return {}
252
        self._lookup_account(account, True)
253
        return self.permissions.group_dict(account)
254

    
255
    @backend_method
256
    def update_account_groups(self, user, account, groups, replace=False):
257
        """Update the groups associated with the account."""
258

    
259
        logger.debug("update_account_groups: %s %s %s %s", user,
260
                     account, groups, replace)
261
        if user != account:
262
            raise NotAllowedError
263
        self._lookup_account(account, True)
264
        self._check_groups(groups)
265
        if replace:
266
            self.permissions.group_destroy(account)
267
        for k, v in groups.iteritems():
268
            if not replace:  # If not already deleted.
269
                self.permissions.group_delete(account, k)
270
            if v:
271
                self.permissions.group_addmany(account, k, v)
272

    
273
    @backend_method
274
    def get_account_policy(self, user, account):
275
        """Return a dictionary with the account policy."""
276

    
277
        logger.debug("get_account_policy: %s %s", user, account)
278
        if user != account:
279
            if account not in self._allowed_accounts(user):
280
                raise NotAllowedError
281
            return {}
282
        path, node = self._lookup_account(account, True)
283
        return self._get_policy(node)
284

    
285
    @backend_method
286
    def update_account_policy(self, user, account, policy, replace=False):
287
        """Update the policy associated with the account."""
288

    
289
        logger.debug("update_account_policy: %s %s %s %s", user,
290
                     account, policy, replace)
291
        if user != account:
292
            raise NotAllowedError
293
        path, node = self._lookup_account(account, True)
294
        self._check_policy(policy)
295
        self._put_policy(node, policy, replace)
296

    
297
    @backend_method
298
    def put_account(self, user, account, policy={}):
299
        """Create a new account with the given name."""
300

    
301
        logger.debug("put_account: %s %s %s", user, account, policy)
302
        if user != account:
303
            raise NotAllowedError
304
        node = self.node.node_lookup(account)
305
        if node is not None:
306
            raise AccountExists('Account already exists')
307
        if policy:
308
            self._check_policy(policy)
309
        node = self._put_path(user, self.ROOTNODE, account)
310
        self._put_policy(node, policy, True)
311

    
312
    @backend_method
313
    def delete_account(self, user, account):
314
        """Delete the account with the given name."""
315

    
316
        logger.debug("delete_account: %s %s", user, account)
317
        if user != account:
318
            raise NotAllowedError
319
        node = self.node.node_lookup(account)
320
        if node is None:
321
            return
322
        if not self.node.node_remove(node):
323
            raise AccountNotEmpty('Account is not empty')
324
        self.permissions.group_destroy(account)
325

    
326
    @backend_method
327
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
328
        """Return a list of containers existing under an account."""
329

    
330
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
331
                     account, marker, limit, shared, until, public)
332
        if user != account:
333
            if until or account not in self._allowed_accounts(user):
334
                raise NotAllowedError
335
            allowed = self._allowed_containers(user, account)
336
            start, limit = self._list_limits(allowed, marker, limit)
337
            return allowed[start:start + limit]
338
        if shared or public:
339
            allowed = set()
340
            if shared:
341
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
342
            if public:
343
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
344
            allowed = sorted(allowed)
345
            start, limit = self._list_limits(allowed, marker, limit)
346
            return allowed[start:start + limit]
347
        node = self.node.node_lookup(account)
348
        containers = [x[0] for x in self._list_object_properties(
349
            node, account, '', '/', marker, limit, False, None, [], until)]
350
        start, limit = self._list_limits(
351
            [x[0] for x in containers], marker, limit)
352
        return containers[start:start + limit]
353

    
354
    @backend_method
355
    def list_container_meta(self, user, account, container, domain, until=None):
356
        """Return a list with all the container's object meta keys for the domain."""
357

    
358
        logger.debug("list_container_meta: %s %s %s %s %s", user,
359
                     account, container, domain, until)
360
        allowed = []
361
        if user != account:
362
            if until:
363
                raise NotAllowedError
364
            allowed = self.permissions.access_list_paths(
365
                user, '/'.join((account, container)))
366
            if not allowed:
367
                raise NotAllowedError
368
        path, node = self._lookup_container(account, container)
369
        before = until if until is not None else inf
370
        allowed = self._get_formatted_paths(allowed)
371
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
372

    
373
    @backend_method
374
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
375
        """Return a dictionary with the container metadata for the domain."""
376

    
377
        logger.debug("get_container_meta: %s %s %s %s %s", user,
378
                     account, container, domain, until)
379
        if user != account:
380
            if until or container not in self._allowed_containers(user, account):
381
                raise NotAllowedError
382
        path, node = self._lookup_container(account, container)
383
        props = self._get_properties(node, until)
384
        mtime = props[self.MTIME]
385
        count, bytes, tstamp = self._get_statistics(node, until)
386
        tstamp = max(tstamp, mtime)
387
        if until is None:
388
            modified = tstamp
389
        else:
390
            modified = self._get_statistics(
391
                node)[2]  # Overall last modification.
392
            modified = max(modified, mtime)
393

    
394
        if user != account:
395
            meta = {'name': container}
396
        else:
397
            meta = {}
398
            if include_user_defined:
399
                meta.update(
400
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
401
            if until is not None:
402
                meta.update({'until_timestamp': tstamp})
403
            meta.update({'name': container, 'count': count, 'bytes': bytes})
404
        meta.update({'modified': modified})
405
        return meta
406

    
407
    @backend_method
408
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
409
        """Update the metadata associated with the container for the domain."""
410

    
411
        logger.debug("update_container_meta: %s %s %s %s %s %s",
412
                     user, account, container, domain, meta, replace)
413
        if user != account:
414
            raise NotAllowedError
415
        path, node = self._lookup_container(account, container)
416
        src_version_id, dest_version_id = self._put_metadata(
417
            user, node, domain, meta, replace)
418
        if src_version_id is not None:
419
            versioning = self._get_policy(node)['versioning']
420
            if versioning != 'auto':
421
                self.node.version_remove(src_version_id)
422

    
423
    @backend_method
424
    def get_container_policy(self, user, account, container):
425
        """Return a dictionary with the container policy."""
426

    
427
        logger.debug(
428
            "get_container_policy: %s %s %s", user, account, container)
429
        if user != account:
430
            if container not in self._allowed_containers(user, account):
431
                raise NotAllowedError
432
            return {}
433
        path, node = self._lookup_container(account, container)
434
        return self._get_policy(node)
435

    
436
    @backend_method
437
    def update_container_policy(self, user, account, container, policy, replace=False):
438
        """Update the policy associated with the container."""
439

    
440
        logger.debug("update_container_policy: %s %s %s %s %s",
441
                     user, account, container, policy, replace)
442
        if user != account:
443
            raise NotAllowedError
444
        path, node = self._lookup_container(account, container)
445
        self._check_policy(policy)
446
        self._put_policy(node, policy, replace)
447

    
448
    @backend_method
449
    def put_container(self, user, account, container, policy={}):
450
        """Create a new container with the given name."""
451

    
452
        logger.debug(
453
            "put_container: %s %s %s %s", user, account, container, policy)
454
        if user != account:
455
            raise NotAllowedError
456
        try:
457
            path, node = self._lookup_container(account, container)
458
        except NameError:
459
            pass
460
        else:
461
            raise ContainerExists('Container already exists')
462
        if policy:
463
            self._check_policy(policy)
464
        path = '/'.join((account, container))
465
        node = self._put_path(
466
            user, self._lookup_account(account, True)[1], path)
467
        self._put_policy(node, policy, True)
468

    
469
    @backend_method
470
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
471
        """Delete/purge the container with the given name."""
472

    
473
        logger.debug("delete_container: %s %s %s %s %s %s", user,
474
                     account, container, until, prefix, delimiter)
475
        if user != account:
476
            raise NotAllowedError
477
        path, node = self._lookup_container(account, container)
478

    
479
        if until is not None:
480
            hashes, size = self.node.node_purge_children(
481
                node, until, CLUSTER_HISTORY)
482
            for h in hashes:
483
                self.store.map_delete(h)
484
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
485
            self._report_size_change(user, account, -size, {'action':
486
                                     'container purge', 'path': path})
487
            return
488

    
489
        if not delimiter:
490
            if self._get_statistics(node)[0] > 0:
491
                raise ContainerNotEmpty('Container is not empty')
492
            hashes, size = self.node.node_purge_children(
493
                node, inf, CLUSTER_HISTORY)
494
            for h in hashes:
495
                self.store.map_delete(h)
496
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
497
            self.node.node_remove(node)
498
            self._report_size_change(user, account, -size, {'action':
499
                                     'container delete', 'path': path})
500
        else:
501
                # remove only contents
502
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
503
            paths = []
504
            for t in src_names:
505
                path = '/'.join((account, container, t[0]))
506
                node = t[2]
507
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
508
                del_size = self._apply_versioning(
509
                    account, container, src_version_id)
510
                if del_size:
511
                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
512
                self._report_object_change(
513
                    user, account, path, details={'action': 'object delete'})
514
                paths.append(path)
515
            self.permissions.access_clear_bulk(paths)
516

    
517
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
518
        if user != account and until:
519
            raise NotAllowedError
520
        if shared and public:
521
            # get shared first
522
            shared = self._list_object_permissions(
523
                user, account, container, prefix, shared=True, public=False)
524
            objects = set()
525
            if shared:
526
                path, node = self._lookup_container(account, container)
527
                shared = self._get_formatted_paths(shared)
528
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
529

    
530
            # get public
531
            objects |= set(self._list_public_object_properties(
532
                user, account, container, prefix, all_props))
533
            objects = list(objects)
534

    
535
            objects.sort(key=lambda x: x[0])
536
            start, limit = self._list_limits(
537
                [x[0] for x in objects], marker, limit)
538
            return objects[start:start + limit]
539
        elif public:
540
            objects = self._list_public_object_properties(
541
                user, account, container, prefix, all_props)
542
            start, limit = self._list_limits(
543
                [x[0] for x in objects], marker, limit)
544
            return objects[start:start + limit]
545

    
546
        allowed = self._list_object_permissions(
547
            user, account, container, prefix, shared, public)
548
        if shared and not allowed:
549
            return []
550
        path, node = self._lookup_container(account, container)
551
        allowed = self._get_formatted_paths(allowed)
552
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
553
        start, limit = self._list_limits(
554
            [x[0] for x in objects], marker, limit)
555
        return objects[start:start + limit]
556

    
557
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
558
        public = self._list_object_permissions(
559
            user, account, container, prefix, shared=False, public=True)
560
        paths, nodes = self._lookup_objects(public)
561
        path = '/'.join((account, container))
562
        cont_prefix = path + '/'
563
        paths = [x[len(cont_prefix):] for x in paths]
564
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
565
        objects = [(path,) + props for path, props in zip(paths, props)]
566
        return objects
567

    
568
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
569
        objects = []
570
        while True:
571
            marker = objects[-1] if objects else None
572
            limit = 10000
573
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
574
            objects.extend(l)
575
            if not l or len(l) < limit:
576
                break
577
        return objects
578

    
579
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
580
        allowed = []
581
        path = '/'.join((account, container, prefix)).rstrip('/')
582
        if user != account:
583
            allowed = self.permissions.access_list_paths(user, path)
584
            if not allowed:
585
                raise NotAllowedError
586
        else:
587
            allowed = set()
588
            if shared:
589
                allowed.update(self.permissions.access_list_shared(path))
590
            if public:
591
                allowed.update(
592
                    [x[0] for x in self.permissions.public_list(path)])
593
            allowed = sorted(allowed)
594
            if not allowed:
595
                return []
596
        return allowed
597

    
598
    @backend_method
599
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
600
        """Return a list of object (name, version_id) tuples existing under a container."""
601

    
602
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
603
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
604

    
605
    @backend_method
606
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
607
        """Return a list of object metadata dicts existing under a container."""
608

    
609
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
610
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
611
        objects = []
612
        for p in props:
613
            if len(p) == 2:
614
                objects.append({'subdir': p[0]})
615
            else:
616
                objects.append({'name': p[0],
617
                                'bytes': p[self.SIZE + 1],
618
                                'type': p[self.TYPE + 1],
619
                                'hash': p[self.HASH + 1],
620
                                'version': p[self.SERIAL + 1],
621
                                'version_timestamp': p[self.MTIME + 1],
622
                                'modified': p[self.MTIME + 1] if until is None else None,
623
                                'modified_by': p[self.MUSER + 1],
624
                                'uuid': p[self.UUID + 1],
625
                                'checksum': p[self.CHECKSUM + 1]})
626
        return objects
627

    
628
    @backend_method
629
    def list_object_permissions(self, user, account, container, prefix=''):
630
        """Return a list of paths that enforce permissions under a container."""
631

    
632
        logger.debug("list_object_permissions: %s %s %s %s", user,
633
                     account, container, prefix)
634
        return self._list_object_permissions(user, account, container, prefix, True, False)
635

    
636
    @backend_method
637
    def list_object_public(self, user, account, container, prefix=''):
638
        """Return a dict mapping paths to public ids for objects that are public under a container."""
639

    
640
        logger.debug("list_object_public: %s %s %s %s", user,
641
                     account, container, prefix)
642
        public = {}
643
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
644
            public[path] = p + ULTIMATE_ANSWER
645
        return public
646

    
647
    @backend_method
648
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
649
        """Return a dictionary with the object metadata for the domain."""
650

    
651
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
652
                     account, container, name, domain, version)
653
        self._can_read(user, account, container, name)
654
        path, node = self._lookup_object(account, container, name)
655
        props = self._get_version(node, version)
656
        if version is None:
657
            modified = props[self.MTIME]
658
        else:
659
            try:
660
                modified = self._get_version(
661
                    node)[self.MTIME]  # Overall last modification.
662
            except NameError:  # Object may be deleted.
663
                del_props = self.node.version_lookup(
664
                    node, inf, CLUSTER_DELETED)
665
                if del_props is None:
666
                    raise ItemNotExists('Object does not exist')
667
                modified = del_props[self.MTIME]
668

    
669
        meta = {}
670
        if include_user_defined:
671
            meta.update(
672
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
673
        meta.update({'name': name,
674
                     'bytes': props[self.SIZE],
675
                     'type': props[self.TYPE],
676
                     'hash': props[self.HASH],
677
                     'version': props[self.SERIAL],
678
                     'version_timestamp': props[self.MTIME],
679
                     'modified': modified,
680
                     'modified_by': props[self.MUSER],
681
                     'uuid': props[self.UUID],
682
                     'checksum': props[self.CHECKSUM]})
683
        return meta
684

    
685
    @backend_method
686
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
687
        """Update the metadata associated with the object for the domain and return the new version."""
688

    
689
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
690
                     user, account, container, name, domain, meta, replace)
691
        self._can_write(user, account, container, name)
692
        path, node = self._lookup_object(account, container, name)
693
        src_version_id, dest_version_id = self._put_metadata(
694
            user, node, domain, meta, replace)
695
        self._apply_versioning(account, container, src_version_id)
696
        return dest_version_id
697

    
698
    @backend_method
699
    def get_object_permissions(self, user, account, container, name):
700
        """Return the action allowed on the object, the path
701
        from which the object gets its permissions from,
702
        along with a dictionary containing the permissions."""
703

    
704
        logger.debug("get_object_permissions: %s %s %s %s", user,
705
                     account, container, name)
706
        allowed = 'write'
707
        permissions_path = self._get_permissions_path(account, container, name)
708
        if user != account:
709
            if self.permissions.access_check(permissions_path, self.WRITE, user):
710
                allowed = 'write'
711
            elif self.permissions.access_check(permissions_path, self.READ, user):
712
                allowed = 'read'
713
            else:
714
                raise NotAllowedError
715
        self._lookup_object(account, container, name)
716
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
717

    
718
    @backend_method
719
    def update_object_permissions(self, user, account, container, name, permissions):
720
        """Update the permissions associated with the object."""
721

    
722
        logger.debug("update_object_permissions: %s %s %s %s %s",
723
                     user, account, container, name, permissions)
724
        if user != account:
725
            raise NotAllowedError
726
        path = self._lookup_object(account, container, name)[0]
727
        self._check_permissions(path, permissions)
728
        self.permissions.access_set(path, permissions)
729
        self._report_sharing_change(user, account, path, {'members':
730
                                    self.permissions.access_members(path)})
731

    
732
    @backend_method
733
    def get_object_public(self, user, account, container, name):
734
        """Return the public id of the object if applicable."""
735

    
736
        logger.debug(
737
            "get_object_public: %s %s %s %s", user, account, container, name)
738
        self._can_read(user, account, container, name)
739
        path = self._lookup_object(account, container, name)[0]
740
        p = self.permissions.public_get(path)
741
        if p is not None:
742
            p += ULTIMATE_ANSWER
743
        return p
744

    
745
    @backend_method
746
    def update_object_public(self, user, account, container, name, public):
747
        """Update the public status of the object."""
748

    
749
        logger.debug("update_object_public: %s %s %s %s %s", user,
750
                     account, container, name, public)
751
        self._can_write(user, account, container, name)
752
        path = self._lookup_object(account, container, name)[0]
753
        if not public:
754
            self.permissions.public_unset(path)
755
        else:
756
            self.permissions.public_set(path)
757

    
758
    @backend_method
759
    def get_object_hashmap(self, user, account, container, name, version=None):
760
        """Return the object's size and a list with partial hashes."""
761

    
762
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
763
                     account, container, name, version)
764
        self._can_read(user, account, container, name)
765
        path, node = self._lookup_object(account, container, name)
766
        props = self._get_version(node, version)
767
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
768
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
769

    
770
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
771
        if permissions is not None and user != account:
772
            raise NotAllowedError
773
        self._can_write(user, account, container, name)
774
        if permissions is not None:
775
            path = '/'.join((account, container, name))
776
            self._check_permissions(path, permissions)
777

    
778
        account_path, account_node = self._lookup_account(account, True)
779
        container_path, container_node = self._lookup_container(
780
            account, container)
781
        path, node = self._put_object_node(
782
            container_path, container_node, name)
783
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
784

    
785
        # Handle meta.
786
        if src_version_id is None:
787
            src_version_id = pre_version_id
788
        self._put_metadata_duplicate(
789
            src_version_id, dest_version_id, domain, meta, replace_meta)
790

    
791
        # Check quota.
792
        del_size = self._apply_versioning(account, container, pre_version_id)
793
        size_delta = size - del_size
794
        if size_delta > 0:
795
            account_quota = long(self._get_policy(account_node)['quota'])
796
            container_quota = long(self._get_policy(container_node)['quota'])
797
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
798
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
799
                # This must be executed in a transaction, so the version is never created if it fails.
800
                raise QuotaError
801
        self._report_size_change(user, account, size_delta, {
802
                                 'action': 'object update', 'path': path})
803

    
804
        if permissions is not None:
805
            self.permissions.access_set(path, permissions)
806
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
807

    
808
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
809
        return dest_version_id
810

    
811
    @backend_method
812
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
813
        """Create/update an object with the specified size and partial hashes."""
814

    
815
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
816
                     account, container, name, size, type, hashmap, checksum)
817
        if size == 0:  # No such thing as an empty hashmap.
818
            hashmap = [self.put_block('')]
819
        map = HashMap(self.block_size, self.hash_algorithm)
820
        map.extend([binascii.unhexlify(x) for x in hashmap])
821
        missing = self.store.block_search(map)
822
        if missing:
823
            ie = IndexError()
824
            ie.data = [binascii.hexlify(x) for x in missing]
825
            raise ie
826

    
827
        hash = map.hash()
828
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
829
        self.store.map_put(hash, map)
830
        return dest_version_id
831

    
832
    @backend_method
833
    def update_object_checksum(self, user, account, container, name, version, checksum):
834
        """Update an object's checksum."""
835

    
836
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
837
                     user, account, container, name, version, checksum)
838
        # Update objects with greater version and same hashmap and size (fix metadata updates).
839
        self._can_write(user, account, container, name)
840
        path, node = self._lookup_object(account, container, name)
841
        props = self._get_version(node, version)
842
        versions = self.node.node_get_versions(node)
843
        for x in versions:
844
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
845
                self.node.version_put_property(
846
                    x[self.SERIAL], 'checksum', checksum)
847

    
848
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
849
        dest_version_ids = []
850
        self._can_read(user, src_account, src_container, src_name)
851
        path, node = self._lookup_object(src_account, src_container, src_name)
852
        # TODO: Will do another fetch of the properties in duplicate version...
853
        props = self._get_version(
854
            node, src_version)  # Check to see if source exists.
855
        src_version_id = props[self.SERIAL]
856
        hash = props[self.HASH]
857
        size = props[self.SIZE]
858
        is_copy = not is_move and (src_account, src_container, src_name) != (
859
            dest_account, dest_container, dest_name)  # New uuid.
860
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
861
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
862
            self._delete_object(user, src_account, src_container, src_name)
863

    
864
        if delimiter:
865
            prefix = src_name + \
866
                delimiter if not src_name.endswith(delimiter) else src_name
867
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
868
            src_names.sort(key=lambda x: x[2])  # order by nodes
869
            paths = [elem[0] for elem in src_names]
870
            nodes = [elem[2] for elem in src_names]
871
            # TODO: Will do another fetch of the properties in duplicate version...
872
            props = self._get_versions(nodes)  # Check to see if source exists.
873

    
874
            for prop, path, node in zip(props, paths, nodes):
875
                src_version_id = prop[self.SERIAL]
876
                hash = prop[self.HASH]
877
                vtype = prop[self.TYPE]
878
                size = prop[self.SIZE]
879
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
880
                    delimiter) else dest_name
881
                vdest_name = path.replace(prefix, dest_prefix, 1)
882
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
883
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
884
                    self._delete_object(user, src_account, src_container, path)
885
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
886

    
887
    @backend_method
888
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
889
        """Copy an object's data and metadata."""
890

    
891
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
892
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
893
        return dest_version_id
894

    
895
    @backend_method
896
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
897
        """Move an object's data and metadata."""
898

    
899
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
900
        if user != src_account:
901
            raise NotAllowedError
902
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
903
        return dest_version_id
904

    
905
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
906
        if user != account:
907
            raise NotAllowedError
908

    
909
        if until is not None:
910
            path = '/'.join((account, container, name))
911
            node = self.node.node_lookup(path)
912
            if node is None:
913
                return
914
            hashes = []
915
            size = 0
916
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
917
            hashes += h
918
            size += s
919
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
920
            hashes += h
921
            size += s
922
            for h in hashes:
923
                self.store.map_delete(h)
924
            self.node.node_purge(node, until, CLUSTER_DELETED)
925
            try:
926
                props = self._get_version(node)
927
            except NameError:
928
                self.permissions.access_clear(path)
929
            self._report_size_change(user, account, -size, {
930
                                     'action': 'object purge', 'path': path})
931
            return
932

    
933
        path, node = self._lookup_object(account, container, name)
934
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
935
        del_size = self._apply_versioning(account, container, src_version_id)
936
        if del_size:
937
            self._report_size_change(user, account, -del_size, {
938
                                     'action': 'object delete', 'path': path})
939
        self._report_object_change(
940
            user, account, path, details={'action': 'object delete'})
941
        self.permissions.access_clear(path)
942

    
943
        if delimiter:
944
            prefix = name + delimiter if not name.endswith(delimiter) else name
945
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
946
            paths = []
947
            for t in src_names:
948
                path = '/'.join((account, container, t[0]))
949
                node = t[2]
950
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
951
                del_size = self._apply_versioning(
952
                    account, container, src_version_id)
953
                if del_size:
954
                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
955
                self._report_object_change(
956
                    user, account, path, details={'action': 'object delete'})
957
                paths.append(path)
958
            self.permissions.access_clear_bulk(paths)
959

    
960
    @backend_method
961
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
962
        """Delete/purge an object."""
963

    
964
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
965
                     account, container, name, until, prefix, delimiter)
966
        self._delete_object(user, account, container, name, until, delimiter)
967

    
968
    @backend_method
969
    def list_versions(self, user, account, container, name):
970
        """Return a list of all (version, version_timestamp) tuples for an object."""
971

    
972
        logger.debug(
973
            "list_versions: %s %s %s %s", user, account, container, name)
974
        self._can_read(user, account, container, name)
975
        path, node = self._lookup_object(account, container, name)
976
        versions = self.node.node_get_versions(node)
977
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
978

    
979
    @backend_method
980
    def get_uuid(self, user, uuid):
981
        """Return the (account, container, name) for the UUID given."""
982

    
983
        logger.debug("get_uuid: %s %s", user, uuid)
984
        info = self.node.latest_uuid(uuid)
985
        if info is None:
986
            raise NameError
987
        path, serial = info
988
        account, container, name = path.split('/', 2)
989
        self._can_read(user, account, container, name)
990
        return (account, container, name)
991

    
992
    @backend_method
993
    def get_public(self, user, public):
994
        """Return the (account, container, name) for the public id given."""
995

    
996
        logger.debug("get_public: %s %s", user, public)
997
        if public is None or public < ULTIMATE_ANSWER:
998
            raise NameError
999
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1000
        if path is None:
1001
            raise NameError
1002
        account, container, name = path.split('/', 2)
1003
        self._can_read(user, account, container, name)
1004
        return (account, container, name)
1005

    
1006
    @backend_method(autocommit=0)
1007
    def get_block(self, hash):
1008
        """Return a block's data."""
1009

    
1010
        logger.debug("get_block: %s", hash)
1011
        block = self.store.block_get(binascii.unhexlify(hash))
1012
        if not block:
1013
            raise ItemNotExists('Block does not exist')
1014
        return block
1015

    
1016
    @backend_method(autocommit=0)
1017
    def put_block(self, data):
1018
        """Store a block and return the hash."""
1019

    
1020
        logger.debug("put_block: %s", len(data))
1021
        return binascii.hexlify(self.store.block_put(data))
1022

    
1023
    @backend_method(autocommit=0)
1024
    def update_block(self, hash, data, offset=0):
1025
        """Update a known block and return the hash."""
1026

    
1027
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1028
        if offset == 0 and len(data) == self.block_size:
1029
            return self.put_block(data)
1030
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1031
        return binascii.hexlify(h)
1032

    
1033
    # Path functions.
1034

    
1035
    def _generate_uuid(self):
1036
        return str(uuidlib.uuid4())
1037

    
1038
    def _put_object_node(self, path, parent, name):
1039
        path = '/'.join((path, name))
1040
        node = self.node.node_lookup(path)
1041
        if node is None:
1042
            node = self.node.node_create(parent, path)
1043
        return path, node
1044

    
1045
    def _put_path(self, user, parent, path):
1046
        node = self.node.node_create(parent, path)
1047
        self.node.version_create(node, None, 0, '', None, user,
1048
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1049
        return node
1050

    
1051
    def _lookup_account(self, account, create=True):
1052
        node = self.node.node_lookup(account)
1053
        if node is None and create:
1054
            node = self._put_path(
1055
                account, self.ROOTNODE, account)  # User is account.
1056
        return account, node
1057

    
1058
    def _lookup_container(self, account, container):
1059
        path = '/'.join((account, container))
1060
        node = self.node.node_lookup(path)
1061
        if node is None:
1062
            raise ItemNotExists('Container does not exist')
1063
        return path, node
1064

    
1065
    def _lookup_object(self, account, container, name):
1066
        path = '/'.join((account, container, name))
1067
        node = self.node.node_lookup(path)
1068
        if node is None:
1069
            raise ItemNotExists('Object does not exist')
1070
        return path, node
1071

    
1072
    def _lookup_objects(self, paths):
1073
        nodes = self.node.node_lookup_bulk(paths)
1074
        return paths, nodes
1075

    
1076
    def _get_properties(self, node, until=None):
1077
        """Return properties until the timestamp given."""
1078

    
1079
        before = until if until is not None else inf
1080
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1081
        if props is None and until is not None:
1082
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1083
        if props is None:
1084
            raise ItemNotExists('Path does not exist')
1085
        return props
1086

    
1087
    def _get_statistics(self, node, until=None):
1088
        """Return count, sum of size and latest timestamp of everything under node."""
1089

    
1090
        if until is None:
1091
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1092
        else:
1093
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1094
        if stats is None:
1095
            stats = (0, 0, 0)
1096
        return stats
1097

    
1098
    def _get_version(self, node, version=None):
1099
        if version is None:
1100
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1101
            if props is None:
1102
                raise ItemNotExists('Object does not exist')
1103
        else:
1104
            try:
1105
                version = int(version)
1106
            except ValueError:
1107
                raise VersionNotExists('Version does not exist')
1108
            props = self.node.version_get_properties(version)
1109
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1110
                raise VersionNotExists('Version does not exist')
1111
        return props
1112

    
1113
    def _get_versions(self, nodes):
1114
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1115

    
1116
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1117
        """Create a new version of the node."""
1118

    
1119
        props = self.node.version_lookup(
1120
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1121
        if props is not None:
1122
            src_version_id = props[self.SERIAL]
1123
            src_hash = props[self.HASH]
1124
            src_size = props[self.SIZE]
1125
            src_type = props[self.TYPE]
1126
            src_checksum = props[self.CHECKSUM]
1127
        else:
1128
            src_version_id = None
1129
            src_hash = None
1130
            src_size = 0
1131
            src_type = ''
1132
            src_checksum = ''
1133
        if size is None:  # Set metadata.
1134
            hash = src_hash  # This way hash can be set to None (account or container).
1135
            size = src_size
1136
        if type is None:
1137
            type = src_type
1138
        if checksum is None:
1139
            checksum = src_checksum
1140
        uuid = self._generate_uuid(
1141
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1142

    
1143
        if src_node is None:
1144
            pre_version_id = src_version_id
1145
        else:
1146
            pre_version_id = None
1147
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1148
            if props is not None:
1149
                pre_version_id = props[self.SERIAL]
1150
        if pre_version_id is not None:
1151
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1152

    
1153
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1154
        return pre_version_id, dest_version_id
1155

    
1156
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1157
        if src_version_id is not None:
1158
            self.node.attribute_copy(src_version_id, dest_version_id)
1159
        if not replace:
1160
            self.node.attribute_del(dest_version_id, domain, (
1161
                k for k, v in meta.iteritems() if v == ''))
1162
            self.node.attribute_set(dest_version_id, domain, (
1163
                (k, v) for k, v in meta.iteritems() if v != ''))
1164
        else:
1165
            self.node.attribute_del(dest_version_id, domain)
1166
            self.node.attribute_set(dest_version_id, domain, ((
1167
                k, v) for k, v in meta.iteritems()))
1168

    
1169
    def _put_metadata(self, user, node, domain, meta, replace=False):
1170
        """Create a new version and store metadata."""
1171

    
1172
        src_version_id, dest_version_id = self._put_version_duplicate(
1173
            user, node)
1174
        self._put_metadata_duplicate(
1175
            src_version_id, dest_version_id, domain, meta, replace)
1176
        return src_version_id, dest_version_id
1177

    
1178
    def _list_limits(self, listing, marker, limit):
1179
        start = 0
1180
        if marker:
1181
            try:
1182
                start = listing.index(marker) + 1
1183
            except ValueError:
1184
                pass
1185
        if not limit or limit > 10000:
1186
            limit = 10000
1187
        return start, limit
1188

    
1189
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1190
        cont_prefix = path + '/'
1191
        prefix = cont_prefix + prefix
1192
        start = cont_prefix + marker if marker else None
1193
        before = until if until is not None else inf
1194
        filterq = keys if domain else []
1195
        sizeq = size_range
1196

    
1197
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1198
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1199
        objects.sort(key=lambda x: x[0])
1200
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1201
        return objects
1202

    
1203
    # Reporting functions.
1204

    
1205
    def _report_size_change(self, user, account, size, details={}):
1206
        account_node = self._lookup_account(account, True)[1]
1207
        total = self._get_statistics(account_node)[1]
1208
        details.update({'user': user, 'total': total})
1209
        logger.debug(
1210
            "_report_size_change: %s %s %s %s", user, account, size, details)
1211
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1212
                                                  account,
1213
                                                  QUEUE_INSTANCE_ID,
1214
                                                  'diskspace',
1215
                                                  float(size),
1216
                                                  details))
1217

    
1218
    def _report_object_change(self, user, account, path, details={}):
1219
        details.update({'user': user})
1220
        logger.debug("_report_object_change: %s %s %s %s", user,
1221
                     account, path, details)
1222
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % (
1223
            'object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1224

    
1225
    def _report_sharing_change(self, user, account, path, details={}):
1226
        logger.debug("_report_permissions_change: %s %s %s %s",
1227
                     user, account, path, details)
1228
        details.update({'user': user})
1229
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account,
1230
                             QUEUE_INSTANCE_ID, 'sharing', path, details))
1231

    
1232
    # Policy functions.
1233

    
1234
    def _check_policy(self, policy):
1235
        for k in policy.keys():
1236
            if policy[k] == '':
1237
                policy[k] = self.default_policy.get(k)
1238
        for k, v in policy.iteritems():
1239
            if k == 'quota':
1240
                q = int(v)  # May raise ValueError.
1241
                if q < 0:
1242
                    raise ValueError
1243
            elif k == 'versioning':
1244
                if v not in ['auto', 'none']:
1245
                    raise ValueError
1246
            else:
1247
                raise ValueError
1248

    
1249
    def _put_policy(self, node, policy, replace):
1250
        if replace:
1251
            for k, v in self.default_policy.iteritems():
1252
                if k not in policy:
1253
                    policy[k] = v
1254
        self.node.policy_set(node, policy)
1255

    
1256
    def _get_policy(self, node):
1257
        policy = self.default_policy.copy()
1258
        policy.update(self.node.policy_get(node))
1259
        return policy
1260

    
1261
    def _apply_versioning(self, account, container, version_id):
1262
        """Delete the provided version if such is the policy.
1263
           Return size of object removed.
1264
        """
1265

    
1266
        if version_id is None:
1267
            return 0
1268
        path, node = self._lookup_container(account, container)
1269
        versioning = self._get_policy(node)['versioning']
1270
        if versioning != 'auto':
1271
            hash, size = self.node.version_remove(version_id)
1272
            self.store.map_delete(hash)
1273
            return size
1274
        return 0
1275

    
1276
    # Access control functions.
1277

    
1278
    def _check_groups(self, groups):
1279
        # raise ValueError('Bad characters in groups')
1280
        pass
1281

    
1282
    def _check_permissions(self, path, permissions):
1283
        # raise ValueError('Bad characters in permissions')
1284
        pass
1285

    
1286
    def _get_formatted_paths(self, paths):
1287
        formatted = []
1288
        for p in paths:
1289
            node = self.node.node_lookup(p)
1290
            props = None
1291
            if node is not None:
1292
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1293
            if props is not None:
1294
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1295
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1296
                formatted.append((p, self.MATCH_EXACT))
1297
        return formatted
1298

    
1299
    def _get_permissions_path(self, account, container, name):
1300
        path = '/'.join((account, container, name))
1301
        permission_paths = self.permissions.access_inherit(path)
1302
        permission_paths.sort()
1303
        permission_paths.reverse()
1304
        for p in permission_paths:
1305
            if p == path:
1306
                return p
1307
            else:
1308
                if p.count('/') < 2:
1309
                    continue
1310
                node = self.node.node_lookup(p)
1311
                if node is not None:
1312
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1313
                if props is not None:
1314
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1315
                        return p
1316
        return None
1317

    
1318
    def _can_read(self, user, account, container, name):
1319
        if user == account:
1320
            return True
1321
        path = '/'.join((account, container, name))
1322
        if self.permissions.public_get(path) is not None:
1323
            return True
1324
        path = self._get_permissions_path(account, container, name)
1325
        if not path:
1326
            raise NotAllowedError
1327
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1328
            raise NotAllowedError
1329

    
1330
    def _can_write(self, user, account, container, name):
1331
        if user == account:
1332
            return True
1333
        path = '/'.join((account, container, name))
1334
        path = self._get_permissions_path(account, container, name)
1335
        if not path:
1336
            raise NotAllowedError
1337
        if not self.permissions.access_check(path, self.WRITE, user):
1338
            raise NotAllowedError
1339

    
1340
    def _allowed_accounts(self, user):
1341
        allow = set()
1342
        for path in self.permissions.access_list_paths(user):
1343
            allow.add(path.split('/', 1)[0])
1344
        return sorted(allow)
1345

    
1346
    def _allowed_containers(self, user, account):
1347
        allow = set()
1348
        for path in self.permissions.access_list_paths(user, account):
1349
            allow.add(path.split('/', 2)[1])
1350
        return sorted(allow)