Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 49f44d54

History | View | Annotate | Download (58.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
44

    
45
# Stripped-down version of the HashMap class found in tools.
46

    
47

    
48
class HashMap(list):
49

    
50
    def __init__(self, blocksize, blockhash):
51
        super(HashMap, self).__init__()
52
        self.blocksize = blocksize
53
        self.blockhash = blockhash
54

    
55
    def _hash_raw(self, v):
56
        h = hashlib.new(self.blockhash)
57
        h.update(v)
58
        return h.digest()
59

    
60
    def hash(self):
61
        if len(self) == 0:
62
            return self._hash_raw('')
63
        if len(self) == 1:
64
            return self.__getitem__(0)
65

    
66
        h = list(self)
67
        s = 2
68
        while s < len(h):
69
            s = s * 2
70
        h += [('\x00' * len(h[0]))] * (s - len(h))
71
        while len(h) > 1:
72
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
73
        return h[0]
74

    
75
# Default modules and settings.
76
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
77
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
78
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
79
DEFAULT_BLOCK_PATH = 'data/'
80
DEFAULT_BLOCK_UMASK = 0o022
81
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
82
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
83

    
84
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
85
QUEUE_CLIENT_ID = 'pithos'
86
QUEUE_INSTANCE_ID = '1'
87

    
88
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
89

    
90
inf = float('inf')
91

    
92
ULTIMATE_ANSWER = 42
93

    
94

    
95
logger = logging.getLogger(__name__)
96

    
97

    
98
def backend_method(func=None, autocommit=1):
99
    if func is None:
100
        def fn(func):
101
            return backend_method(func, autocommit)
102
        return fn
103

    
104
    if not autocommit:
105
        return func
106

    
107
    def fn(self, *args, **kw):
108
        self.wrapper.execute()
109
        try:
110
            self.messages = []
111
            ret = func(self, *args, **kw)
112
            for m in self.messages:
113
                self.queue.send(*m)
114
            self.wrapper.commit()
115
            return ret
116
        except:
117
            self.wrapper.rollback()
118
            raise
119
    return fn
120

    
121

    
122
class ModularBackend(BaseBackend):
123
    """A modular backend.
124

125
    Uses modules for SQL functions and storage.
126
    """
127

    
128
    def __init__(self, db_module=None, db_connection=None,
129
                 block_module=None, block_path=None, block_umask=None,
130
                 queue_module=None, queue_connection=None):
131
        db_module = db_module or DEFAULT_DB_MODULE
132
        db_connection = db_connection or DEFAULT_DB_CONNECTION
133
        block_module = block_module or DEFAULT_BLOCK_MODULE
134
        block_path = block_path or DEFAULT_BLOCK_PATH
135
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
136
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
137
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
138

    
139
        self.hash_algorithm = 'sha256'
140
        self.block_size = 4 * 1024 * 1024  # 4MB
141

    
142
        self.default_policy = {'quota': DEFAULT_QUOTA,
143
                               'versioning': DEFAULT_VERSIONING}
144

    
145
        def load_module(m):
146
            __import__(m)
147
            return sys.modules[m]
148

    
149
        self.db_module = load_module(db_module)
150
        self.wrapper = self.db_module.DBWrapper(db_connection)
151
        params = {'wrapper': self.wrapper}
152
        self.permissions = self.db_module.Permissions(**params)
153
        self.config = self.db_module.Config(**params)
154
        self.quotaholder_sync = self.db_module.QuotaholderSync(**params)
155
        for x in ['READ', 'WRITE']:
156
            setattr(self, x, getattr(self.db_module, x))
157
        self.node = self.db_module.Node(**params)
158
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
159
            setattr(self, x, getattr(self.db_module, x))
160

    
161
        self.block_module = load_module(block_module)
162
        params = {'path': block_path,
163
                  'block_size': self.block_size,
164
                  'hash_algorithm': self.hash_algorithm,
165
                  'umask': block_umask}
166
        self.store = self.block_module.Store(**params)
167

    
168
        if queue_module and queue_connection:
169
            self.queue_module = load_module(queue_module)
170
            params = {'exchange': queue_connection,
171
                      'client_id': QUEUE_CLIENT_ID}
172
            self.queue = self.queue_module.Queue(**params)
173
        else:
174
            class NoQueue:
175
                def send(self, *args):
176
                    pass
177

    
178
                def close(self):
179
                    pass
180

    
181
            self.queue = NoQueue()
182

    
183
    def close(self):
184
        self.wrapper.close()
185
        self.queue.close()
186

    
187
    @backend_method
188
    def list_accounts(self, user, marker=None, limit=10000):
189
        """Return a list of accounts the user can access."""
190

    
191
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
192
        allowed = self._allowed_accounts(user)
193
        start, limit = self._list_limits(allowed, marker, limit)
194
        return allowed[start:start + limit]
195

    
196
    @backend_method
197
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
198
        """Return a dictionary with the account metadata for the domain."""
199

    
200
        logger.debug(
201
            "get_account_meta: %s %s %s %s", user, account, domain, until)
202
        path, node = self._lookup_account(account, user == account)
203
        if user != account:
204
            if until or node is None or account not in self._allowed_accounts(user):
205
                raise NotAllowedError
206
        try:
207
            props = self._get_properties(node, until)
208
            mtime = props[self.MTIME]
209
        except NameError:
210
            props = None
211
            mtime = until
212
        count, bytes, tstamp = self._get_statistics(node, until)
213
        tstamp = max(tstamp, mtime)
214
        if until is None:
215
            modified = tstamp
216
        else:
217
            modified = self._get_statistics(
218
                node)[2]  # Overall last modification.
219
            modified = max(modified, mtime)
220

    
221
        if user != account:
222
            meta = {'name': account}
223
        else:
224
            meta = {}
225
            if props is not None and include_user_defined:
226
                meta.update(
227
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
228
            if until is not None:
229
                meta.update({'until_timestamp': tstamp})
230
            meta.update({'name': account, 'count': count, 'bytes': bytes})
231
        meta.update({'modified': modified})
232
        return meta
233

    
234
    @backend_method
235
    def update_account_meta(self, user, account, domain, meta, replace=False):
236
        """Update the metadata associated with the account for the domain."""
237

    
238
        logger.debug("update_account_meta: %s %s %s %s %s", user,
239
                     account, domain, meta, replace)
240
        if user != account:
241
            raise NotAllowedError
242
        path, node = self._lookup_account(account, True)
243
        self._put_metadata(user, node, domain, meta, replace)
244

    
245
    @backend_method
246
    def get_account_groups(self, user, account):
247
        """Return a dictionary with the user groups defined for this account."""
248

    
249
        logger.debug("get_account_groups: %s %s", user, account)
250
        if user != account:
251
            if account not in self._allowed_accounts(user):
252
                raise NotAllowedError
253
            return {}
254
        self._lookup_account(account, True)
255
        return self.permissions.group_dict(account)
256

    
257
    @backend_method
258
    def update_account_groups(self, user, account, groups, replace=False):
259
        """Update the groups associated with the account."""
260

    
261
        logger.debug("update_account_groups: %s %s %s %s", user,
262
                     account, groups, replace)
263
        if user != account:
264
            raise NotAllowedError
265
        self._lookup_account(account, True)
266
        self._check_groups(groups)
267
        if replace:
268
            self.permissions.group_destroy(account)
269
        for k, v in groups.iteritems():
270
            if not replace:  # If not already deleted.
271
                self.permissions.group_delete(account, k)
272
            if v:
273
                self.permissions.group_addmany(account, k, v)
274

    
275
    @backend_method
276
    def get_account_policy(self, user, account):
277
        """Return a dictionary with the account policy."""
278

    
279
        logger.debug("get_account_policy: %s %s", user, account)
280
        if user != account:
281
            if account not in self._allowed_accounts(user):
282
                raise NotAllowedError
283
            return {}
284
        path, node = self._lookup_account(account, True)
285
        return self._get_policy(node)
286

    
287
    @backend_method
288
    def update_account_policy(self, user, account, policy, replace=False):
289
        """Update the policy associated with the account."""
290

    
291
        logger.debug("update_account_policy: %s %s %s %s", user,
292
                     account, policy, replace)
293
        if user != account:
294
            raise NotAllowedError
295
        path, node = self._lookup_account(account, True)
296
        self._check_policy(policy)
297
        self._put_policy(node, policy, replace)
298

    
299
    @backend_method
300
    def put_account(self, user, account, policy={}):
301
        """Create a new account with the given name."""
302

    
303
        logger.debug("put_account: %s %s %s", user, account, policy)
304
        if user != account:
305
            raise NotAllowedError
306
        node = self.node.node_lookup(account)
307
        if node is not None:
308
            raise AccountExists('Account already exists')
309
        if policy:
310
            self._check_policy(policy)
311
        node = self._put_path(user, self.ROOTNODE, account)
312
        self._put_policy(node, policy, True)
313

    
314
    @backend_method
315
    def delete_account(self, user, account):
316
        """Delete the account with the given name."""
317

    
318
        logger.debug("delete_account: %s %s", user, account)
319
        if user != account:
320
            raise NotAllowedError
321
        node = self.node.node_lookup(account)
322
        if node is None:
323
            return
324
        if not self.node.node_remove(node):
325
            raise AccountNotEmpty('Account is not empty')
326
        self.permissions.group_destroy(account)
327

    
328
    @backend_method
329
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
330
        """Return a list of containers existing under an account."""
331

    
332
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
333
                     account, marker, limit, shared, until, public)
334
        if user != account:
335
            if until or account not in self._allowed_accounts(user):
336
                raise NotAllowedError
337
            allowed = self._allowed_containers(user, account)
338
            start, limit = self._list_limits(allowed, marker, limit)
339
            return allowed[start:start + limit]
340
        if shared or public:
341
            allowed = set()
342
            if shared:
343
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
344
            if public:
345
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
346
            allowed = sorted(allowed)
347
            start, limit = self._list_limits(allowed, marker, limit)
348
            return allowed[start:start + limit]
349
        node = self.node.node_lookup(account)
350
        containers = [x[0] for x in self._list_object_properties(
351
            node, account, '', '/', marker, limit, False, None, [], until)]
352
        start, limit = self._list_limits(
353
            [x[0] for x in containers], marker, limit)
354
        return containers[start:start + limit]
355

    
356
    @backend_method
357
    def list_container_meta(self, user, account, container, domain, until=None):
358
        """Return a list with all the container's object meta keys for the domain."""
359

    
360
        logger.debug("list_container_meta: %s %s %s %s %s", user,
361
                     account, container, domain, until)
362
        allowed = []
363
        if user != account:
364
            if until:
365
                raise NotAllowedError
366
            allowed = self.permissions.access_list_paths(
367
                user, '/'.join((account, container)))
368
            if not allowed:
369
                raise NotAllowedError
370
        path, node = self._lookup_container(account, container)
371
        before = until if until is not None else inf
372
        allowed = self._get_formatted_paths(allowed)
373
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
374

    
375
    @backend_method
376
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
377
        """Return a dictionary with the container metadata for the domain."""
378

    
379
        logger.debug("get_container_meta: %s %s %s %s %s", user,
380
                     account, container, domain, until)
381
        if user != account:
382
            if until or container not in self._allowed_containers(user, account):
383
                raise NotAllowedError
384
        path, node = self._lookup_container(account, container)
385
        props = self._get_properties(node, until)
386
        mtime = props[self.MTIME]
387
        count, bytes, tstamp = self._get_statistics(node, until)
388
        tstamp = max(tstamp, mtime)
389
        if until is None:
390
            modified = tstamp
391
        else:
392
            modified = self._get_statistics(
393
                node)[2]  # Overall last modification.
394
            modified = max(modified, mtime)
395

    
396
        if user != account:
397
            meta = {'name': container}
398
        else:
399
            meta = {}
400
            if include_user_defined:
401
                meta.update(
402
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
403
            if until is not None:
404
                meta.update({'until_timestamp': tstamp})
405
            meta.update({'name': container, 'count': count, 'bytes': bytes})
406
        meta.update({'modified': modified})
407
        return meta
408

    
409
    @backend_method
410
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
411
        """Update the metadata associated with the container for the domain."""
412

    
413
        logger.debug("update_container_meta: %s %s %s %s %s %s",
414
                     user, account, container, domain, meta, replace)
415
        if user != account:
416
            raise NotAllowedError
417
        path, node = self._lookup_container(account, container)
418
        src_version_id, dest_version_id = self._put_metadata(
419
            user, node, domain, meta, replace)
420
        if src_version_id is not None:
421
            versioning = self._get_policy(node)['versioning']
422
            if versioning != 'auto':
423
                self.node.version_remove(src_version_id)
424

    
425
    @backend_method
426
    def get_container_policy(self, user, account, container):
427
        """Return a dictionary with the container policy."""
428

    
429
        logger.debug(
430
            "get_container_policy: %s %s %s", user, account, container)
431
        if user != account:
432
            if container not in self._allowed_containers(user, account):
433
                raise NotAllowedError
434
            return {}
435
        path, node = self._lookup_container(account, container)
436
        return self._get_policy(node)
437

    
438
    @backend_method
439
    def update_container_policy(self, user, account, container, policy, replace=False):
440
        """Update the policy associated with the container."""
441

    
442
        logger.debug("update_container_policy: %s %s %s %s %s",
443
                     user, account, container, policy, replace)
444
        if user != account:
445
            raise NotAllowedError
446
        path, node = self._lookup_container(account, container)
447
        self._check_policy(policy)
448
        self._put_policy(node, policy, replace)
449

    
450
    @backend_method
451
    def put_container(self, user, account, container, policy={}):
452
        """Create a new container with the given name."""
453

    
454
        logger.debug(
455
            "put_container: %s %s %s %s", user, account, container, policy)
456
        if user != account:
457
            raise NotAllowedError
458
        try:
459
            path, node = self._lookup_container(account, container)
460
        except NameError:
461
            pass
462
        else:
463
            raise ContainerExists('Container already exists')
464
        if policy:
465
            self._check_policy(policy)
466
        path = '/'.join((account, container))
467
        node = self._put_path(
468
            user, self._lookup_account(account, True)[1], path)
469
        self._put_policy(node, policy, True)
470

    
471
    @backend_method
472
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
473
        """Delete/purge the container with the given name."""
474

    
475
        logger.debug("delete_container: %s %s %s %s %s %s", user,
476
                     account, container, until, prefix, delimiter)
477
        if user != account:
478
            raise NotAllowedError
479
        path, node = self._lookup_container(account, container)
480

    
481
        if until is not None:
482
            hashes, size = self.node.node_purge_children(
483
                node, until, CLUSTER_HISTORY)
484
            for h in hashes:
485
                self.store.map_delete(h)
486
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
487
            self._report_size_change(user, account, -size, {'action':
488
                                     'container purge', 'path': path})
489
            return
490

    
491
        if not delimiter:
492
            if self._get_statistics(node)[0] > 0:
493
                raise ContainerNotEmpty('Container is not empty')
494
            hashes, size = self.node.node_purge_children(
495
                node, inf, CLUSTER_HISTORY)
496
            for h in hashes:
497
                self.store.map_delete(h)
498
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
499
            self.node.node_remove(node)
500
            self._report_size_change(user, account, -size, {'action':
501
                                     'container delete', 'path': path})
502
        else:
503
                # remove only contents
504
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
505
            paths = []
506
            for t in src_names:
507
                path = '/'.join((account, container, t[0]))
508
                node = t[2]
509
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
510
                del_size = self._apply_versioning(
511
                    account, container, src_version_id)
512
                if del_size:
513
                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
514
                self._report_object_change(
515
                    user, account, path, details={'action': 'object delete'})
516
                paths.append(path)
517
            self.permissions.access_clear_bulk(paths)
518

    
519
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
520
        if user != account and until:
521
            raise NotAllowedError
522
        if shared and public:
523
            # get shared first
524
            shared = self._list_object_permissions(
525
                user, account, container, prefix, shared=True, public=False)
526
            objects = set()
527
            if shared:
528
                path, node = self._lookup_container(account, container)
529
                shared = self._get_formatted_paths(shared)
530
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
531

    
532
            # get public
533
            objects |= set(self._list_public_object_properties(
534
                user, account, container, prefix, all_props))
535
            objects = list(objects)
536

    
537
            objects.sort(key=lambda x: x[0])
538
            start, limit = self._list_limits(
539
                [x[0] for x in objects], marker, limit)
540
            return objects[start:start + limit]
541
        elif public:
542
            objects = self._list_public_object_properties(
543
                user, account, container, prefix, all_props)
544
            start, limit = self._list_limits(
545
                [x[0] for x in objects], marker, limit)
546
            return objects[start:start + limit]
547

    
548
        allowed = self._list_object_permissions(
549
            user, account, container, prefix, shared, public)
550
        if shared and not allowed:
551
            return []
552
        path, node = self._lookup_container(account, container)
553
        allowed = self._get_formatted_paths(allowed)
554
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
555
        start, limit = self._list_limits(
556
            [x[0] for x in objects], marker, limit)
557
        return objects[start:start + limit]
558

    
559
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
560
        public = self._list_object_permissions(
561
            user, account, container, prefix, shared=False, public=True)
562
        paths, nodes = self._lookup_objects(public)
563
        path = '/'.join((account, container))
564
        cont_prefix = path + '/'
565
        paths = [x[len(cont_prefix):] for x in paths]
566
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
567
        objects = [(path,) + props for path, props in zip(paths, props)]
568
        return objects
569

    
570
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
571
        objects = []
572
        while True:
573
            marker = objects[-1] if objects else None
574
            limit = 10000
575
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
576
            objects.extend(l)
577
            if not l or len(l) < limit:
578
                break
579
        return objects
580

    
581
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
582
        allowed = []
583
        path = '/'.join((account, container, prefix)).rstrip('/')
584
        if user != account:
585
            allowed = self.permissions.access_list_paths(user, path)
586
            if not allowed:
587
                raise NotAllowedError
588
        else:
589
            allowed = set()
590
            if shared:
591
                allowed.update(self.permissions.access_list_shared(path))
592
            if public:
593
                allowed.update(
594
                    [x[0] for x in self.permissions.public_list(path)])
595
            allowed = sorted(allowed)
596
            if not allowed:
597
                return []
598
        return allowed
599

    
600
    @backend_method
601
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
602
        """Return a list of object (name, version_id) tuples existing under a container."""
603

    
604
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
605
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
606

    
607
    @backend_method
608
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
609
        """Return a list of object metadata dicts existing under a container."""
610

    
611
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
612
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
613
        objects = []
614
        for p in props:
615
            if len(p) == 2:
616
                objects.append({'subdir': p[0]})
617
            else:
618
                objects.append({'name': p[0],
619
                                'bytes': p[self.SIZE + 1],
620
                                'type': p[self.TYPE + 1],
621
                                'hash': p[self.HASH + 1],
622
                                'version': p[self.SERIAL + 1],
623
                                'version_timestamp': p[self.MTIME + 1],
624
                                'modified': p[self.MTIME + 1] if until is None else None,
625
                                'modified_by': p[self.MUSER + 1],
626
                                'uuid': p[self.UUID + 1],
627
                                'checksum': p[self.CHECKSUM + 1]})
628
        return objects
629

    
630
    @backend_method
631
    def list_object_permissions(self, user, account, container, prefix=''):
632
        """Return a list of paths that enforce permissions under a container."""
633

    
634
        logger.debug("list_object_permissions: %s %s %s %s", user,
635
                     account, container, prefix)
636
        return self._list_object_permissions(user, account, container, prefix, True, False)
637

    
638
    @backend_method
639
    def list_object_public(self, user, account, container, prefix=''):
640
        """Return a dict mapping paths to public ids for objects that are public under a container."""
641

    
642
        logger.debug("list_object_public: %s %s %s %s", user,
643
                     account, container, prefix)
644
        public = {}
645
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
646
            public[path] = p + ULTIMATE_ANSWER
647
        return public
648

    
649
    @backend_method
650
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
651
        """Return a dictionary with the object metadata for the domain."""
652

    
653
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
654
                     account, container, name, domain, version)
655
        self._can_read(user, account, container, name)
656
        path, node = self._lookup_object(account, container, name)
657
        props = self._get_version(node, version)
658
        if version is None:
659
            modified = props[self.MTIME]
660
        else:
661
            try:
662
                modified = self._get_version(
663
                    node)[self.MTIME]  # Overall last modification.
664
            except NameError:  # Object may be deleted.
665
                del_props = self.node.version_lookup(
666
                    node, inf, CLUSTER_DELETED)
667
                if del_props is None:
668
                    raise ItemNotExists('Object does not exist')
669
                modified = del_props[self.MTIME]
670

    
671
        meta = {}
672
        if include_user_defined:
673
            meta.update(
674
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
675
        meta.update({'name': name,
676
                     'bytes': props[self.SIZE],
677
                     'type': props[self.TYPE],
678
                     'hash': props[self.HASH],
679
                     'version': props[self.SERIAL],
680
                     'version_timestamp': props[self.MTIME],
681
                     'modified': modified,
682
                     'modified_by': props[self.MUSER],
683
                     'uuid': props[self.UUID],
684
                     'checksum': props[self.CHECKSUM]})
685
        return meta
686

    
687
    @backend_method
688
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
689
        """Update the metadata associated with the object for the domain and return the new version."""
690

    
691
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
692
                     user, account, container, name, domain, meta, replace)
693
        self._can_write(user, account, container, name)
694
        path, node = self._lookup_object(account, container, name)
695
        src_version_id, dest_version_id = self._put_metadata(
696
            user, node, domain, meta, replace)
697
        self._apply_versioning(account, container, src_version_id)
698
        return dest_version_id
699

    
700
    @backend_method
701
    def get_object_permissions(self, user, account, container, name):
702
        """Return the action allowed on the object, the path
703
        from which the object gets its permissions from,
704
        along with a dictionary containing the permissions."""
705

    
706
        logger.debug("get_object_permissions: %s %s %s %s", user,
707
                     account, container, name)
708
        allowed = 'write'
709
        permissions_path = self._get_permissions_path(account, container, name)
710
        if user != account:
711
            if self.permissions.access_check(permissions_path, self.WRITE, user):
712
                allowed = 'write'
713
            elif self.permissions.access_check(permissions_path, self.READ, user):
714
                allowed = 'read'
715
            else:
716
                raise NotAllowedError
717
        self._lookup_object(account, container, name)
718
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
719

    
720
    @backend_method
721
    def update_object_permissions(self, user, account, container, name, permissions):
722
        """Update the permissions associated with the object."""
723

    
724
        logger.debug("update_object_permissions: %s %s %s %s %s",
725
                     user, account, container, name, permissions)
726
        if user != account:
727
            raise NotAllowedError
728
        path = self._lookup_object(account, container, name)[0]
729
        self._check_permissions(path, permissions)
730
        self.permissions.access_set(path, permissions)
731
        self._report_sharing_change(user, account, path, {'members':
732
                                    self.permissions.access_members(path)})
733

    
734
    @backend_method
735
    def get_object_public(self, user, account, container, name):
736
        """Return the public id of the object if applicable."""
737

    
738
        logger.debug(
739
            "get_object_public: %s %s %s %s", user, account, container, name)
740
        self._can_read(user, account, container, name)
741
        path = self._lookup_object(account, container, name)[0]
742
        p = self.permissions.public_get(path)
743
        if p is not None:
744
            p += ULTIMATE_ANSWER
745
        return p
746

    
747
    @backend_method
748
    def update_object_public(self, user, account, container, name, public):
749
        """Update the public status of the object."""
750

    
751
        logger.debug("update_object_public: %s %s %s %s %s", user,
752
                     account, container, name, public)
753
        self._can_write(user, account, container, name)
754
        path = self._lookup_object(account, container, name)[0]
755
        if not public:
756
            self.permissions.public_unset(path)
757
        else:
758
            self.permissions.public_set(path)
759

    
760
    @backend_method
761
    def get_object_hashmap(self, user, account, container, name, version=None):
762
        """Return the object's size and a list with partial hashes."""
763

    
764
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
765
                     account, container, name, version)
766
        self._can_read(user, account, container, name)
767
        path, node = self._lookup_object(account, container, name)
768
        props = self._get_version(node, version)
769
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
770
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
771

    
772
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
773
        if permissions is not None and user != account:
774
            raise NotAllowedError
775
        self._can_write(user, account, container, name)
776
        if permissions is not None:
777
            path = '/'.join((account, container, name))
778
            self._check_permissions(path, permissions)
779

    
780
        account_path, account_node = self._lookup_account(account, True)
781
        container_path, container_node = self._lookup_container(
782
            account, container)
783
        path, node = self._put_object_node(
784
            container_path, container_node, name)
785
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
786

    
787
        # Handle meta.
788
        if src_version_id is None:
789
            src_version_id = pre_version_id
790
        self._put_metadata_duplicate(
791
            src_version_id, dest_version_id, domain, meta, replace_meta)
792

    
793
        # Check quota.
794
        del_size = self._apply_versioning(account, container, pre_version_id)
795
        size_delta = size - del_size
796
        if size_delta > 0:
797
            account_quota = long(self._get_policy(account_node)['quota'])
798
            container_quota = long(self._get_policy(container_node)['quota'])
799
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
800
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
801
                # This must be executed in a transaction, so the version is never created if it fails.
802
                raise QuotaError
803
        self._report_size_change(user, account, size_delta, {
804
                                 'action': 'object update', 'path': path})
805

    
806
        if permissions is not None:
807
            self.permissions.access_set(path, permissions)
808
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
809

    
810
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
811
        return dest_version_id
812

    
813
    @backend_method
814
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
815
        """Create/update an object with the specified size and partial hashes."""
816

    
817
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
818
                     account, container, name, size, type, hashmap, checksum)
819
        if size == 0:  # No such thing as an empty hashmap.
820
            hashmap = [self.put_block('')]
821
        map = HashMap(self.block_size, self.hash_algorithm)
822
        map.extend([binascii.unhexlify(x) for x in hashmap])
823
        missing = self.store.block_search(map)
824
        if missing:
825
            ie = IndexError()
826
            ie.data = [binascii.hexlify(x) for x in missing]
827
            raise ie
828

    
829
        hash = map.hash()
830
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
831
        self.store.map_put(hash, map)
832
        return dest_version_id
833

    
834
    @backend_method
835
    def update_object_checksum(self, user, account, container, name, version, checksum):
836
        """Update an object's checksum."""
837

    
838
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
839
                     user, account, container, name, version, checksum)
840
        # Update objects with greater version and same hashmap and size (fix metadata updates).
841
        self._can_write(user, account, container, name)
842
        path, node = self._lookup_object(account, container, name)
843
        props = self._get_version(node, version)
844
        versions = self.node.node_get_versions(node)
845
        for x in versions:
846
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
847
                self.node.version_put_property(
848
                    x[self.SERIAL], 'checksum', checksum)
849

    
850
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
851
        dest_version_ids = []
852
        self._can_read(user, src_account, src_container, src_name)
853
        path, node = self._lookup_object(src_account, src_container, src_name)
854
        # TODO: Will do another fetch of the properties in duplicate version...
855
        props = self._get_version(
856
            node, src_version)  # Check to see if source exists.
857
        src_version_id = props[self.SERIAL]
858
        hash = props[self.HASH]
859
        size = props[self.SIZE]
860
        is_copy = not is_move and (src_account, src_container, src_name) != (
861
            dest_account, dest_container, dest_name)  # New uuid.
862
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
863
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
864
            self._delete_object(user, src_account, src_container, src_name)
865

    
866
        if delimiter:
867
            prefix = src_name + \
868
                delimiter if not src_name.endswith(delimiter) else src_name
869
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
870
            src_names.sort(key=lambda x: x[2])  # order by nodes
871
            paths = [elem[0] for elem in src_names]
872
            nodes = [elem[2] for elem in src_names]
873
            # TODO: Will do another fetch of the properties in duplicate version...
874
            props = self._get_versions(nodes)  # Check to see if source exists.
875

    
876
            for prop, path, node in zip(props, paths, nodes):
877
                src_version_id = prop[self.SERIAL]
878
                hash = prop[self.HASH]
879
                vtype = prop[self.TYPE]
880
                size = prop[self.SIZE]
881
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
882
                    delimiter) else dest_name
883
                vdest_name = path.replace(prefix, dest_prefix, 1)
884
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
885
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
886
                    self._delete_object(user, src_account, src_container, path)
887
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
888

    
889
    @backend_method
890
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
891
        """Copy an object's data and metadata."""
892

    
893
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
894
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
895
        return dest_version_id
896

    
897
    @backend_method
898
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
899
        """Move an object's data and metadata."""
900

    
901
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
902
        if user != src_account:
903
            raise NotAllowedError
904
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
905
        return dest_version_id
906

    
907
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
908
        if user != account:
909
            raise NotAllowedError
910

    
911
        if until is not None:
912
            path = '/'.join((account, container, name))
913
            node = self.node.node_lookup(path)
914
            if node is None:
915
                return
916
            hashes = []
917
            size = 0
918
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
919
            hashes += h
920
            size += s
921
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
922
            hashes += h
923
            size += s
924
            for h in hashes:
925
                self.store.map_delete(h)
926
            self.node.node_purge(node, until, CLUSTER_DELETED)
927
            try:
928
                props = self._get_version(node)
929
            except NameError:
930
                self.permissions.access_clear(path)
931
            self._report_size_change(user, account, -size, {
932
                                     'action': 'object purge', 'path': path})
933
            return
934

    
935
        path, node = self._lookup_object(account, container, name)
936
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
937
        del_size = self._apply_versioning(account, container, src_version_id)
938
        if del_size:
939
            self._report_size_change(user, account, -del_size, {
940
                                     'action': 'object delete', 'path': path})
941
        self._report_object_change(
942
            user, account, path, details={'action': 'object delete'})
943
        self.permissions.access_clear(path)
944

    
945
        if delimiter:
946
            prefix = name + delimiter if not name.endswith(delimiter) else name
947
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
948
            paths = []
949
            for t in src_names:
950
                path = '/'.join((account, container, t[0]))
951
                node = t[2]
952
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
953
                del_size = self._apply_versioning(
954
                    account, container, src_version_id)
955
                if del_size:
956
                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
957
                self._report_object_change(
958
                    user, account, path, details={'action': 'object delete'})
959
                paths.append(path)
960
            self.permissions.access_clear_bulk(paths)
961

    
962
    @backend_method
963
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
964
        """Delete/purge an object."""
965

    
966
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
967
                     account, container, name, until, prefix, delimiter)
968
        self._delete_object(user, account, container, name, until, delimiter)
969

    
970
    @backend_method
971
    def list_versions(self, user, account, container, name):
972
        """Return a list of all (version, version_timestamp) tuples for an object."""
973

    
974
        logger.debug(
975
            "list_versions: %s %s %s %s", user, account, container, name)
976
        self._can_read(user, account, container, name)
977
        path, node = self._lookup_object(account, container, name)
978
        versions = self.node.node_get_versions(node)
979
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
980

    
981
    @backend_method
982
    def get_uuid(self, user, uuid):
983
        """Return the (account, container, name) for the UUID given."""
984

    
985
        logger.debug("get_uuid: %s %s", user, uuid)
986
        info = self.node.latest_uuid(uuid)
987
        if info is None:
988
            raise NameError
989
        path, serial = info
990
        account, container, name = path.split('/', 2)
991
        self._can_read(user, account, container, name)
992
        return (account, container, name)
993

    
994
    @backend_method
995
    def get_public(self, user, public):
996
        """Return the (account, container, name) for the public id given."""
997

    
998
        logger.debug("get_public: %s %s", user, public)
999
        if public is None or public < ULTIMATE_ANSWER:
1000
            raise NameError
1001
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1002
        if path is None:
1003
            raise NameError
1004
        account, container, name = path.split('/', 2)
1005
        self._can_read(user, account, container, name)
1006
        return (account, container, name)
1007

    
1008
    @backend_method(autocommit=0)
1009
    def get_block(self, hash):
1010
        """Return a block's data."""
1011

    
1012
        logger.debug("get_block: %s", hash)
1013
        block = self.store.block_get(binascii.unhexlify(hash))
1014
        if not block:
1015
            raise ItemNotExists('Block does not exist')
1016
        return block
1017

    
1018
    @backend_method(autocommit=0)
1019
    def put_block(self, data):
1020
        """Store a block and return the hash."""
1021

    
1022
        logger.debug("put_block: %s", len(data))
1023
        return binascii.hexlify(self.store.block_put(data))
1024

    
1025
    @backend_method(autocommit=0)
1026
    def update_block(self, hash, data, offset=0):
1027
        """Update a known block and return the hash."""
1028

    
1029
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1030
        if offset == 0 and len(data) == self.block_size:
1031
            return self.put_block(data)
1032
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1033
        return binascii.hexlify(h)
1034

    
1035
    # Path functions.
1036

    
1037
    def _generate_uuid(self):
1038
        return str(uuidlib.uuid4())
1039

    
1040
    def _put_object_node(self, path, parent, name):
1041
        path = '/'.join((path, name))
1042
        node = self.node.node_lookup(path)
1043
        if node is None:
1044
            node = self.node.node_create(parent, path)
1045
        return path, node
1046

    
1047
    def _put_path(self, user, parent, path):
1048
        node = self.node.node_create(parent, path)
1049
        self.node.version_create(node, None, 0, '', None, user,
1050
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1051
        return node
1052

    
1053
    def _lookup_account(self, account, create=True):
1054
        node = self.node.node_lookup(account)
1055
        if node is None and create:
1056
            node = self._put_path(
1057
                account, self.ROOTNODE, account)  # User is account.
1058
        return account, node
1059

    
1060
    def _lookup_container(self, account, container):
1061
        path = '/'.join((account, container))
1062
        node = self.node.node_lookup(path)
1063
        if node is None:
1064
            raise ItemNotExists('Container does not exist')
1065
        return path, node
1066

    
1067
    def _lookup_object(self, account, container, name):
1068
        path = '/'.join((account, container, name))
1069
        node = self.node.node_lookup(path)
1070
        if node is None:
1071
            raise ItemNotExists('Object does not exist')
1072
        return path, node
1073

    
1074
    def _lookup_objects(self, paths):
1075
        nodes = self.node.node_lookup_bulk(paths)
1076
        return paths, nodes
1077

    
1078
    def _get_properties(self, node, until=None):
1079
        """Return properties until the timestamp given."""
1080

    
1081
        before = until if until is not None else inf
1082
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1083
        if props is None and until is not None:
1084
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1085
        if props is None:
1086
            raise ItemNotExists('Path does not exist')
1087
        return props
1088

    
1089
    def _get_statistics(self, node, until=None):
1090
        """Return count, sum of size and latest timestamp of everything under node."""
1091

    
1092
        if until is None:
1093
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1094
        else:
1095
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1096
        if stats is None:
1097
            stats = (0, 0, 0)
1098
        return stats
1099

    
1100
    def _get_version(self, node, version=None):
1101
        if version is None:
1102
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1103
            if props is None:
1104
                raise ItemNotExists('Object does not exist')
1105
        else:
1106
            try:
1107
                version = int(version)
1108
            except ValueError:
1109
                raise VersionNotExists('Version does not exist')
1110
            props = self.node.version_get_properties(version)
1111
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1112
                raise VersionNotExists('Version does not exist')
1113
        return props
1114

    
1115
    def _get_versions(self, nodes):
1116
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1117

    
1118
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1119
        """Create a new version of the node."""
1120

    
1121
        props = self.node.version_lookup(
1122
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1123
        if props is not None:
1124
            src_version_id = props[self.SERIAL]
1125
            src_hash = props[self.HASH]
1126
            src_size = props[self.SIZE]
1127
            src_type = props[self.TYPE]
1128
            src_checksum = props[self.CHECKSUM]
1129
        else:
1130
            src_version_id = None
1131
            src_hash = None
1132
            src_size = 0
1133
            src_type = ''
1134
            src_checksum = ''
1135
        if size is None:  # Set metadata.
1136
            hash = src_hash  # This way hash can be set to None (account or container).
1137
            size = src_size
1138
        if type is None:
1139
            type = src_type
1140
        if checksum is None:
1141
            checksum = src_checksum
1142
        uuid = self._generate_uuid(
1143
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1144

    
1145
        if src_node is None:
1146
            pre_version_id = src_version_id
1147
        else:
1148
            pre_version_id = None
1149
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1150
            if props is not None:
1151
                pre_version_id = props[self.SERIAL]
1152
        if pre_version_id is not None:
1153
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1154

    
1155
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1156
        return pre_version_id, dest_version_id
1157

    
1158
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1159
        if src_version_id is not None:
1160
            self.node.attribute_copy(src_version_id, dest_version_id)
1161
        if not replace:
1162
            self.node.attribute_del(dest_version_id, domain, (
1163
                k for k, v in meta.iteritems() if v == ''))
1164
            self.node.attribute_set(dest_version_id, domain, (
1165
                (k, v) for k, v in meta.iteritems() if v != ''))
1166
        else:
1167
            self.node.attribute_del(dest_version_id, domain)
1168
            self.node.attribute_set(dest_version_id, domain, ((
1169
                k, v) for k, v in meta.iteritems()))
1170

    
1171
    def _put_metadata(self, user, node, domain, meta, replace=False):
1172
        """Create a new version and store metadata."""
1173

    
1174
        src_version_id, dest_version_id = self._put_version_duplicate(
1175
            user, node)
1176
        self._put_metadata_duplicate(
1177
            src_version_id, dest_version_id, domain, meta, replace)
1178
        return src_version_id, dest_version_id
1179

    
1180
    def _list_limits(self, listing, marker, limit):
1181
        start = 0
1182
        if marker:
1183
            try:
1184
                start = listing.index(marker) + 1
1185
            except ValueError:
1186
                pass
1187
        if not limit or limit > 10000:
1188
            limit = 10000
1189
        return start, limit
1190

    
1191
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1192
        cont_prefix = path + '/'
1193
        prefix = cont_prefix + prefix
1194
        start = cont_prefix + marker if marker else None
1195
        before = until if until is not None else inf
1196
        filterq = keys if domain else []
1197
        sizeq = size_range
1198

    
1199
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1200
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1201
        objects.sort(key=lambda x: x[0])
1202
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1203
        return objects
1204

    
1205
    # Reporting functions.
1206

    
1207
    def _report_size_change(self, user, account, size, details={}):
1208
        account_node = self._lookup_account(account, True)[1]
1209
        total = self._get_statistics(account_node)[1]
1210
        details.update({'user': user, 'total': total})
1211
        logger.debug(
1212
            "_report_size_change: %s %s %s %s", user, account, size, details)
1213
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1214
                                                  account,
1215
                                                  QUEUE_INSTANCE_ID,
1216
                                                  'diskspace',
1217
                                                  float(size),
1218
                                                  details))
1219

    
1220
    def _report_object_change(self, user, account, path, details={}):
1221
        details.update({'user': user})
1222
        logger.debug("_report_object_change: %s %s %s %s", user,
1223
                     account, path, details)
1224
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % (
1225
            'object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1226

    
1227
    def _report_sharing_change(self, user, account, path, details={}):
1228
        logger.debug("_report_permissions_change: %s %s %s %s",
1229
                     user, account, path, details)
1230
        details.update({'user': user})
1231
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account,
1232
                             QUEUE_INSTANCE_ID, 'sharing', path, details))
1233

    
1234
    # Policy functions.
1235

    
1236
    def _check_policy(self, policy):
1237
        for k in policy.keys():
1238
            if policy[k] == '':
1239
                policy[k] = self.default_policy.get(k)
1240
        for k, v in policy.iteritems():
1241
            if k == 'quota':
1242
                q = int(v)  # May raise ValueError.
1243
                if q < 0:
1244
                    raise ValueError
1245
            elif k == 'versioning':
1246
                if v not in ['auto', 'none']:
1247
                    raise ValueError
1248
            else:
1249
                raise ValueError
1250

    
1251
    def _put_policy(self, node, policy, replace):
1252
        if replace:
1253
            for k, v in self.default_policy.iteritems():
1254
                if k not in policy:
1255
                    policy[k] = v
1256
        self.node.policy_set(node, policy)
1257

    
1258
    def _get_policy(self, node):
1259
        policy = self.default_policy.copy()
1260
        policy.update(self.node.policy_get(node))
1261
        return policy
1262

    
1263
    def _apply_versioning(self, account, container, version_id):
1264
        """Delete the provided version if such is the policy.
1265
           Return size of object removed.
1266
        """
1267

    
1268
        if version_id is None:
1269
            return 0
1270
        path, node = self._lookup_container(account, container)
1271
        versioning = self._get_policy(node)['versioning']
1272
        if versioning != 'auto':
1273
            hash, size = self.node.version_remove(version_id)
1274
            self.store.map_delete(hash)
1275
            return size
1276
        return 0
1277

    
1278
    # Access control functions.
1279

    
1280
    def _check_groups(self, groups):
1281
        # raise ValueError('Bad characters in groups')
1282
        pass
1283

    
1284
    def _check_permissions(self, path, permissions):
1285
        # raise ValueError('Bad characters in permissions')
1286
        pass
1287

    
1288
    def _get_formatted_paths(self, paths):
1289
        formatted = []
1290
        for p in paths:
1291
            node = self.node.node_lookup(p)
1292
            props = None
1293
            if node is not None:
1294
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1295
            if props is not None:
1296
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1297
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1298
                formatted.append((p, self.MATCH_EXACT))
1299
        return formatted
1300

    
1301
    def _get_permissions_path(self, account, container, name):
1302
        path = '/'.join((account, container, name))
1303
        permission_paths = self.permissions.access_inherit(path)
1304
        permission_paths.sort()
1305
        permission_paths.reverse()
1306
        for p in permission_paths:
1307
            if p == path:
1308
                return p
1309
            else:
1310
                if p.count('/') < 2:
1311
                    continue
1312
                node = self.node.node_lookup(p)
1313
                if node is not None:
1314
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1315
                if props is not None:
1316
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1317
                        return p
1318
        return None
1319

    
1320
    def _can_read(self, user, account, container, name):
1321
        if user == account:
1322
            return True
1323
        path = '/'.join((account, container, name))
1324
        if self.permissions.public_get(path) is not None:
1325
            return True
1326
        path = self._get_permissions_path(account, container, name)
1327
        if not path:
1328
            raise NotAllowedError
1329
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1330
            raise NotAllowedError
1331

    
1332
    def _can_write(self, user, account, container, name):
1333
        if user == account:
1334
            return True
1335
        path = '/'.join((account, container, name))
1336
        path = self._get_permissions_path(account, container, name)
1337
        if not path:
1338
            raise NotAllowedError
1339
        if not self.permissions.access_check(path, self.WRITE, user):
1340
            raise NotAllowedError
1341

    
1342
    def _allowed_accounts(self, user):
1343
        allow = set()
1344
        for path in self.permissions.access_list_paths(user):
1345
            allow.add(path.split('/', 1)[0])
1346
        return sorted(allow)
1347

    
1348
    def _allowed_containers(self, user, account):
1349
        allow = set()
1350
        for path in self.permissions.access_list_paths(user, account):
1351
            allow.add(path.split('/', 2)[1])
1352
        return sorted(allow)