Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ b1dadd0e

History | View | Annotate | Download (59.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
44

    
45
# Stripped-down version of the HashMap class found in tools.
46

    
47

    
48
class HashMap(list):
49

    
50
    def __init__(self, blocksize, blockhash):
51
        super(HashMap, self).__init__()
52
        self.blocksize = blocksize
53
        self.blockhash = blockhash
54

    
55
    def _hash_raw(self, v):
56
        h = hashlib.new(self.blockhash)
57
        h.update(v)
58
        return h.digest()
59

    
60
    def hash(self):
61
        if len(self) == 0:
62
            return self._hash_raw('')
63
        if len(self) == 1:
64
            return self.__getitem__(0)
65

    
66
        h = list(self)
67
        s = 2
68
        while s < len(h):
69
            s = s * 2
70
        h += [('\x00' * len(h[0]))] * (s - len(h))
71
        while len(h) > 1:
72
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
73
        return h[0]
74

    
75
# Default modules and settings.
76
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
77
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
78
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
79
DEFAULT_BLOCK_PATH = 'data/'
80
DEFAULT_BLOCK_UMASK = 0o022
81
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
82
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
83
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
84

    
85
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
86
QUEUE_CLIENT_ID = 'pithos'
87
QUEUE_INSTANCE_ID = '1'
88

    
89
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
90

    
91
inf = float('inf')
92

    
93
ULTIMATE_ANSWER = 42
94

    
95

    
96
logger = logging.getLogger(__name__)
97

    
98

    
99
def backend_method(func=None, autocommit=1):
100
    if func is None:
101
        def fn(func):
102
            return backend_method(func, autocommit)
103
        return fn
104

    
105
    if not autocommit:
106
        return func
107

    
108
    def fn(self, *args, **kw):
109
        self.wrapper.execute()
110
        try:
111
            self.messages = []
112
            ret = func(self, *args, **kw)
113
            for m in self.messages:
114
                self.queue.send(*m)
115
            self.wrapper.commit()
116
            return ret
117
        except:
118
            self.wrapper.rollback()
119
            raise
120
    return fn
121

    
122

    
123
class ModularBackend(BaseBackend):
124
    """A modular backend.
125

126
    Uses modules for SQL functions and storage.
127
    """
128

    
129
    def __init__(self, db_module=None, db_connection=None,
130
                 block_module=None, block_path=None, block_umask=None,
131
                 queue_module=None, queue_hosts=None,
132
                 queue_exchange=None,
133
                 free_versioning=True):
134
        db_module = db_module or DEFAULT_DB_MODULE
135
        db_connection = db_connection or DEFAULT_DB_CONNECTION
136
        block_module = block_module or DEFAULT_BLOCK_MODULE
137
        block_path = block_path or DEFAULT_BLOCK_PATH
138
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
139
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
140
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
141
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
142
        
143
        self.hash_algorithm = 'sha256'
144
        self.block_size = 4 * 1024 * 1024  # 4MB
145
        self.free_versioning = free_versioning
146

    
147
        self.default_policy = {'quota': DEFAULT_QUOTA,
148
                               'versioning': DEFAULT_VERSIONING}
149

    
150
        def load_module(m):
151
            __import__(m)
152
            return sys.modules[m]
153

    
154
        self.db_module = load_module(db_module)
155
        self.wrapper = self.db_module.DBWrapper(db_connection)
156
        params = {'wrapper': self.wrapper}
157
        self.permissions = self.db_module.Permissions(**params)
158
        self.config = self.db_module.Config(**params)
159
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
160
        for x in ['READ', 'WRITE']:
161
            setattr(self, x, getattr(self.db_module, x))
162
        self.node = self.db_module.Node(**params)
163
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
164
            setattr(self, x, getattr(self.db_module, x))
165

    
166
        self.block_module = load_module(block_module)
167
        params = {'path': block_path,
168
                  'block_size': self.block_size,
169
                  'hash_algorithm': self.hash_algorithm,
170
                  'umask': block_umask}
171
        self.store = self.block_module.Store(**params)
172

    
173
        if queue_module and queue_hosts:
174
            self.queue_module = load_module(queue_module)
175
            params = {'hosts': queue_hosts,
176
                      'exchange': queue_exchange,
177
                      'client_id': QUEUE_CLIENT_ID}
178
            self.queue = self.queue_module.Queue(**params)
179
        else:
180
            class NoQueue:
181
                def send(self, *args):
182
                    pass
183

    
184
                def close(self):
185
                    pass
186

    
187
            self.queue = NoQueue()
188

    
189
    def close(self):
190
        self.wrapper.close()
191
        self.queue.close()
192

    
193
    @backend_method
194
    def list_accounts(self, user, marker=None, limit=10000):
195
        """Return a list of accounts the user can access."""
196

    
197
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
198
        allowed = self._allowed_accounts(user)
199
        start, limit = self._list_limits(allowed, marker, limit)
200
        return allowed[start:start + limit]
201

    
202
    @backend_method
203
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
204
        """Return a dictionary with the account metadata for the domain."""
205

    
206
        logger.debug(
207
            "get_account_meta: %s %s %s %s", user, account, domain, until)
208
        path, node = self._lookup_account(account, user == account)
209
        if user != account:
210
            if until or node is None or account not in self._allowed_accounts(user):
211
                raise NotAllowedError
212
        try:
213
            props = self._get_properties(node, until)
214
            mtime = props[self.MTIME]
215
        except NameError:
216
            props = None
217
            mtime = until
218
        count, bytes, tstamp = self._get_statistics(node, until)
219
        tstamp = max(tstamp, mtime)
220
        if until is None:
221
            modified = tstamp
222
        else:
223
            modified = self._get_statistics(
224
                node)[2]  # Overall last modification.
225
            modified = max(modified, mtime)
226

    
227
        if user != account:
228
            meta = {'name': account}
229
        else:
230
            meta = {}
231
            if props is not None and include_user_defined:
232
                meta.update(
233
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
234
            if until is not None:
235
                meta.update({'until_timestamp': tstamp})
236
            meta.update({'name': account, 'count': count, 'bytes': bytes})
237
        meta.update({'modified': modified})
238
        return meta
239

    
240
    @backend_method
241
    def update_account_meta(self, user, account, domain, meta, replace=False):
242
        """Update the metadata associated with the account for the domain."""
243

    
244
        logger.debug("update_account_meta: %s %s %s %s %s", user,
245
                     account, domain, meta, replace)
246
        if user != account:
247
            raise NotAllowedError
248
        path, node = self._lookup_account(account, True)
249
        self._put_metadata(user, node, domain, meta, replace)
250

    
251
    @backend_method
252
    def get_account_groups(self, user, account):
253
        """Return a dictionary with the user groups defined for this account."""
254

    
255
        logger.debug("get_account_groups: %s %s", user, account)
256
        if user != account:
257
            if account not in self._allowed_accounts(user):
258
                raise NotAllowedError
259
            return {}
260
        self._lookup_account(account, True)
261
        return self.permissions.group_dict(account)
262

    
263
    @backend_method
264
    def update_account_groups(self, user, account, groups, replace=False):
265
        """Update the groups associated with the account."""
266

    
267
        logger.debug("update_account_groups: %s %s %s %s", user,
268
                     account, groups, replace)
269
        if user != account:
270
            raise NotAllowedError
271
        self._lookup_account(account, True)
272
        self._check_groups(groups)
273
        if replace:
274
            self.permissions.group_destroy(account)
275
        for k, v in groups.iteritems():
276
            if not replace:  # If not already deleted.
277
                self.permissions.group_delete(account, k)
278
            if v:
279
                self.permissions.group_addmany(account, k, v)
280

    
281
    @backend_method
282
    def get_account_policy(self, user, account):
283
        """Return a dictionary with the account policy."""
284

    
285
        logger.debug("get_account_policy: %s %s", user, account)
286
        if user != account:
287
            if account not in self._allowed_accounts(user):
288
                raise NotAllowedError
289
            return {}
290
        path, node = self._lookup_account(account, True)
291
        return self._get_policy(node)
292

    
293
    @backend_method
294
    def update_account_policy(self, user, account, policy, replace=False):
295
        """Update the policy associated with the account."""
296

    
297
        logger.debug("update_account_policy: %s %s %s %s", user,
298
                     account, policy, replace)
299
        if user != account:
300
            raise NotAllowedError
301
        path, node = self._lookup_account(account, True)
302
        self._check_policy(policy)
303
        self._put_policy(node, policy, replace)
304

    
305
    @backend_method
306
    def put_account(self, user, account, policy={}):
307
        """Create a new account with the given name."""
308

    
309
        logger.debug("put_account: %s %s %s", user, account, policy)
310
        if user != account:
311
            raise NotAllowedError
312
        node = self.node.node_lookup(account)
313
        if node is not None:
314
            raise AccountExists('Account already exists')
315
        if policy:
316
            self._check_policy(policy)
317
        node = self._put_path(user, self.ROOTNODE, account)
318
        self._put_policy(node, policy, True)
319

    
320
    @backend_method
321
    def delete_account(self, user, account):
322
        """Delete the account with the given name."""
323

    
324
        logger.debug("delete_account: %s %s", user, account)
325
        if user != account:
326
            raise NotAllowedError
327
        node = self.node.node_lookup(account)
328
        if node is None:
329
            return
330
        if not self.node.node_remove(node):
331
            raise AccountNotEmpty('Account is not empty')
332
        self.permissions.group_destroy(account)
333

    
334
    @backend_method
335
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
336
        """Return a list of containers existing under an account."""
337

    
338
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
339
                     account, marker, limit, shared, until, public)
340
        if user != account:
341
            if until or account not in self._allowed_accounts(user):
342
                raise NotAllowedError
343
            allowed = self._allowed_containers(user, account)
344
            start, limit = self._list_limits(allowed, marker, limit)
345
            return allowed[start:start + limit]
346
        if shared or public:
347
            allowed = set()
348
            if shared:
349
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
350
            if public:
351
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
352
            allowed = sorted(allowed)
353
            start, limit = self._list_limits(allowed, marker, limit)
354
            return allowed[start:start + limit]
355
        node = self.node.node_lookup(account)
356
        containers = [x[0] for x in self._list_object_properties(
357
            node, account, '', '/', marker, limit, False, None, [], until)]
358
        start, limit = self._list_limits(
359
            [x[0] for x in containers], marker, limit)
360
        return containers[start:start + limit]
361

    
362
    @backend_method
363
    def list_container_meta(self, user, account, container, domain, until=None):
364
        """Return a list with all the container's object meta keys for the domain."""
365

    
366
        logger.debug("list_container_meta: %s %s %s %s %s", user,
367
                     account, container, domain, until)
368
        allowed = []
369
        if user != account:
370
            if until:
371
                raise NotAllowedError
372
            allowed = self.permissions.access_list_paths(
373
                user, '/'.join((account, container)))
374
            if not allowed:
375
                raise NotAllowedError
376
        path, node = self._lookup_container(account, container)
377
        before = until if until is not None else inf
378
        allowed = self._get_formatted_paths(allowed)
379
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
380

    
381
    @backend_method
382
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
383
        """Return a dictionary with the container metadata for the domain."""
384

    
385
        logger.debug("get_container_meta: %s %s %s %s %s", user,
386
                     account, container, domain, until)
387
        if user != account:
388
            if until or container not in self._allowed_containers(user, account):
389
                raise NotAllowedError
390
        path, node = self._lookup_container(account, container)
391
        props = self._get_properties(node, until)
392
        mtime = props[self.MTIME]
393
        count, bytes, tstamp = self._get_statistics(node, until)
394
        tstamp = max(tstamp, mtime)
395
        if until is None:
396
            modified = tstamp
397
        else:
398
            modified = self._get_statistics(
399
                node)[2]  # Overall last modification.
400
            modified = max(modified, mtime)
401

    
402
        if user != account:
403
            meta = {'name': container}
404
        else:
405
            meta = {}
406
            if include_user_defined:
407
                meta.update(
408
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
409
            if until is not None:
410
                meta.update({'until_timestamp': tstamp})
411
            meta.update({'name': container, 'count': count, 'bytes': bytes})
412
        meta.update({'modified': modified})
413
        return meta
414

    
415
    @backend_method
416
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
417
        """Update the metadata associated with the container for the domain."""
418

    
419
        logger.debug("update_container_meta: %s %s %s %s %s %s",
420
                     user, account, container, domain, meta, replace)
421
        if user != account:
422
            raise NotAllowedError
423
        path, node = self._lookup_container(account, container)
424
        src_version_id, dest_version_id = self._put_metadata(
425
            user, node, domain, meta, replace)
426
        if src_version_id is not None:
427
            versioning = self._get_policy(node)['versioning']
428
            if versioning != 'auto':
429
                self.node.version_remove(src_version_id)
430

    
431
    @backend_method
432
    def get_container_policy(self, user, account, container):
433
        """Return a dictionary with the container policy."""
434

    
435
        logger.debug(
436
            "get_container_policy: %s %s %s", user, account, container)
437
        if user != account:
438
            if container not in self._allowed_containers(user, account):
439
                raise NotAllowedError
440
            return {}
441
        path, node = self._lookup_container(account, container)
442
        return self._get_policy(node)
443

    
444
    @backend_method
445
    def update_container_policy(self, user, account, container, policy, replace=False):
446
        """Update the policy associated with the container."""
447

    
448
        logger.debug("update_container_policy: %s %s %s %s %s",
449
                     user, account, container, policy, replace)
450
        if user != account:
451
            raise NotAllowedError
452
        path, node = self._lookup_container(account, container)
453
        self._check_policy(policy)
454
        self._put_policy(node, policy, replace)
455

    
456
    @backend_method
457
    def put_container(self, user, account, container, policy={}):
458
        """Create a new container with the given name."""
459

    
460
        logger.debug(
461
            "put_container: %s %s %s %s", user, account, container, policy)
462
        if user != account:
463
            raise NotAllowedError
464
        try:
465
            path, node = self._lookup_container(account, container)
466
        except NameError:
467
            pass
468
        else:
469
            raise ContainerExists('Container already exists')
470
        if policy:
471
            self._check_policy(policy)
472
        path = '/'.join((account, container))
473
        node = self._put_path(
474
            user, self._lookup_account(account, True)[1], path)
475
        self._put_policy(node, policy, True)
476

    
477
    @backend_method
478
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
479
        """Delete/purge the container with the given name."""
480

    
481
        logger.debug("delete_container: %s %s %s %s %s %s", user,
482
                     account, container, until, prefix, delimiter)
483
        if user != account:
484
            raise NotAllowedError
485
        path, node = self._lookup_container(account, container)
486

    
487
        if until is not None:
488
            hashes, size, serials = self.node.node_purge_children(
489
                node, until, CLUSTER_HISTORY)
490
            for h in hashes:
491
                self.store.map_delete(h)
492
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
493
            self._report_size_change(user, account, -size,
494
                                     {'action':'container purge', 'path': path,
495
                                      'versions': ','.join(str(i) for i in serials)})
496
            return
497

    
498
        if not delimiter:
499
            if self._get_statistics(node)[0] > 0:
500
                raise ContainerNotEmpty('Container is not empty')
501
            hashes, size, serials = self.node.node_purge_children(
502
                node, inf, CLUSTER_HISTORY)
503
            for h in hashes:
504
                self.store.map_delete(h)
505
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
506
            self.node.node_remove(node)
507
            self._report_size_change(user, account, -size,
508
                                     {'action': 'container delete',
509
                                      'path': path,
510
                                      'versions': ','.join(str(i) for i in serials)})
511
        else:
512
            # remove only contents
513
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
514
            paths = []
515
            for t in src_names:
516
                path = '/'.join((account, container, t[0]))
517
                node = t[2]
518
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
519
                del_size = self._apply_versioning(
520
                    account, container, src_version_id)
521
                self._report_size_change(
522
                        user, account, -del_size, {
523
                                'action': 'object delete',
524
                                'path': path,
525
                             'versions': ','.join([str(dest_version_id)])
526
                     }
527
                )
528
                self._report_object_change(
529
                    user, account, path, details={'action': 'object delete'})
530
                paths.append(path)
531
            self.permissions.access_clear_bulk(paths)
532

    
533
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
534
        if user != account and until:
535
            raise NotAllowedError
536
        if shared and public:
537
            # get shared first
538
            shared = self._list_object_permissions(
539
                user, account, container, prefix, shared=True, public=False)
540
            objects = set()
541
            if shared:
542
                path, node = self._lookup_container(account, container)
543
                shared = self._get_formatted_paths(shared)
544
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
545

    
546
            # get public
547
            objects |= set(self._list_public_object_properties(
548
                user, account, container, prefix, all_props))
549
            objects = list(objects)
550

    
551
            objects.sort(key=lambda x: x[0])
552
            start, limit = self._list_limits(
553
                [x[0] for x in objects], marker, limit)
554
            return objects[start:start + limit]
555
        elif public:
556
            objects = self._list_public_object_properties(
557
                user, account, container, prefix, all_props)
558
            start, limit = self._list_limits(
559
                [x[0] for x in objects], marker, limit)
560
            return objects[start:start + limit]
561

    
562
        allowed = self._list_object_permissions(
563
            user, account, container, prefix, shared, public)
564
        if shared and not allowed:
565
            return []
566
        path, node = self._lookup_container(account, container)
567
        allowed = self._get_formatted_paths(allowed)
568
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
569
        start, limit = self._list_limits(
570
            [x[0] for x in objects], marker, limit)
571
        return objects[start:start + limit]
572

    
573
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
574
        public = self._list_object_permissions(
575
            user, account, container, prefix, shared=False, public=True)
576
        paths, nodes = self._lookup_objects(public)
577
        path = '/'.join((account, container))
578
        cont_prefix = path + '/'
579
        paths = [x[len(cont_prefix):] for x in paths]
580
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
581
        objects = [(path,) + props for path, props in zip(paths, props)]
582
        return objects
583

    
584
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
585
        objects = []
586
        while True:
587
            marker = objects[-1] if objects else None
588
            limit = 10000
589
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
590
            objects.extend(l)
591
            if not l or len(l) < limit:
592
                break
593
        return objects
594

    
595
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
596
        allowed = []
597
        path = '/'.join((account, container, prefix)).rstrip('/')
598
        if user != account:
599
            allowed = self.permissions.access_list_paths(user, path)
600
            if not allowed:
601
                raise NotAllowedError
602
        else:
603
            allowed = set()
604
            if shared:
605
                allowed.update(self.permissions.access_list_shared(path))
606
            if public:
607
                allowed.update(
608
                    [x[0] for x in self.permissions.public_list(path)])
609
            allowed = sorted(allowed)
610
            if not allowed:
611
                return []
612
        return allowed
613

    
614
    @backend_method
615
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
616
        """Return a list of object (name, version_id) tuples existing under a container."""
617

    
618
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
619
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
620

    
621
    @backend_method
622
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
623
        """Return a list of object metadata dicts existing under a container."""
624

    
625
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
626
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
627
        objects = []
628
        for p in props:
629
            if len(p) == 2:
630
                objects.append({'subdir': p[0]})
631
            else:
632
                objects.append({'name': p[0],
633
                                'bytes': p[self.SIZE + 1],
634
                                'type': p[self.TYPE + 1],
635
                                'hash': p[self.HASH + 1],
636
                                'version': p[self.SERIAL + 1],
637
                                'version_timestamp': p[self.MTIME + 1],
638
                                'modified': p[self.MTIME + 1] if until is None else None,
639
                                'modified_by': p[self.MUSER + 1],
640
                                'uuid': p[self.UUID + 1],
641
                                'checksum': p[self.CHECKSUM + 1]})
642
        return objects
643

    
644
    @backend_method
645
    def list_object_permissions(self, user, account, container, prefix=''):
646
        """Return a list of paths that enforce permissions under a container."""
647

    
648
        logger.debug("list_object_permissions: %s %s %s %s", user,
649
                     account, container, prefix)
650
        return self._list_object_permissions(user, account, container, prefix, True, False)
651

    
652
    @backend_method
653
    def list_object_public(self, user, account, container, prefix=''):
654
        """Return a dict mapping paths to public ids for objects that are public under a container."""
655

    
656
        logger.debug("list_object_public: %s %s %s %s", user,
657
                     account, container, prefix)
658
        public = {}
659
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
660
            public[path] = p + ULTIMATE_ANSWER
661
        return public
662

    
663
    @backend_method
664
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
665
        """Return a dictionary with the object metadata for the domain."""
666

    
667
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
668
                     account, container, name, domain, version)
669
        self._can_read(user, account, container, name)
670
        path, node = self._lookup_object(account, container, name)
671
        props = self._get_version(node, version)
672
        if version is None:
673
            modified = props[self.MTIME]
674
        else:
675
            try:
676
                modified = self._get_version(
677
                    node)[self.MTIME]  # Overall last modification.
678
            except NameError:  # Object may be deleted.
679
                del_props = self.node.version_lookup(
680
                    node, inf, CLUSTER_DELETED)
681
                if del_props is None:
682
                    raise ItemNotExists('Object does not exist')
683
                modified = del_props[self.MTIME]
684

    
685
        meta = {}
686
        if include_user_defined:
687
            meta.update(
688
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
689
        meta.update({'name': name,
690
                     'bytes': props[self.SIZE],
691
                     'type': props[self.TYPE],
692
                     'hash': props[self.HASH],
693
                     'version': props[self.SERIAL],
694
                     'version_timestamp': props[self.MTIME],
695
                     'modified': modified,
696
                     'modified_by': props[self.MUSER],
697
                     'uuid': props[self.UUID],
698
                     'checksum': props[self.CHECKSUM]})
699
        return meta
700

    
701
    @backend_method
702
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
703
        """Update the metadata associated with the object for the domain and return the new version."""
704

    
705
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
706
                     user, account, container, name, domain, meta, replace)
707
        self._can_write(user, account, container, name)
708
        path, node = self._lookup_object(account, container, name)
709
        src_version_id, dest_version_id = self._put_metadata(
710
            user, node, domain, meta, replace)
711
        self._apply_versioning(account, container, src_version_id)
712
        return dest_version_id
713

    
714
    @backend_method
715
    def get_object_permissions(self, user, account, container, name):
716
        """Return the action allowed on the object, the path
717
        from which the object gets its permissions from,
718
        along with a dictionary containing the permissions."""
719

    
720
        logger.debug("get_object_permissions: %s %s %s %s", user,
721
                     account, container, name)
722
        allowed = 'write'
723
        permissions_path = self._get_permissions_path(account, container, name)
724
        if user != account:
725
            if self.permissions.access_check(permissions_path, self.WRITE, user):
726
                allowed = 'write'
727
            elif self.permissions.access_check(permissions_path, self.READ, user):
728
                allowed = 'read'
729
            else:
730
                raise NotAllowedError
731
        self._lookup_object(account, container, name)
732
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
733

    
734
    @backend_method
735
    def update_object_permissions(self, user, account, container, name, permissions):
736
        """Update the permissions associated with the object."""
737

    
738
        logger.debug("update_object_permissions: %s %s %s %s %s",
739
                     user, account, container, name, permissions)
740
        if user != account:
741
            raise NotAllowedError
742
        path = self._lookup_object(account, container, name)[0]
743
        self._check_permissions(path, permissions)
744
        self.permissions.access_set(path, permissions)
745
        self._report_sharing_change(user, account, path, {'members':
746
                                    self.permissions.access_members(path)})
747

    
748
    @backend_method
749
    def get_object_public(self, user, account, container, name):
750
        """Return the public id of the object if applicable."""
751

    
752
        logger.debug(
753
            "get_object_public: %s %s %s %s", user, account, container, name)
754
        self._can_read(user, account, container, name)
755
        path = self._lookup_object(account, container, name)[0]
756
        p = self.permissions.public_get(path)
757
        if p is not None:
758
            p += ULTIMATE_ANSWER
759
        return p
760

    
761
    @backend_method
762
    def update_object_public(self, user, account, container, name, public):
763
        """Update the public status of the object."""
764

    
765
        logger.debug("update_object_public: %s %s %s %s %s", user,
766
                     account, container, name, public)
767
        self._can_write(user, account, container, name)
768
        path = self._lookup_object(account, container, name)[0]
769
        if not public:
770
            self.permissions.public_unset(path)
771
        else:
772
            self.permissions.public_set(path)
773

    
774
    @backend_method
775
    def get_object_hashmap(self, user, account, container, name, version=None):
776
        """Return the object's size and a list with partial hashes."""
777

    
778
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
779
                     account, container, name, version)
780
        self._can_read(user, account, container, name)
781
        path, node = self._lookup_object(account, container, name)
782
        props = self._get_version(node, version)
783
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
784
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
785

    
786
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
787
        if permissions is not None and user != account:
788
            raise NotAllowedError
789
        self._can_write(user, account, container, name)
790
        if permissions is not None:
791
            path = '/'.join((account, container, name))
792
            self._check_permissions(path, permissions)
793

    
794
        account_path, account_node = self._lookup_account(account, True)
795
        container_path, container_node = self._lookup_container(
796
            account, container)
797
        path, node = self._put_object_node(
798
            container_path, container_node, name)
799
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
800

    
801
        # Handle meta.
802
        if src_version_id is None:
803
            src_version_id = pre_version_id
804
        self._put_metadata_duplicate(
805
            src_version_id, dest_version_id, domain, meta, replace_meta)
806

    
807
        del_size = self._apply_versioning(account, container, pre_version_id)
808
        size_delta = size - del_size
809
#        # Check quota.
810
#         if size_delta > 0:
811
#             account_quota = long(self._get_policy(account_node)['quota'])
812
#             container_quota = long(self._get_policy(container_node)['quota'])
813
#             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
814
#                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
815
#                 # This must be executed in a transaction, so the version is never created if it fails.
816
#                 raise QuotaError
817
        self._report_size_change(user, account, size_delta,
818
                                 {'action': 'object update', 'path': path,
819
                                  'versions': ','.join([str(dest_version_id)])})
820

    
821
        if permissions is not None:
822
            self.permissions.access_set(path, permissions)
823
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
824

    
825
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
826
        return dest_version_id
827

    
828
    @backend_method
829
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
830
        """Create/update an object with the specified size and partial hashes."""
831

    
832
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
833
                     account, container, name, size, type, hashmap, checksum)
834
        if size == 0:  # No such thing as an empty hashmap.
835
            hashmap = [self.put_block('')]
836
        map = HashMap(self.block_size, self.hash_algorithm)
837
        map.extend([binascii.unhexlify(x) for x in hashmap])
838
        missing = self.store.block_search(map)
839
        if missing:
840
            ie = IndexError()
841
            ie.data = [binascii.hexlify(x) for x in missing]
842
            raise ie
843

    
844
        hash = map.hash()
845
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
846
        self.store.map_put(hash, map)
847
        return dest_version_id
848

    
849
    @backend_method
850
    def update_object_checksum(self, user, account, container, name, version, checksum):
851
        """Update an object's checksum."""
852

    
853
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
854
                     user, account, container, name, version, checksum)
855
        # Update objects with greater version and same hashmap and size (fix metadata updates).
856
        self._can_write(user, account, container, name)
857
        path, node = self._lookup_object(account, container, name)
858
        props = self._get_version(node, version)
859
        versions = self.node.node_get_versions(node)
860
        for x in versions:
861
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
862
                self.node.version_put_property(
863
                    x[self.SERIAL], 'checksum', checksum)
864

    
865
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
866
        dest_version_ids = []
867
        self._can_read(user, src_account, src_container, src_name)
868
        path, node = self._lookup_object(src_account, src_container, src_name)
869
        # TODO: Will do another fetch of the properties in duplicate version...
870
        props = self._get_version(
871
            node, src_version)  # Check to see if source exists.
872
        src_version_id = props[self.SERIAL]
873
        hash = props[self.HASH]
874
        size = props[self.SIZE]
875
        is_copy = not is_move and (src_account, src_container, src_name) != (
876
            dest_account, dest_container, dest_name)  # New uuid.
877
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
878
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
879
            self._delete_object(user, src_account, src_container, src_name)
880

    
881
        if delimiter:
882
            prefix = src_name + \
883
                delimiter if not src_name.endswith(delimiter) else src_name
884
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
885
            src_names.sort(key=lambda x: x[2])  # order by nodes
886
            paths = [elem[0] for elem in src_names]
887
            nodes = [elem[2] for elem in src_names]
888
            # TODO: Will do another fetch of the properties in duplicate version...
889
            props = self._get_versions(nodes)  # Check to see if source exists.
890

    
891
            for prop, path, node in zip(props, paths, nodes):
892
                src_version_id = prop[self.SERIAL]
893
                hash = prop[self.HASH]
894
                vtype = prop[self.TYPE]
895
                size = prop[self.SIZE]
896
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
897
                    delimiter) else dest_name
898
                vdest_name = path.replace(prefix, dest_prefix, 1)
899
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
900
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
901
                    self._delete_object(user, src_account, src_container, path)
902
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
903

    
904
    @backend_method
905
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
906
        """Copy an object's data and metadata."""
907

    
908
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
909
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
910
        return dest_version_id
911

    
912
    @backend_method
913
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
914
        """Move an object's data and metadata."""
915

    
916
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
917
        if user != src_account:
918
            raise NotAllowedError
919
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
920
        return dest_version_id
921

    
922
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
923
        if user != account:
924
            raise NotAllowedError
925

    
926
        if until is not None:
927
            path = '/'.join((account, container, name))
928
            node = self.node.node_lookup(path)
929
            if node is None:
930
                return
931
            hashes = []
932
            size = 0
933
            serials = []
934
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
935
            hashes += h
936
            size += s
937
            serials += v
938
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
939
            hashes += h
940
            size += s
941
            serials += v
942
            for h in hashes:
943
                self.store.map_delete(h)
944
            self.node.node_purge(node, until, CLUSTER_DELETED)
945
            try:
946
                props = self._get_version(node)
947
            except NameError:
948
                self.permissions.access_clear(path)
949
            self._report_size_change(user, account, -size,
950
                                    {'action': 'object purge', 'path': path,
951
                                     'versions': ','.join(str(i) for i in serials)})
952
            return
953

    
954
        path, node = self._lookup_object(account, container, name)
955
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
956
        del_size = self._apply_versioning(account, container, src_version_id)
957
        self._report_size_change(user, account, -del_size,
958
                                 {'action': 'object delete', 'path': path,
959
                                  'versions': ','.join([str(dest_version_id)])})
960
        self._report_object_change(
961
            user, account, path, details={'action': 'object delete'})
962
        self.permissions.access_clear(path)
963

    
964
        if delimiter:
965
            prefix = name + delimiter if not name.endswith(delimiter) else name
966
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
967
            paths = []
968
            for t in src_names:
969
                path = '/'.join((account, container, t[0]))
970
                node = t[2]
971
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
972
                del_size = self._apply_versioning(
973
                    account, container, src_version_id)
974
                self._report_size_change(user, account, -del_size,
975
                                         {'action': 'object delete',
976
                                          'path': path,
977
                                          'versions': ','.join([str(dest_version_id)])})
978
                self._report_object_change(
979
                    user, account, path, details={'action': 'object delete'})
980
                paths.append(path)
981
            self.permissions.access_clear_bulk(paths)
982

    
983
    @backend_method
984
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
985
        """Delete/purge an object."""
986

    
987
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
988
                     account, container, name, until, prefix, delimiter)
989
        self._delete_object(user, account, container, name, until, delimiter)
990

    
991
    @backend_method
992
    def list_versions(self, user, account, container, name):
993
        """Return a list of all (version, version_timestamp) tuples for an object."""
994

    
995
        logger.debug(
996
            "list_versions: %s %s %s %s", user, account, container, name)
997
        self._can_read(user, account, container, name)
998
        path, node = self._lookup_object(account, container, name)
999
        versions = self.node.node_get_versions(node)
1000
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1001

    
1002
    @backend_method
1003
    def get_uuid(self, user, uuid):
1004
        """Return the (account, container, name) for the UUID given."""
1005

    
1006
        logger.debug("get_uuid: %s %s", user, uuid)
1007
        info = self.node.latest_uuid(uuid)
1008
        if info is None:
1009
            raise NameError
1010
        path, serial = info
1011
        account, container, name = path.split('/', 2)
1012
        self._can_read(user, account, container, name)
1013
        return (account, container, name)
1014

    
1015
    @backend_method
1016
    def get_public(self, user, public):
1017
        """Return the (account, container, name) for the public id given."""
1018

    
1019
        logger.debug("get_public: %s %s", user, public)
1020
        if public is None or public < ULTIMATE_ANSWER:
1021
            raise NameError
1022
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1023
        if path is None:
1024
            raise NameError
1025
        account, container, name = path.split('/', 2)
1026
        self._can_read(user, account, container, name)
1027
        return (account, container, name)
1028

    
1029
    @backend_method(autocommit=0)
1030
    def get_block(self, hash):
1031
        """Return a block's data."""
1032

    
1033
        logger.debug("get_block: %s", hash)
1034
        block = self.store.block_get(binascii.unhexlify(hash))
1035
        if not block:
1036
            raise ItemNotExists('Block does not exist')
1037
        return block
1038

    
1039
    @backend_method(autocommit=0)
1040
    def put_block(self, data):
1041
        """Store a block and return the hash."""
1042

    
1043
        logger.debug("put_block: %s", len(data))
1044
        return binascii.hexlify(self.store.block_put(data))
1045

    
1046
    @backend_method(autocommit=0)
1047
    def update_block(self, hash, data, offset=0):
1048
        """Update a known block and return the hash."""
1049

    
1050
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1051
        if offset == 0 and len(data) == self.block_size:
1052
            return self.put_block(data)
1053
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1054
        return binascii.hexlify(h)
1055

    
1056
    # Path functions.
1057

    
1058
    def _generate_uuid(self):
1059
        return str(uuidlib.uuid4())
1060

    
1061
    def _put_object_node(self, path, parent, name):
1062
        path = '/'.join((path, name))
1063
        node = self.node.node_lookup(path)
1064
        if node is None:
1065
            node = self.node.node_create(parent, path)
1066
        return path, node
1067

    
1068
    def _put_path(self, user, parent, path):
1069
        node = self.node.node_create(parent, path)
1070
        self.node.version_create(node, None, 0, '', None, user,
1071
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1072
        return node
1073

    
1074
    def _lookup_account(self, account, create=True):
1075
        node = self.node.node_lookup(account)
1076
        if node is None and create:
1077
            node = self._put_path(
1078
                account, self.ROOTNODE, account)  # User is account.
1079
        return account, node
1080

    
1081
    def _lookup_container(self, account, container):
1082
        path = '/'.join((account, container))
1083
        node = self.node.node_lookup(path)
1084
        if node is None:
1085
            raise ItemNotExists('Container does not exist')
1086
        return path, node
1087

    
1088
    def _lookup_object(self, account, container, name):
1089
        path = '/'.join((account, container, name))
1090
        node = self.node.node_lookup(path)
1091
        if node is None:
1092
            raise ItemNotExists('Object does not exist')
1093
        return path, node
1094

    
1095
    def _lookup_objects(self, paths):
1096
        nodes = self.node.node_lookup_bulk(paths)
1097
        return paths, nodes
1098

    
1099
    def _get_properties(self, node, until=None):
1100
        """Return properties until the timestamp given."""
1101

    
1102
        before = until if until is not None else inf
1103
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1104
        if props is None and until is not None:
1105
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1106
        if props is None:
1107
            raise ItemNotExists('Path does not exist')
1108
        return props
1109

    
1110
    def _get_statistics(self, node, until=None):
1111
        """Return count, sum of size and latest timestamp of everything under node."""
1112

    
1113
        if until is None:
1114
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1115
        else:
1116
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1117
        if stats is None:
1118
            stats = (0, 0, 0)
1119
        return stats
1120

    
1121
    def _get_version(self, node, version=None):
1122
        if version is None:
1123
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1124
            if props is None:
1125
                raise ItemNotExists('Object does not exist')
1126
        else:
1127
            try:
1128
                version = int(version)
1129
            except ValueError:
1130
                raise VersionNotExists('Version does not exist')
1131
            props = self.node.version_get_properties(version)
1132
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1133
                raise VersionNotExists('Version does not exist')
1134
        return props
1135

    
1136
    def _get_versions(self, nodes):
1137
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1138

    
1139
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1140
        """Create a new version of the node."""
1141

    
1142
        props = self.node.version_lookup(
1143
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1144
        if props is not None:
1145
            src_version_id = props[self.SERIAL]
1146
            src_hash = props[self.HASH]
1147
            src_size = props[self.SIZE]
1148
            src_type = props[self.TYPE]
1149
            src_checksum = props[self.CHECKSUM]
1150
        else:
1151
            src_version_id = None
1152
            src_hash = None
1153
            src_size = 0
1154
            src_type = ''
1155
            src_checksum = ''
1156
        if size is None:  # Set metadata.
1157
            hash = src_hash  # This way hash can be set to None (account or container).
1158
            size = src_size
1159
        if type is None:
1160
            type = src_type
1161
        if checksum is None:
1162
            checksum = src_checksum
1163
        uuid = self._generate_uuid(
1164
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1165

    
1166
        if src_node is None:
1167
            pre_version_id = src_version_id
1168
        else:
1169
            pre_version_id = None
1170
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1171
            if props is not None:
1172
                pre_version_id = props[self.SERIAL]
1173
        if pre_version_id is not None:
1174
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1175

    
1176
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1177
        return pre_version_id, dest_version_id
1178

    
1179
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1180
        if src_version_id is not None:
1181
            self.node.attribute_copy(src_version_id, dest_version_id)
1182
        if not replace:
1183
            self.node.attribute_del(dest_version_id, domain, (
1184
                k for k, v in meta.iteritems() if v == ''))
1185
            self.node.attribute_set(dest_version_id, domain, (
1186
                (k, v) for k, v in meta.iteritems() if v != ''))
1187
        else:
1188
            self.node.attribute_del(dest_version_id, domain)
1189
            self.node.attribute_set(dest_version_id, domain, ((
1190
                k, v) for k, v in meta.iteritems()))
1191

    
1192
    def _put_metadata(self, user, node, domain, meta, replace=False):
1193
        """Create a new version and store metadata."""
1194

    
1195
        src_version_id, dest_version_id = self._put_version_duplicate(
1196
            user, node)
1197
        self._put_metadata_duplicate(
1198
            src_version_id, dest_version_id, domain, meta, replace)
1199
        return src_version_id, dest_version_id
1200

    
1201
    def _list_limits(self, listing, marker, limit):
1202
        start = 0
1203
        if marker:
1204
            try:
1205
                start = listing.index(marker) + 1
1206
            except ValueError:
1207
                pass
1208
        if not limit or limit > 10000:
1209
            limit = 10000
1210
        return start, limit
1211

    
1212
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1213
        cont_prefix = path + '/'
1214
        prefix = cont_prefix + prefix
1215
        start = cont_prefix + marker if marker else None
1216
        before = until if until is not None else inf
1217
        filterq = keys if domain else []
1218
        sizeq = size_range
1219

    
1220
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1221
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1222
        objects.sort(key=lambda x: x[0])
1223
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1224
        return objects
1225

    
1226
    # Reporting functions.
1227

    
1228
    def _report_size_change(self, user, account, size, details={}):
1229
        account_node = self._lookup_account(account, True)[1]
1230
        total = self._get_statistics(account_node)[1]
1231
        details.update({'user': user, 'total': total})
1232
        logger.debug(
1233
            "_report_size_change: %s %s %s %s", user, account, size, details)
1234
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1235
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1236
                              float(size), details))
1237

    
1238
    def _report_object_change(self, user, account, path, details={}):
1239
        details.update({'user': user})
1240
        logger.debug("_report_object_change: %s %s %s %s", user,
1241
                     account, path, details)
1242
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1243
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1244

    
1245
    def _report_sharing_change(self, user, account, path, details={}):
1246
        logger.debug("_report_permissions_change: %s %s %s %s",
1247
                     user, account, path, details)
1248
        details.update({'user': user})
1249
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1250
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1251

    
1252
    # Policy functions.
1253

    
1254
    def _check_policy(self, policy):
1255
        for k in policy.keys():
1256
            if policy[k] == '':
1257
                policy[k] = self.default_policy.get(k)
1258
        for k, v in policy.iteritems():
1259
            if k == 'quota':
1260
                q = int(v)  # May raise ValueError.
1261
                if q < 0:
1262
                    raise ValueError
1263
            elif k == 'versioning':
1264
                if v not in ['auto', 'none']:
1265
                    raise ValueError
1266
            else:
1267
                raise ValueError
1268

    
1269
    def _put_policy(self, node, policy, replace):
1270
        if replace:
1271
            for k, v in self.default_policy.iteritems():
1272
                if k not in policy:
1273
                    policy[k] = v
1274
        self.node.policy_set(node, policy)
1275

    
1276
    def _get_policy(self, node):
1277
        policy = self.default_policy.copy()
1278
        policy.update(self.node.policy_get(node))
1279
        return policy
1280

    
1281
    def _apply_versioning(self, account, container, version_id):
1282
        """Delete the provided version if such is the policy.
1283
           Return size of object removed.
1284
        """
1285

    
1286
        if version_id is None:
1287
            return 0
1288
        path, node = self._lookup_container(account, container)
1289
        versioning = self._get_policy(node)['versioning']
1290
        if versioning != 'auto' or self.free_versioning:
1291
            hash, size = self.node.version_remove(version_id)
1292
            self.store.map_delete(hash)
1293
            return size
1294
        return 0
1295

    
1296
    # Access control functions.
1297

    
1298
    def _check_groups(self, groups):
1299
        # raise ValueError('Bad characters in groups')
1300
        pass
1301

    
1302
    def _check_permissions(self, path, permissions):
1303
        # raise ValueError('Bad characters in permissions')
1304
        pass
1305

    
1306
    def _get_formatted_paths(self, paths):
1307
        formatted = []
1308
        for p in paths:
1309
            node = self.node.node_lookup(p)
1310
            props = None
1311
            if node is not None:
1312
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1313
            if props is not None:
1314
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1315
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1316
                formatted.append((p, self.MATCH_EXACT))
1317
        return formatted
1318

    
1319
    def _get_permissions_path(self, account, container, name):
1320
        path = '/'.join((account, container, name))
1321
        permission_paths = self.permissions.access_inherit(path)
1322
        permission_paths.sort()
1323
        permission_paths.reverse()
1324
        for p in permission_paths:
1325
            if p == path:
1326
                return p
1327
            else:
1328
                if p.count('/') < 2:
1329
                    continue
1330
                node = self.node.node_lookup(p)
1331
                if node is not None:
1332
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1333
                if props is not None:
1334
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1335
                        return p
1336
        return None
1337

    
1338
    def _can_read(self, user, account, container, name):
1339
        if user == account:
1340
            return True
1341
        path = '/'.join((account, container, name))
1342
        if self.permissions.public_get(path) is not None:
1343
            return True
1344
        path = self._get_permissions_path(account, container, name)
1345
        if not path:
1346
            raise NotAllowedError
1347
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1348
            raise NotAllowedError
1349

    
1350
    def _can_write(self, user, account, container, name):
1351
        if user == account:
1352
            return True
1353
        path = '/'.join((account, container, name))
1354
        path = self._get_permissions_path(account, container, name)
1355
        if not path:
1356
            raise NotAllowedError
1357
        if not self.permissions.access_check(path, self.WRITE, user):
1358
            raise NotAllowedError
1359

    
1360
    def _allowed_accounts(self, user):
1361
        allow = set()
1362
        for path in self.permissions.access_list_paths(user):
1363
            allow.add(path.split('/', 1)[0])
1364
        return sorted(allow)
1365

    
1366
    def _allowed_containers(self, user, account):
1367
        allow = set()
1368
        for path in self.permissions.access_list_paths(user, account):
1369
            allow.add(path.split('/', 2)[1])
1370
        return sorted(allow)