Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 47462eda

History | View | Annotate | Download (57.2 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
44

    
45
# Stripped-down version of the HashMap class found in tools.
46
class HashMap(list):
47

    
48
    def __init__(self, blocksize, blockhash):
49
        super(HashMap, self).__init__()
50
        self.blocksize = blocksize
51
        self.blockhash = blockhash
52

    
53
    def _hash_raw(self, v):
54
        h = hashlib.new(self.blockhash)
55
        h.update(v)
56
        return h.digest()
57

    
58
    def hash(self):
59
        if len(self) == 0:
60
            return self._hash_raw('')
61
        if len(self) == 1:
62
            return self.__getitem__(0)
63

    
64
        h = list(self)
65
        s = 2
66
        while s < len(h):
67
            s = s * 2
68
        h += [('\x00' * len(h[0]))] * (s - len(h))
69
        while len(h) > 1:
70
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
71
        return h[0]
72

    
73
# Default modules and settings.
74
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
75
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
76
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
77
DEFAULT_BLOCK_PATH = 'data/'
78
DEFAULT_BLOCK_UMASK = 0o022
79
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
80
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
81
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
82

    
83
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
84
QUEUE_CLIENT_ID = 'pithos'
85
QUEUE_INSTANCE_ID = '1'
86

    
87
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
88

    
89
inf = float('inf')
90

    
91
ULTIMATE_ANSWER = 42
92

    
93

    
94
logger = logging.getLogger(__name__)
95

    
96

    
97
def backend_method(func=None, autocommit=1):
98
    if func is None:
99
        def fn(func):
100
            return backend_method(func, autocommit)
101
        return fn
102

    
103
    if not autocommit:
104
        return func
105
    def fn(self, *args, **kw):
106
        self.wrapper.execute()
107
        try:
108
            self.messages = []
109
            ret = func(self, *args, **kw)
110
            for m in self.messages:
111
                self.queue.send(*m)
112
            self.wrapper.commit()
113
            return ret
114
        except:
115
            self.wrapper.rollback()
116
            raise
117
    return fn
118

    
119

    
120
class ModularBackend(BaseBackend):
121
    """A modular backend.
122

123
    Uses modules for SQL functions and storage.
124
    """
125

    
126
    def __init__(self, db_module=None, db_connection=None,
127
                 block_module=None, block_path=None, block_umask=None,
128
                 queue_module=None, queue_connection=None,
129
                 block_params=None):
130
        db_module = db_module or DEFAULT_DB_MODULE
131
        db_connection = db_connection or DEFAULT_DB_CONNECTION
132
        block_module = block_module or DEFAULT_BLOCK_MODULE
133
        block_path = block_path or DEFAULT_BLOCK_PATH
134
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
135
        block_params = block_params or DEFAULT_BLOCK_PARAMS
136
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
137
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
138

    
139
        self.hash_algorithm = 'sha256'
140
        self.block_size = 4 * 1024 * 1024 # 4MB
141

    
142
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
143

    
144
        def load_module(m):
145
            __import__(m)
146
            return sys.modules[m]
147

    
148
        self.db_module = load_module(db_module)
149
        self.wrapper = self.db_module.DBWrapper(db_connection)
150
        params = {'wrapper': self.wrapper}
151
        self.permissions = self.db_module.Permissions(**params)
152
        for x in ['READ', 'WRITE']:
153
            setattr(self, x, getattr(self.db_module, x))
154
        self.node = self.db_module.Node(**params)
155
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
156
            setattr(self, x, getattr(self.db_module, x))
157

    
158
        self.block_module = load_module(block_module)
159
        self.block_params = block_params
160
        params = {'path': block_path,
161
                  'block_size': self.block_size,
162
                  'hash_algorithm': self.hash_algorithm,
163
                  'umask': block_umask}
164
        params.update(self.block_params)
165
        self.store = self.block_module.Store(**params)
166

    
167
        if queue_module and queue_connection:
168
            self.queue_module = load_module(queue_module)
169
            params = {'exchange': queue_connection,
170
                      'client_id': QUEUE_CLIENT_ID}
171
            self.queue = self.queue_module.Queue(**params)
172
        else:
173
            class NoQueue:
174
                def send(self, *args):
175
                    pass
176

    
177
                def close(self):
178
                    pass
179

    
180
            self.queue = NoQueue()
181

    
182
    def close(self):
183
        self.wrapper.close()
184
        self.queue.close()
185

    
186
    @backend_method
187
    def list_accounts(self, user, marker=None, limit=10000):
188
        """Return a list of accounts the user can access."""
189

    
190
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
191
        allowed = self._allowed_accounts(user)
192
        start, limit = self._list_limits(allowed, marker, limit)
193
        return allowed[start:start + limit]
194

    
195
    @backend_method
196
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
197
        """Return a dictionary with the account metadata for the domain."""
198

    
199
        logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
200
        path, node = self._lookup_account(account, user == account)
201
        if user != account:
202
            if until or node is None or account not in self._allowed_accounts(user):
203
                raise NotAllowedError
204
        try:
205
            props = self._get_properties(node, until)
206
            mtime = props[self.MTIME]
207
        except NameError:
208
            props = None
209
            mtime = until
210
        count, bytes, tstamp = self._get_statistics(node, until)
211
        tstamp = max(tstamp, mtime)
212
        if until is None:
213
            modified = tstamp
214
        else:
215
            modified = self._get_statistics(node)[2] # Overall last modification.
216
            modified = max(modified, mtime)
217

    
218
        if user != account:
219
            meta = {'name': account}
220
        else:
221
            meta = {}
222
            if props is not None and include_user_defined:
223
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
224
            if until is not None:
225
                meta.update({'until_timestamp': tstamp})
226
            meta.update({'name': account, 'count': count, 'bytes': bytes})
227
        meta.update({'modified': modified})
228
        return meta
229

    
230
    @backend_method
231
    def update_account_meta(self, user, account, domain, meta, replace=False):
232
        """Update the metadata associated with the account for the domain."""
233

    
234
        logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
235
        if user != account:
236
            raise NotAllowedError
237
        path, node = self._lookup_account(account, True)
238
        self._put_metadata(user, node, domain, meta, replace)
239

    
240
    @backend_method
241
    def get_account_groups(self, user, account):
242
        """Return a dictionary with the user groups defined for this account."""
243

    
244
        logger.debug("get_account_groups: %s %s", user, account)
245
        if user != account:
246
            if account not in self._allowed_accounts(user):
247
                raise NotAllowedError
248
            return {}
249
        self._lookup_account(account, True)
250
        return self.permissions.group_dict(account)
251

    
252
    @backend_method
253
    def update_account_groups(self, user, account, groups, replace=False):
254
        """Update the groups associated with the account."""
255

    
256
        logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
257
        if user != account:
258
            raise NotAllowedError
259
        self._lookup_account(account, True)
260
        self._check_groups(groups)
261
        if replace:
262
            self.permissions.group_destroy(account)
263
        for k, v in groups.iteritems():
264
            if not replace: # If not already deleted.
265
                self.permissions.group_delete(account, k)
266
            if v:
267
                self.permissions.group_addmany(account, k, v)
268

    
269
    @backend_method
270
    def get_account_policy(self, user, account):
271
        """Return a dictionary with the account policy."""
272

    
273
        logger.debug("get_account_policy: %s %s", user, account)
274
        if user != account:
275
            if account not in self._allowed_accounts(user):
276
                raise NotAllowedError
277
            return {}
278
        path, node = self._lookup_account(account, True)
279
        return self._get_policy(node)
280

    
281
    @backend_method
282
    def update_account_policy(self, user, account, policy, replace=False):
283
        """Update the policy associated with the account."""
284

    
285
        logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
286
        if user != account:
287
            raise NotAllowedError
288
        path, node = self._lookup_account(account, True)
289
        self._check_policy(policy)
290
        self._put_policy(node, policy, replace)
291

    
292
    @backend_method
293
    def put_account(self, user, account, policy={}):
294
        """Create a new account with the given name."""
295

    
296
        logger.debug("put_account: %s %s %s", user, account, policy)
297
        if user != account:
298
            raise NotAllowedError
299
        node = self.node.node_lookup(account)
300
        if node is not None:
301
            raise AccountExists('Account already exists')
302
        if policy:
303
            self._check_policy(policy)
304
        node = self._put_path(user, self.ROOTNODE, account)
305
        self._put_policy(node, policy, True)
306

    
307
    @backend_method
308
    def delete_account(self, user, account):
309
        """Delete the account with the given name."""
310

    
311
        logger.debug("delete_account: %s %s", user, account)
312
        if user != account:
313
            raise NotAllowedError
314
        node = self.node.node_lookup(account)
315
        if node is None:
316
            return
317
        if not self.node.node_remove(node):
318
            raise AccountNotEmpty('Account is not empty')
319
        self.permissions.group_destroy(account)
320

    
321
    @backend_method
322
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
323
        """Return a list of containers existing under an account."""
324

    
325
        logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
326
        if user != account:
327
            if until or account not in self._allowed_accounts(user):
328
                raise NotAllowedError
329
            allowed = self._allowed_containers(user, account)
330
            start, limit = self._list_limits(allowed, marker, limit)
331
            return allowed[start:start + limit]
332
        if shared or public:
333
            allowed = set()
334
            if shared:
335
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
336
            if public:
337
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
338
            allowed = sorted(allowed)
339
            start, limit = self._list_limits(allowed, marker, limit)
340
            return allowed[start:start + limit]
341
        node = self.node.node_lookup(account)
342
        containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
343
        start, limit = self._list_limits([x[0] for x in containers], marker, limit)
344
        return containers[start:start + limit]
345

    
346
    @backend_method
347
    def list_container_meta(self, user, account, container, domain, until=None):
348
        """Return a list with all the container's object meta keys for the domain."""
349

    
350
        logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
351
        allowed = []
352
        if user != account:
353
            if until:
354
                raise NotAllowedError
355
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
356
            if not allowed:
357
                raise NotAllowedError
358
        path, node = self._lookup_container(account, container)
359
        before = until if until is not None else inf
360
        allowed = self._get_formatted_paths(allowed)
361
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
362

    
363
    @backend_method
364
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
365
        """Return a dictionary with the container metadata for the domain."""
366

    
367
        logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
368
        if user != account:
369
            if until or container not in self._allowed_containers(user, account):
370
                raise NotAllowedError
371
        path, node = self._lookup_container(account, container)
372
        props = self._get_properties(node, until)
373
        mtime = props[self.MTIME]
374
        count, bytes, tstamp = self._get_statistics(node, until)
375
        tstamp = max(tstamp, mtime)
376
        if until is None:
377
            modified = tstamp
378
        else:
379
            modified = self._get_statistics(node)[2] # Overall last modification.
380
            modified = max(modified, mtime)
381

    
382
        if user != account:
383
            meta = {'name': container}
384
        else:
385
            meta = {}
386
            if include_user_defined:
387
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
388
            if until is not None:
389
                meta.update({'until_timestamp': tstamp})
390
            meta.update({'name': container, 'count': count, 'bytes': bytes})
391
        meta.update({'modified': modified})
392
        return meta
393

    
394
    @backend_method
395
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
396
        """Update the metadata associated with the container for the domain."""
397

    
398
        logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
399
        if user != account:
400
            raise NotAllowedError
401
        path, node = self._lookup_container(account, container)
402
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
403
        if src_version_id is not None:
404
            versioning = self._get_policy(node)['versioning']
405
            if versioning != 'auto':
406
                self.node.version_remove(src_version_id)
407

    
408
    @backend_method
409
    def get_container_policy(self, user, account, container):
410
        """Return a dictionary with the container policy."""
411

    
412
        logger.debug("get_container_policy: %s %s %s", user, account, container)
413
        if user != account:
414
            if container not in self._allowed_containers(user, account):
415
                raise NotAllowedError
416
            return {}
417
        path, node = self._lookup_container(account, container)
418
        return self._get_policy(node)
419

    
420
    @backend_method
421
    def update_container_policy(self, user, account, container, policy, replace=False):
422
        """Update the policy associated with the container."""
423

    
424
        logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
425
        if user != account:
426
            raise NotAllowedError
427
        path, node = self._lookup_container(account, container)
428
        self._check_policy(policy)
429
        self._put_policy(node, policy, replace)
430

    
431
    @backend_method
432
    def put_container(self, user, account, container, policy={}):
433
        """Create a new container with the given name."""
434

    
435
        logger.debug("put_container: %s %s %s %s", user, account, container, policy)
436
        if user != account:
437
            raise NotAllowedError
438
        try:
439
            path, node = self._lookup_container(account, container)
440
        except NameError:
441
            pass
442
        else:
443
            raise ContainerExists('Container already exists')
444
        if policy:
445
            self._check_policy(policy)
446
        path = '/'.join((account, container))
447
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
448
        self._put_policy(node, policy, True)
449

    
450
    @backend_method
451
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
452
        """Delete/purge the container with the given name."""
453

    
454
        logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
455
        if user != account:
456
            raise NotAllowedError
457
        path, node = self._lookup_container(account, container)
458

    
459
        if until is not None:
460
            hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
461
            for h in hashes:
462
                self.store.map_delete(h)
463
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
464
            self._report_size_change(user, account, -size, {'action': 'container purge', 'path':path})
465
            return
466

    
467
        if not delimiter:
468
            if self._get_statistics(node)[0] > 0:
469
                raise ContainerNotEmpty('Container is not empty')
470
            hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
471
            for h in hashes:
472
                self.store.map_delete(h)
473
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
474
            self.node.node_remove(node)
475
            self._report_size_change(user, account, -size, {'action': 'container delete', 'path':path})
476
        else:
477
                # remove only contents
478
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
479
            paths = []
480
            for t in src_names:
481
                path = '/'.join((account, container, t[0]))
482
                node = t[2]
483
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
484
                del_size = self._apply_versioning(account, container, src_version_id)
485
                if del_size:
486
                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path})
487
                self._report_object_change(user, account, path, details={'action': 'object delete'})
488
                paths.append(path)
489
            self.permissions.access_clear_bulk(paths)
490

    
491
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
492
        if user != account and until:
493
            raise NotAllowedError
494
        if shared and public:
495
            # get shared first
496
            shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False)
497
            objects = set()
498
            if shared:
499
                path, node = self._lookup_container(account, container)
500
                shared = self._get_formatted_paths(shared)
501
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
502

    
503
            # get public
504
            objects |= set(self._list_public_object_properties(user, account, container, prefix, all_props))
505
            objects = list(objects)
506

    
507
            objects.sort(key=lambda x: x[0])
508
            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
509
            return objects[start:start + limit]
510
        elif public:
511
            objects = self._list_public_object_properties(user, account, container, prefix, all_props)
512
            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
513
            return objects[start:start + limit]
514

    
515
        allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
516
        if shared and not allowed:
517
            return []
518
        path, node = self._lookup_container(account, container)
519
        allowed = self._get_formatted_paths(allowed)
520
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
521
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
522
        return objects[start:start + limit]
523

    
524
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
525
        public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True)
526
        paths, nodes = self._lookup_objects(public)
527
        path = '/'.join((account, container))
528
        cont_prefix = path + '/'
529
        paths = [x[len(cont_prefix):] for x in paths]
530
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
531
        objects = [(path,) + props for path, props in zip(paths, props)]
532
        return objects
533

    
534
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
535
        objects = []
536
        while True:
537
            marker = objects[-1] if objects else None
538
            limit = 10000
539
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
540
            objects.extend(l)
541
            if not l or len(l) < limit:
542
                break
543
        return objects
544

    
545
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
546
        allowed = []
547
        path = '/'.join((account, container, prefix)).rstrip('/')
548
        if user != account:
549
            allowed = self.permissions.access_list_paths(user, path)
550
            if not allowed:
551
                raise NotAllowedError
552
        else:
553
            allowed = set()
554
            if shared:
555
                allowed.update(self.permissions.access_list_shared(path))
556
            if public:
557
                allowed.update([x[0] for x in self.permissions.public_list(path)])
558
            allowed = sorted(allowed)
559
            if not allowed:
560
                return []
561
        return allowed
562

    
563
    @backend_method
564
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
565
        """Return a list of object (name, version_id) tuples existing under a container."""
566

    
567
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
568
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
569

    
570
    @backend_method
571
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
572
        """Return a list of object metadata dicts existing under a container."""
573

    
574
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
575
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
576
        objects = []
577
        for p in props:
578
            if len(p) == 2:
579
                objects.append({'subdir': p[0]})
580
            else:
581
                objects.append({'name': p[0],
582
                                'bytes': p[self.SIZE + 1],
583
                                'type': p[self.TYPE + 1],
584
                                'hash': p[self.HASH + 1],
585
                                'version': p[self.SERIAL + 1],
586
                                'version_timestamp': p[self.MTIME + 1],
587
                                'modified': p[self.MTIME + 1] if until is None else None,
588
                                'modified_by': p[self.MUSER + 1],
589
                                'uuid': p[self.UUID + 1],
590
                                'checksum': p[self.CHECKSUM + 1]})
591
        return objects
592

    
593
    @backend_method
594
    def list_object_permissions(self, user, account, container, prefix=''):
595
        """Return a list of paths that enforce permissions under a container."""
596

    
597
        logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
598
        return self._list_object_permissions(user, account, container, prefix, True, False)
599

    
600
    @backend_method
601
    def list_object_public(self, user, account, container, prefix=''):
602
        """Return a dict mapping paths to public ids for objects that are public under a container."""
603

    
604
        logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
605
        public = {}
606
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
607
            public[path] = p + ULTIMATE_ANSWER
608
        return public
609

    
610
    @backend_method
611
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
612
        """Return a dictionary with the object metadata for the domain."""
613

    
614
        logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
615
        self._can_read(user, account, container, name)
616
        path, node = self._lookup_object(account, container, name)
617
        props = self._get_version(node, version)
618
        if version is None:
619
            modified = props[self.MTIME]
620
        else:
621
            try:
622
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
623
            except NameError: # Object may be deleted.
624
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
625
                if del_props is None:
626
                    raise ItemNotExists('Object does not exist')
627
                modified = del_props[self.MTIME]
628

    
629
        meta = {}
630
        if include_user_defined:
631
            meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
632
        meta.update({'name': name,
633
                     'bytes': props[self.SIZE],
634
                     'type': props[self.TYPE],
635
                     'hash': props[self.HASH],
636
                     'version': props[self.SERIAL],
637
                     'version_timestamp': props[self.MTIME],
638
                     'modified': modified,
639
                     'modified_by': props[self.MUSER],
640
                     'uuid': props[self.UUID],
641
                     'checksum': props[self.CHECKSUM]})
642
        return meta
643

    
644
    @backend_method
645
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
646
        """Update the metadata associated with the object for the domain and return the new version."""
647

    
648
        logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
649
        self._can_write(user, account, container, name)
650
        path, node = self._lookup_object(account, container, name)
651
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
652
        self._apply_versioning(account, container, src_version_id)
653
        return dest_version_id
654

    
655
    @backend_method
656
    def get_object_permissions(self, user, account, container, name):
657
        """Return the action allowed on the object, the path
658
        from which the object gets its permissions from,
659
        along with a dictionary containing the permissions."""
660

    
661
        logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
662
        allowed = 'write'
663
        permissions_path = self._get_permissions_path(account, container, name)
664
        if user != account:
665
            if self.permissions.access_check(permissions_path, self.WRITE, user):
666
                allowed = 'write'
667
            elif self.permissions.access_check(permissions_path, self.READ, user):
668
                allowed = 'read'
669
            else:
670
                raise NotAllowedError
671
        self._lookup_object(account, container, name)
672
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
673

    
674
    @backend_method
675
    def update_object_permissions(self, user, account, container, name, permissions):
676
        """Update the permissions associated with the object."""
677

    
678
        logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
679
        if user != account:
680
            raise NotAllowedError
681
        path = self._lookup_object(account, container, name)[0]
682
        self._check_permissions(path, permissions)
683
        self.permissions.access_set(path, permissions)
684
        self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
685

    
686
    @backend_method
687
    def get_object_public(self, user, account, container, name):
688
        """Return the public id of the object if applicable."""
689

    
690
        logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
691
        self._can_read(user, account, container, name)
692
        path = self._lookup_object(account, container, name)[0]
693
        p = self.permissions.public_get(path)
694
        if p is not None:
695
            p += ULTIMATE_ANSWER
696
        return p
697

    
698
    @backend_method
699
    def update_object_public(self, user, account, container, name, public):
700
        """Update the public status of the object."""
701

    
702
        logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
703
        self._can_write(user, account, container, name)
704
        path = self._lookup_object(account, container, name)[0]
705
        if not public:
706
            self.permissions.public_unset(path)
707
        else:
708
            self.permissions.public_set(path)
709

    
710
    @backend_method
711
    def get_object_hashmap(self, user, account, container, name, version=None):
712
        """Return the object's size and a list with partial hashes."""
713

    
714
        logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
715
        self._can_read(user, account, container, name)
716
        path, node = self._lookup_object(account, container, name)
717
        props = self._get_version(node, version)
718
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
719
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
720

    
721
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
722
        if permissions is not None and user != account:
723
            raise NotAllowedError
724
        self._can_write(user, account, container, name)
725
        if permissions is not None:
726
            path = '/'.join((account, container, name))
727
            self._check_permissions(path, permissions)
728

    
729
        account_path, account_node = self._lookup_account(account, True)
730
        container_path, container_node = self._lookup_container(account, container)
731
        path, node = self._put_object_node(container_path, container_node, name)
732
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
733

    
734
        # Handle meta.
735
        if src_version_id is None:
736
            src_version_id = pre_version_id
737
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
738

    
739
        # Check quota.
740
        del_size = self._apply_versioning(account, container, pre_version_id)
741
        size_delta = size - del_size
742
        if size_delta > 0:
743
            account_quota = long(self._get_policy(account_node)['quota'])
744
            container_quota = long(self._get_policy(container_node)['quota'])
745
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
746
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
747
                # This must be executed in a transaction, so the version is never created if it fails.
748
                raise QuotaError
749
        self._report_size_change(user, account, size_delta, {'action': 'object update', 'path':path})
750

    
751
        if permissions is not None:
752
            self.permissions.access_set(path, permissions)
753
            self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
754

    
755
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
756
        return dest_version_id
757

    
758
    @backend_method
759
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
760
        """Create/update an object with the specified size and partial hashes."""
761

    
762
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
763
        if size == 0: # No such thing as an empty hashmap.
764
            hashmap = [self.put_block('')]
765
        map = HashMap(self.block_size, self.hash_algorithm)
766
        map.extend([binascii.unhexlify(x) for x in hashmap])
767
        missing = self.store.block_search(map)
768
        if missing:
769
            ie = IndexError()
770
            ie.data = [binascii.hexlify(x) for x in missing]
771
            raise ie
772

    
773
        hash = map.hash()
774
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
775
        self.store.map_put(hash, map)
776
        return dest_version_id
777

    
778
    @backend_method
779
    def update_object_checksum(self, user, account, container, name, version, checksum):
780
        """Update an object's checksum."""
781

    
782
        logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
783
        # Update objects with greater version and same hashmap and size (fix metadata updates).
784
        self._can_write(user, account, container, name)
785
        path, node = self._lookup_object(account, container, name)
786
        props = self._get_version(node, version)
787
        versions = self.node.node_get_versions(node)
788
        for x in versions:
789
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
790
                self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
791

    
792
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
793
        dest_version_ids = []
794
        self._can_read(user, src_account, src_container, src_name)
795
        path, node = self._lookup_object(src_account, src_container, src_name)
796
        # TODO: Will do another fetch of the properties in duplicate version...
797
        props = self._get_version(node, src_version) # Check to see if source exists.
798
        src_version_id = props[self.SERIAL]
799
        hash = props[self.HASH]
800
        size = props[self.SIZE]
801
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
802
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
803
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
804
                self._delete_object(user, src_account, src_container, src_name)
805

    
806
        if delimiter:
807
            prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
808
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
809
            src_names.sort(key=lambda x: x[2]) # order by nodes
810
            paths = [elem[0] for elem in src_names]
811
            nodes = [elem[2] for elem in src_names]
812
            # TODO: Will do another fetch of the properties in duplicate version...
813
            props = self._get_versions(nodes) # Check to see if source exists.
814

    
815
            for prop, path, node in zip(props, paths, nodes):
816
                src_version_id = prop[self.SERIAL]
817
                hash = prop[self.HASH]
818
                vtype = prop[self.TYPE]
819
                size = prop[self.SIZE]
820
                dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
821
                vdest_name = path.replace(prefix, dest_prefix, 1)
822
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
823
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
824
                        self._delete_object(user, src_account, src_container, path)
825
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
826

    
827
    @backend_method
828
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
829
        """Copy an object's data and metadata."""
830

    
831
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
832
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
833
        return dest_version_id
834

    
835
    @backend_method
836
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
837
        """Move an object's data and metadata."""
838

    
839
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
840
        if user != src_account:
841
            raise NotAllowedError
842
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
843
        return dest_version_id
844

    
845
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
846
        if user != account:
847
            raise NotAllowedError
848

    
849
        if until is not None:
850
            path = '/'.join((account, container, name))
851
            node = self.node.node_lookup(path)
852
            if node is None:
853
                return
854
            hashes = []
855
            size = 0
856
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
857
            hashes += h
858
            size += s
859
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
860
            hashes += h
861
            size += s
862
            for h in hashes:
863
                self.store.map_delete(h)
864
            self.node.node_purge(node, until, CLUSTER_DELETED)
865
            try:
866
                props = self._get_version(node)
867
            except NameError:
868
                self.permissions.access_clear(path)
869
            self._report_size_change(user, account, -size, {'action': 'object purge', 'path':path})
870
            return
871

    
872
        path, node = self._lookup_object(account, container, name)
873
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
874
        del_size = self._apply_versioning(account, container, src_version_id)
875
        if del_size:
876
            self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path})
877
        self._report_object_change(user, account, path, details={'action': 'object delete'})
878
        self.permissions.access_clear(path)
879

    
880
        if delimiter:
881
            prefix = name + delimiter if not name.endswith(delimiter) else name
882
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
883
            paths = []
884
            for t in src_names:
885
                    path = '/'.join((account, container, t[0]))
886
                    node = t[2]
887
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
888
                del_size = self._apply_versioning(account, container, src_version_id)
889
                if del_size:
890
                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path})
891
                self._report_object_change(user, account, path, details={'action': 'object delete'})
892
                paths.append(path)
893
            self.permissions.access_clear_bulk(paths)
894

    
895
    @backend_method
896
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
897
        """Delete/purge an object."""
898

    
899
        logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
900
        self._delete_object(user, account, container, name, until, delimiter)
901

    
902
    @backend_method
903
    def list_versions(self, user, account, container, name):
904
        """Return a list of all (version, version_timestamp) tuples for an object."""
905

    
906
        logger.debug("list_versions: %s %s %s %s", user, account, container, name)
907
        self._can_read(user, account, container, name)
908
        path, node = self._lookup_object(account, container, name)
909
        versions = self.node.node_get_versions(node)
910
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
911

    
912
    @backend_method
913
    def get_uuid(self, user, uuid):
914
        """Return the (account, container, name) for the UUID given."""
915

    
916
        logger.debug("get_uuid: %s %s", user, uuid)
917
        info = self.node.latest_uuid(uuid)
918
        if info is None:
919
            raise NameError
920
        path, serial = info
921
        account, container, name = path.split('/', 2)
922
        self._can_read(user, account, container, name)
923
        return (account, container, name)
924

    
925
    @backend_method
926
    def get_public(self, user, public):
927
        """Return the (account, container, name) for the public id given."""
928

    
929
        logger.debug("get_public: %s %s", user, public)
930
        if public is None or public < ULTIMATE_ANSWER:
931
            raise NameError
932
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
933
        if path is None:
934
            raise NameError
935
        account, container, name = path.split('/', 2)
936
        self._can_read(user, account, container, name)
937
        return (account, container, name)
938

    
939
    @backend_method(autocommit=0)
940
    def get_block(self, hash):
941
        """Return a block's data."""
942

    
943
        logger.debug("get_block: %s", hash)
944
        block = self.store.block_get(binascii.unhexlify(hash))
945
        if not block:
946
            raise ItemNotExists('Block does not exist')
947
        return block
948

    
949
    @backend_method(autocommit=0)
950
    def put_block(self, data):
951
        """Store a block and return the hash."""
952

    
953
        logger.debug("put_block: %s", len(data))
954
        return binascii.hexlify(self.store.block_put(data))
955

    
956
    @backend_method(autocommit=0)
957
    def update_block(self, hash, data, offset=0):
958
        """Update a known block and return the hash."""
959

    
960
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
961
        if offset == 0 and len(data) == self.block_size:
962
            return self.put_block(data)
963
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
964
        return binascii.hexlify(h)
965

    
966
    # Path functions.
967

    
968
    def _generate_uuid(self):
969
        return str(uuidlib.uuid4())
970

    
971
    def _put_object_node(self, path, parent, name):
972
        path = '/'.join((path, name))
973
        node = self.node.node_lookup(path)
974
        if node is None:
975
            node = self.node.node_create(parent, path)
976
        return path, node
977

    
978
    def _put_path(self, user, parent, path):
979
        node = self.node.node_create(parent, path)
980
        self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
981
        return node
982

    
983
    def _lookup_account(self, account, create=True):
984
        node = self.node.node_lookup(account)
985
        if node is None and create:
986
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
987
        return account, node
988

    
989
    def _lookup_container(self, account, container):
990
        path = '/'.join((account, container))
991
        node = self.node.node_lookup(path)
992
        if node is None:
993
            raise ItemNotExists('Container does not exist')
994
        return path, node
995

    
996
    def _lookup_object(self, account, container, name):
997
        path = '/'.join((account, container, name))
998
        node = self.node.node_lookup(path)
999
        if node is None:
1000
            raise ItemNotExists('Object does not exist')
1001
        return path, node
1002

    
1003
    def _lookup_objects(self, paths):
1004
        nodes = self.node.node_lookup_bulk(paths)
1005
        return paths, nodes
1006

    
1007
    def _get_properties(self, node, until=None):
1008
        """Return properties until the timestamp given."""
1009

    
1010
        before = until if until is not None else inf
1011
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1012
        if props is None and until is not None:
1013
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1014
        if props is None:
1015
            raise ItemNotExists('Path does not exist')
1016
        return props
1017

    
1018
    def _get_statistics(self, node, until=None):
1019
        """Return count, sum of size and latest timestamp of everything under node."""
1020

    
1021
        if until is None:
1022
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1023
        else:
1024
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1025
        if stats is None:
1026
            stats = (0, 0, 0)
1027
        return stats
1028

    
1029
    def _get_version(self, node, version=None):
1030
        if version is None:
1031
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1032
            if props is None:
1033
                raise ItemNotExists('Object does not exist')
1034
        else:
1035
            try:
1036
                version = int(version)
1037
            except ValueError:
1038
                raise VersionNotExists('Version does not exist')
1039
            props = self.node.version_get_properties(version)
1040
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1041
                raise VersionNotExists('Version does not exist')
1042
        return props
1043

    
1044
    def _get_versions(self, nodes):
1045
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1046

    
1047
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1048
        """Create a new version of the node."""
1049

    
1050
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1051
        if props is not None:
1052
            src_version_id = props[self.SERIAL]
1053
            src_hash = props[self.HASH]
1054
            src_size = props[self.SIZE]
1055
            src_type = props[self.TYPE]
1056
            src_checksum = props[self.CHECKSUM]
1057
        else:
1058
            src_version_id = None
1059
            src_hash = None
1060
            src_size = 0
1061
            src_type = ''
1062
            src_checksum = ''
1063
        if size is None: # Set metadata.
1064
            hash = src_hash # This way hash can be set to None (account or container).
1065
            size = src_size
1066
        if type is None:
1067
            type = src_type
1068
        if checksum is None:
1069
            checksum = src_checksum
1070
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
1071

    
1072
        if src_node is None:
1073
            pre_version_id = src_version_id
1074
        else:
1075
            pre_version_id = None
1076
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1077
            if props is not None:
1078
                pre_version_id = props[self.SERIAL]
1079
        if pre_version_id is not None:
1080
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1081

    
1082
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1083
        return pre_version_id, dest_version_id
1084

    
1085
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1086
        if src_version_id is not None:
1087
            self.node.attribute_copy(src_version_id, dest_version_id)
1088
        if not replace:
1089
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
1090
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
1091
        else:
1092
            self.node.attribute_del(dest_version_id, domain)
1093
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
1094

    
1095
    def _put_metadata(self, user, node, domain, meta, replace=False):
1096
        """Create a new version and store metadata."""
1097

    
1098
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
1099
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
1100
        return src_version_id, dest_version_id
1101

    
1102
    def _list_limits(self, listing, marker, limit):
1103
        start = 0
1104
        if marker:
1105
            try:
1106
                start = listing.index(marker) + 1
1107
            except ValueError:
1108
                pass
1109
        if not limit or limit > 10000:
1110
            limit = 10000
1111
        return start, limit
1112

    
1113
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1114
        cont_prefix = path + '/'
1115
        prefix = cont_prefix + prefix
1116
        start = cont_prefix + marker if marker else None
1117
        before = until if until is not None else inf
1118
        filterq = keys if domain else []
1119
        sizeq = size_range
1120

    
1121
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1122
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1123
        objects.sort(key=lambda x: x[0])
1124
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1125
        return objects
1126

    
1127
    # Reporting functions.
1128

    
1129
    def _report_size_change(self, user, account, size, details={}):
1130
        account_node = self._lookup_account(account, True)[1]
1131
        total = self._get_statistics(account_node)[1]
1132
        details.update({'user': user, 'total': total})
1133
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1134
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1135

    
1136
    def _report_object_change(self, user, account, path, details={}):
1137
        details.update({'user': user})
1138
        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1139
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1140

    
1141
    def _report_sharing_change(self, user, account, path, details={}):
1142
        logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1143
        details.update({'user': user})
1144
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1145

    
1146
    # Policy functions.
1147

    
1148
    def _check_policy(self, policy):
1149
        for k in policy.keys():
1150
            if policy[k] == '':
1151
                policy[k] = self.default_policy.get(k)
1152
        for k, v in policy.iteritems():
1153
            if k == 'quota':
1154
                q = int(v) # May raise ValueError.
1155
                if q < 0:
1156
                    raise ValueError
1157
            elif k == 'versioning':
1158
                if v not in ['auto', 'none']:
1159
                    raise ValueError
1160
            else:
1161
                raise ValueError
1162

    
1163
    def _put_policy(self, node, policy, replace):
1164
        if replace:
1165
            for k, v in self.default_policy.iteritems():
1166
                if k not in policy:
1167
                    policy[k] = v
1168
        self.node.policy_set(node, policy)
1169

    
1170
    def _get_policy(self, node):
1171
        policy = self.default_policy.copy()
1172
        policy.update(self.node.policy_get(node))
1173
        return policy
1174

    
1175
    def _apply_versioning(self, account, container, version_id):
1176
        """Delete the provided version if such is the policy.
1177
           Return size of object removed.
1178
        """
1179

    
1180
        if version_id is None:
1181
            return 0
1182
        path, node = self._lookup_container(account, container)
1183
        versioning = self._get_policy(node)['versioning']
1184
        if versioning != 'auto':
1185
            hash, size = self.node.version_remove(version_id)
1186
            self.store.map_delete(hash)
1187
            return size
1188
        return 0
1189

    
1190
    # Access control functions.
1191

    
1192
    def _check_groups(self, groups):
1193
        # raise ValueError('Bad characters in groups')
1194
        pass
1195

    
1196
    def _check_permissions(self, path, permissions):
1197
        # raise ValueError('Bad characters in permissions')
1198
        pass
1199

    
1200
    def _get_formatted_paths(self, paths):
1201
        formatted = []
1202
        for p in paths:
1203
            node = self.node.node_lookup(p)
1204
            props = None
1205
            if node is not None:
1206
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1207
            if props is not None:
1208
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1209
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1210
                formatted.append((p, self.MATCH_EXACT))
1211
        return formatted
1212

    
1213
    def _get_permissions_path(self, account, container, name):
1214
        path = '/'.join((account, container, name))
1215
        permission_paths = self.permissions.access_inherit(path)
1216
        permission_paths.sort()
1217
        permission_paths.reverse()
1218
        for p in permission_paths:
1219
            if p == path:
1220
                return p
1221
            else:
1222
                if p.count('/') < 2:
1223
                    continue
1224
                node = self.node.node_lookup(p)
1225
                if node is not None:
1226
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1227
                if props is not None:
1228
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1229
                        return p
1230
        return None
1231

    
1232
    def _can_read(self, user, account, container, name):
1233
        if user == account:
1234
            return True
1235
        path = '/'.join((account, container, name))
1236
        if self.permissions.public_get(path) is not None:
1237
            return True
1238
        path = self._get_permissions_path(account, container, name)
1239
        if not path:
1240
            raise NotAllowedError
1241
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1242
            raise NotAllowedError
1243

    
1244
    def _can_write(self, user, account, container, name):
1245
        if user == account:
1246
            return True
1247
        path = '/'.join((account, container, name))
1248
        path = self._get_permissions_path(account, container, name)
1249
        if not path:
1250
            raise NotAllowedError
1251
        if not self.permissions.access_check(path, self.WRITE, user):
1252
            raise NotAllowedError
1253

    
1254
    def _allowed_accounts(self, user):
1255
        allow = set()
1256
        for path in self.permissions.access_list_paths(user):
1257
            allow.add(path.split('/', 1)[0])
1258
        return sorted(allow)
1259

    
1260
    def _allowed_containers(self, user, account):
1261
        allow = set()
1262
        for path in self.permissions.access_list_paths(user, account):
1263
            allow.add(path.split('/', 2)[1])
1264
        return sorted(allow)