Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 094e8815

History | View | Annotate | Download (59.4 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
44

    
45
# Stripped-down version of the HashMap class found in tools.
46

    
47

    
48
class HashMap(list):
49

    
50
    def __init__(self, blocksize, blockhash):
51
        super(HashMap, self).__init__()
52
        self.blocksize = blocksize
53
        self.blockhash = blockhash
54

    
55
    def _hash_raw(self, v):
56
        h = hashlib.new(self.blockhash)
57
        h.update(v)
58
        return h.digest()
59

    
60
    def hash(self):
61
        if len(self) == 0:
62
            return self._hash_raw('')
63
        if len(self) == 1:
64
            return self.__getitem__(0)
65

    
66
        h = list(self)
67
        s = 2
68
        while s < len(h):
69
            s = s * 2
70
        h += [('\x00' * len(h[0]))] * (s - len(h))
71
        while len(h) > 1:
72
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
73
        return h[0]
74

    
75
# Default modules and settings.
76
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
77
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
78
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
79
DEFAULT_BLOCK_PATH = 'data/'
80
DEFAULT_BLOCK_UMASK = 0o022
81
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
82
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
83
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
84

    
85
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
86
QUEUE_CLIENT_ID = 'pithos'
87
QUEUE_INSTANCE_ID = '1'
88

    
89
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
90

    
91
inf = float('inf')
92

    
93
ULTIMATE_ANSWER = 42
94

    
95

    
96
logger = logging.getLogger(__name__)
97

    
98

    
99
def backend_method(func=None, autocommit=1):
100
    if func is None:
101
        def fn(func):
102
            return backend_method(func, autocommit)
103
        return fn
104

    
105
    if not autocommit:
106
        return func
107

    
108
    def fn(self, *args, **kw):
109
        self.wrapper.execute()
110
        try:
111
            self.messages = []
112
            ret = func(self, *args, **kw)
113
            for m in self.messages:
114
                self.queue.send(*m)
115
            self.wrapper.commit()
116
            return ret
117
        except:
118
            self.wrapper.rollback()
119
            raise
120
    return fn
121

    
122

    
123
class ModularBackend(BaseBackend):
124
    """A modular backend.
125

126
    Uses modules for SQL functions and storage.
127
    """
128

    
129
    def __init__(self, db_module=None, db_connection=None,
130
                 block_module=None, block_path=None, block_umask=None,
131
                 queue_module=None, queue_hosts=None,
132
                 queue_exchange=None):
133
        db_module = db_module or DEFAULT_DB_MODULE
134
        db_connection = db_connection or DEFAULT_DB_CONNECTION
135
        block_module = block_module or DEFAULT_BLOCK_MODULE
136
        block_path = block_path or DEFAULT_BLOCK_PATH
137
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
138
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
139
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
140
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
141
                
142
        self.hash_algorithm = 'sha256'
143
        self.block_size = 4 * 1024 * 1024  # 4MB
144

    
145
        self.default_policy = {'quota': DEFAULT_QUOTA,
146
                               'versioning': DEFAULT_VERSIONING}
147

    
148
        def load_module(m):
149
            __import__(m)
150
            return sys.modules[m]
151

    
152
        self.db_module = load_module(db_module)
153
        self.wrapper = self.db_module.DBWrapper(db_connection)
154
        params = {'wrapper': self.wrapper}
155
        self.permissions = self.db_module.Permissions(**params)
156
        self.config = self.db_module.Config(**params)
157
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
158
        for x in ['READ', 'WRITE']:
159
            setattr(self, x, getattr(self.db_module, x))
160
        self.node = self.db_module.Node(**params)
161
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
162
            setattr(self, x, getattr(self.db_module, x))
163

    
164
        self.block_module = load_module(block_module)
165
        params = {'path': block_path,
166
                  'block_size': self.block_size,
167
                  'hash_algorithm': self.hash_algorithm,
168
                  'umask': block_umask}
169
        self.store = self.block_module.Store(**params)
170

    
171
        if queue_module and queue_hosts:
172
            self.queue_module = load_module(queue_module)
173
            params = {'hosts': queue_hosts,
174
                              'exchange': queue_exchange,
175
                      'client_id': QUEUE_CLIENT_ID}
176
            self.queue = self.queue_module.Queue(**params)
177
        else:
178
            class NoQueue:
179
                def send(self, *args):
180
                    pass
181

    
182
                def close(self):
183
                    pass
184

    
185
            self.queue = NoQueue()
186

    
187
    def close(self):
188
        self.wrapper.close()
189
        self.queue.close()
190

    
191
    @backend_method
192
    def list_accounts(self, user, marker=None, limit=10000):
193
        """Return a list of accounts the user can access."""
194

    
195
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
196
        allowed = self._allowed_accounts(user)
197
        start, limit = self._list_limits(allowed, marker, limit)
198
        return allowed[start:start + limit]
199

    
200
    @backend_method
201
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
202
        """Return a dictionary with the account metadata for the domain."""
203

    
204
        logger.debug(
205
            "get_account_meta: %s %s %s %s", user, account, domain, until)
206
        path, node = self._lookup_account(account, user == account)
207
        if user != account:
208
            if until or node is None or account not in self._allowed_accounts(user):
209
                raise NotAllowedError
210
        try:
211
            props = self._get_properties(node, until)
212
            mtime = props[self.MTIME]
213
        except NameError:
214
            props = None
215
            mtime = until
216
        count, bytes, tstamp = self._get_statistics(node, until)
217
        tstamp = max(tstamp, mtime)
218
        if until is None:
219
            modified = tstamp
220
        else:
221
            modified = self._get_statistics(
222
                node)[2]  # Overall last modification.
223
            modified = max(modified, mtime)
224

    
225
        if user != account:
226
            meta = {'name': account}
227
        else:
228
            meta = {}
229
            if props is not None and include_user_defined:
230
                meta.update(
231
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
232
            if until is not None:
233
                meta.update({'until_timestamp': tstamp})
234
            meta.update({'name': account, 'count': count, 'bytes': bytes})
235
        meta.update({'modified': modified})
236
        return meta
237

    
238
    @backend_method
239
    def update_account_meta(self, user, account, domain, meta, replace=False):
240
        """Update the metadata associated with the account for the domain."""
241

    
242
        logger.debug("update_account_meta: %s %s %s %s %s", user,
243
                     account, domain, meta, replace)
244
        if user != account:
245
            raise NotAllowedError
246
        path, node = self._lookup_account(account, True)
247
        self._put_metadata(user, node, domain, meta, replace)
248

    
249
    @backend_method
250
    def get_account_groups(self, user, account):
251
        """Return a dictionary with the user groups defined for this account."""
252

    
253
        logger.debug("get_account_groups: %s %s", user, account)
254
        if user != account:
255
            if account not in self._allowed_accounts(user):
256
                raise NotAllowedError
257
            return {}
258
        self._lookup_account(account, True)
259
        return self.permissions.group_dict(account)
260

    
261
    @backend_method
262
    def update_account_groups(self, user, account, groups, replace=False):
263
        """Update the groups associated with the account."""
264

    
265
        logger.debug("update_account_groups: %s %s %s %s", user,
266
                     account, groups, replace)
267
        if user != account:
268
            raise NotAllowedError
269
        self._lookup_account(account, True)
270
        self._check_groups(groups)
271
        if replace:
272
            self.permissions.group_destroy(account)
273
        for k, v in groups.iteritems():
274
            if not replace:  # If not already deleted.
275
                self.permissions.group_delete(account, k)
276
            if v:
277
                self.permissions.group_addmany(account, k, v)
278

    
279
    @backend_method
280
    def get_account_policy(self, user, account):
281
        """Return a dictionary with the account policy."""
282

    
283
        logger.debug("get_account_policy: %s %s", user, account)
284
        if user != account:
285
            if account not in self._allowed_accounts(user):
286
                raise NotAllowedError
287
            return {}
288
        path, node = self._lookup_account(account, True)
289
        return self._get_policy(node)
290

    
291
    @backend_method
292
    def update_account_policy(self, user, account, policy, replace=False):
293
        """Update the policy associated with the account."""
294

    
295
        logger.debug("update_account_policy: %s %s %s %s", user,
296
                     account, policy, replace)
297
        if user != account:
298
            raise NotAllowedError
299
        path, node = self._lookup_account(account, True)
300
        self._check_policy(policy)
301
        self._put_policy(node, policy, replace)
302

    
303
    @backend_method
304
    def put_account(self, user, account, policy={}):
305
        """Create a new account with the given name."""
306

    
307
        logger.debug("put_account: %s %s %s", user, account, policy)
308
        if user != account:
309
            raise NotAllowedError
310
        node = self.node.node_lookup(account)
311
        if node is not None:
312
            raise AccountExists('Account already exists')
313
        if policy:
314
            self._check_policy(policy)
315
        node = self._put_path(user, self.ROOTNODE, account)
316
        self._put_policy(node, policy, True)
317

    
318
    @backend_method
319
    def delete_account(self, user, account):
320
        """Delete the account with the given name."""
321

    
322
        logger.debug("delete_account: %s %s", user, account)
323
        if user != account:
324
            raise NotAllowedError
325
        node = self.node.node_lookup(account)
326
        if node is None:
327
            return
328
        if not self.node.node_remove(node):
329
            raise AccountNotEmpty('Account is not empty')
330
        self.permissions.group_destroy(account)
331

    
332
    @backend_method
333
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
334
        """Return a list of containers existing under an account."""
335

    
336
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
337
                     account, marker, limit, shared, until, public)
338
        if user != account:
339
            if until or account not in self._allowed_accounts(user):
340
                raise NotAllowedError
341
            allowed = self._allowed_containers(user, account)
342
            start, limit = self._list_limits(allowed, marker, limit)
343
            return allowed[start:start + limit]
344
        if shared or public:
345
            allowed = set()
346
            if shared:
347
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
348
            if public:
349
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
350
            allowed = sorted(allowed)
351
            start, limit = self._list_limits(allowed, marker, limit)
352
            return allowed[start:start + limit]
353
        node = self.node.node_lookup(account)
354
        containers = [x[0] for x in self._list_object_properties(
355
            node, account, '', '/', marker, limit, False, None, [], until)]
356
        start, limit = self._list_limits(
357
            [x[0] for x in containers], marker, limit)
358
        return containers[start:start + limit]
359

    
360
    @backend_method
361
    def list_container_meta(self, user, account, container, domain, until=None):
362
        """Return a list with all the container's object meta keys for the domain."""
363

    
364
        logger.debug("list_container_meta: %s %s %s %s %s", user,
365
                     account, container, domain, until)
366
        allowed = []
367
        if user != account:
368
            if until:
369
                raise NotAllowedError
370
            allowed = self.permissions.access_list_paths(
371
                user, '/'.join((account, container)))
372
            if not allowed:
373
                raise NotAllowedError
374
        path, node = self._lookup_container(account, container)
375
        before = until if until is not None else inf
376
        allowed = self._get_formatted_paths(allowed)
377
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
378

    
379
    @backend_method
380
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
381
        """Return a dictionary with the container metadata for the domain."""
382

    
383
        logger.debug("get_container_meta: %s %s %s %s %s", user,
384
                     account, container, domain, until)
385
        if user != account:
386
            if until or container not in self._allowed_containers(user, account):
387
                raise NotAllowedError
388
        path, node = self._lookup_container(account, container)
389
        props = self._get_properties(node, until)
390
        mtime = props[self.MTIME]
391
        count, bytes, tstamp = self._get_statistics(node, until)
392
        tstamp = max(tstamp, mtime)
393
        if until is None:
394
            modified = tstamp
395
        else:
396
            modified = self._get_statistics(
397
                node)[2]  # Overall last modification.
398
            modified = max(modified, mtime)
399

    
400
        if user != account:
401
            meta = {'name': container}
402
        else:
403
            meta = {}
404
            if include_user_defined:
405
                meta.update(
406
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
407
            if until is not None:
408
                meta.update({'until_timestamp': tstamp})
409
            meta.update({'name': container, 'count': count, 'bytes': bytes})
410
        meta.update({'modified': modified})
411
        return meta
412

    
413
    @backend_method
414
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
415
        """Update the metadata associated with the container for the domain."""
416

    
417
        logger.debug("update_container_meta: %s %s %s %s %s %s",
418
                     user, account, container, domain, meta, replace)
419
        if user != account:
420
            raise NotAllowedError
421
        path, node = self._lookup_container(account, container)
422
        src_version_id, dest_version_id = self._put_metadata(
423
            user, node, domain, meta, replace)
424
        if src_version_id is not None:
425
            versioning = self._get_policy(node)['versioning']
426
            if versioning != 'auto':
427
                self.node.version_remove(src_version_id)
428

    
429
    @backend_method
430
    def get_container_policy(self, user, account, container):
431
        """Return a dictionary with the container policy."""
432

    
433
        logger.debug(
434
            "get_container_policy: %s %s %s", user, account, container)
435
        if user != account:
436
            if container not in self._allowed_containers(user, account):
437
                raise NotAllowedError
438
            return {}
439
        path, node = self._lookup_container(account, container)
440
        return self._get_policy(node)
441

    
442
    @backend_method
443
    def update_container_policy(self, user, account, container, policy, replace=False):
444
        """Update the policy associated with the container."""
445

    
446
        logger.debug("update_container_policy: %s %s %s %s %s",
447
                     user, account, container, policy, replace)
448
        if user != account:
449
            raise NotAllowedError
450
        path, node = self._lookup_container(account, container)
451
        self._check_policy(policy)
452
        self._put_policy(node, policy, replace)
453

    
454
    @backend_method
455
    def put_container(self, user, account, container, policy={}):
456
        """Create a new container with the given name."""
457

    
458
        logger.debug(
459
            "put_container: %s %s %s %s", user, account, container, policy)
460
        if user != account:
461
            raise NotAllowedError
462
        try:
463
            path, node = self._lookup_container(account, container)
464
        except NameError:
465
            pass
466
        else:
467
            raise ContainerExists('Container already exists')
468
        if policy:
469
            self._check_policy(policy)
470
        path = '/'.join((account, container))
471
        node = self._put_path(
472
            user, self._lookup_account(account, True)[1], path)
473
        self._put_policy(node, policy, True)
474

    
475
    @backend_method
476
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
477
        """Delete/purge the container with the given name."""
478

    
479
        logger.debug("delete_container: %s %s %s %s %s %s", user,
480
                     account, container, until, prefix, delimiter)
481
        if user != account:
482
            raise NotAllowedError
483
        path, node = self._lookup_container(account, container)
484

    
485
        if until is not None:
486
            hashes, size, serials = self.node.node_purge_children(
487
                node, until, CLUSTER_HISTORY)
488
            for h in hashes:
489
                self.store.map_delete(h)
490
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
491
            self._report_size_change(user, account, -size,
492
                                                             {'action':'container purge', 'path': path,
493
                                                              'versions': ','.join(str(i) for i in serials)})
494
            return
495

    
496
        if not delimiter:
497
            if self._get_statistics(node)[0] > 0:
498
                raise ContainerNotEmpty('Container is not empty')
499
            hashes, size, serials = self.node.node_purge_children(
500
                node, inf, CLUSTER_HISTORY)
501
            for h in hashes:
502
                self.store.map_delete(h)
503
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
504
            self.node.node_remove(node)
505
            self._report_size_change(user, account, -size,
506
                                                             {'action': 'container delete',
507
                                                              'path': path,
508
                                                              'versions': ','.join(str(i) for i in serials)})
509
        else:
510
                # remove only contents
511
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
512
            paths = []
513
            for t in src_names:
514
                path = '/'.join((account, container, t[0]))
515
                node = t[2]
516
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
517
                del_size = self._apply_versioning(
518
                    account, container, src_version_id)
519
                if del_size:
520
                    self._report_size_change(user, account, -del_size,
521
                                                                     {'action': 'object delete',
522
                                                                      'path': path,
523
                                                                      'versions': ','.join([str(dest_version_id)])})
524
                self._report_object_change(
525
                    user, account, path, details={'action': 'object delete'})
526
                paths.append(path)
527
            self.permissions.access_clear_bulk(paths)
528

    
529
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
530
        if user != account and until:
531
            raise NotAllowedError
532
        if shared and public:
533
            # get shared first
534
            shared = self._list_object_permissions(
535
                user, account, container, prefix, shared=True, public=False)
536
            objects = set()
537
            if shared:
538
                path, node = self._lookup_container(account, container)
539
                shared = self._get_formatted_paths(shared)
540
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
541

    
542
            # get public
543
            objects |= set(self._list_public_object_properties(
544
                user, account, container, prefix, all_props))
545
            objects = list(objects)
546

    
547
            objects.sort(key=lambda x: x[0])
548
            start, limit = self._list_limits(
549
                [x[0] for x in objects], marker, limit)
550
            return objects[start:start + limit]
551
        elif public:
552
            objects = self._list_public_object_properties(
553
                user, account, container, prefix, all_props)
554
            start, limit = self._list_limits(
555
                [x[0] for x in objects], marker, limit)
556
            return objects[start:start + limit]
557

    
558
        allowed = self._list_object_permissions(
559
            user, account, container, prefix, shared, public)
560
        if shared and not allowed:
561
            return []
562
        path, node = self._lookup_container(account, container)
563
        allowed = self._get_formatted_paths(allowed)
564
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
565
        start, limit = self._list_limits(
566
            [x[0] for x in objects], marker, limit)
567
        return objects[start:start + limit]
568

    
569
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
570
        public = self._list_object_permissions(
571
            user, account, container, prefix, shared=False, public=True)
572
        paths, nodes = self._lookup_objects(public)
573
        path = '/'.join((account, container))
574
        cont_prefix = path + '/'
575
        paths = [x[len(cont_prefix):] for x in paths]
576
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
577
        objects = [(path,) + props for path, props in zip(paths, props)]
578
        return objects
579

    
580
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
581
        objects = []
582
        while True:
583
            marker = objects[-1] if objects else None
584
            limit = 10000
585
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
586
            objects.extend(l)
587
            if not l or len(l) < limit:
588
                break
589
        return objects
590

    
591
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
592
        allowed = []
593
        path = '/'.join((account, container, prefix)).rstrip('/')
594
        if user != account:
595
            allowed = self.permissions.access_list_paths(user, path)
596
            if not allowed:
597
                raise NotAllowedError
598
        else:
599
            allowed = set()
600
            if shared:
601
                allowed.update(self.permissions.access_list_shared(path))
602
            if public:
603
                allowed.update(
604
                    [x[0] for x in self.permissions.public_list(path)])
605
            allowed = sorted(allowed)
606
            if not allowed:
607
                return []
608
        return allowed
609

    
610
    @backend_method
611
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
612
        """Return a list of object (name, version_id) tuples existing under a container."""
613

    
614
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
615
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
616

    
617
    @backend_method
618
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
619
        """Return a list of object metadata dicts existing under a container."""
620

    
621
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
622
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
623
        objects = []
624
        for p in props:
625
            if len(p) == 2:
626
                objects.append({'subdir': p[0]})
627
            else:
628
                objects.append({'name': p[0],
629
                                'bytes': p[self.SIZE + 1],
630
                                'type': p[self.TYPE + 1],
631
                                'hash': p[self.HASH + 1],
632
                                'version': p[self.SERIAL + 1],
633
                                'version_timestamp': p[self.MTIME + 1],
634
                                'modified': p[self.MTIME + 1] if until is None else None,
635
                                'modified_by': p[self.MUSER + 1],
636
                                'uuid': p[self.UUID + 1],
637
                                'checksum': p[self.CHECKSUM + 1]})
638
        return objects
639

    
640
    @backend_method
641
    def list_object_permissions(self, user, account, container, prefix=''):
642
        """Return a list of paths that enforce permissions under a container."""
643

    
644
        logger.debug("list_object_permissions: %s %s %s %s", user,
645
                     account, container, prefix)
646
        return self._list_object_permissions(user, account, container, prefix, True, False)
647

    
648
    @backend_method
649
    def list_object_public(self, user, account, container, prefix=''):
650
        """Return a dict mapping paths to public ids for objects that are public under a container."""
651

    
652
        logger.debug("list_object_public: %s %s %s %s", user,
653
                     account, container, prefix)
654
        public = {}
655
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
656
            public[path] = p + ULTIMATE_ANSWER
657
        return public
658

    
659
    @backend_method
660
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
661
        """Return a dictionary with the object metadata for the domain."""
662

    
663
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
664
                     account, container, name, domain, version)
665
        self._can_read(user, account, container, name)
666
        path, node = self._lookup_object(account, container, name)
667
        props = self._get_version(node, version)
668
        if version is None:
669
            modified = props[self.MTIME]
670
        else:
671
            try:
672
                modified = self._get_version(
673
                    node)[self.MTIME]  # Overall last modification.
674
            except NameError:  # Object may be deleted.
675
                del_props = self.node.version_lookup(
676
                    node, inf, CLUSTER_DELETED)
677
                if del_props is None:
678
                    raise ItemNotExists('Object does not exist')
679
                modified = del_props[self.MTIME]
680

    
681
        meta = {}
682
        if include_user_defined:
683
            meta.update(
684
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
685
        meta.update({'name': name,
686
                     'bytes': props[self.SIZE],
687
                     'type': props[self.TYPE],
688
                     'hash': props[self.HASH],
689
                     'version': props[self.SERIAL],
690
                     'version_timestamp': props[self.MTIME],
691
                     'modified': modified,
692
                     'modified_by': props[self.MUSER],
693
                     'uuid': props[self.UUID],
694
                     'checksum': props[self.CHECKSUM]})
695
        return meta
696

    
697
    @backend_method
698
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
699
        """Update the metadata associated with the object for the domain and return the new version."""
700

    
701
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
702
                     user, account, container, name, domain, meta, replace)
703
        self._can_write(user, account, container, name)
704
        path, node = self._lookup_object(account, container, name)
705
        src_version_id, dest_version_id = self._put_metadata(
706
            user, node, domain, meta, replace)
707
        self._apply_versioning(account, container, src_version_id)
708
        return dest_version_id
709

    
710
    @backend_method
711
    def get_object_permissions(self, user, account, container, name):
712
        """Return the action allowed on the object, the path
713
        from which the object gets its permissions from,
714
        along with a dictionary containing the permissions."""
715

    
716
        logger.debug("get_object_permissions: %s %s %s %s", user,
717
                     account, container, name)
718
        allowed = 'write'
719
        permissions_path = self._get_permissions_path(account, container, name)
720
        if user != account:
721
            if self.permissions.access_check(permissions_path, self.WRITE, user):
722
                allowed = 'write'
723
            elif self.permissions.access_check(permissions_path, self.READ, user):
724
                allowed = 'read'
725
            else:
726
                raise NotAllowedError
727
        self._lookup_object(account, container, name)
728
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
729

    
730
    @backend_method
731
    def update_object_permissions(self, user, account, container, name, permissions):
732
        """Update the permissions associated with the object."""
733

    
734
        logger.debug("update_object_permissions: %s %s %s %s %s",
735
                     user, account, container, name, permissions)
736
        if user != account:
737
            raise NotAllowedError
738
        path = self._lookup_object(account, container, name)[0]
739
        self._check_permissions(path, permissions)
740
        self.permissions.access_set(path, permissions)
741
        self._report_sharing_change(user, account, path, {'members':
742
                                    self.permissions.access_members(path)})
743

    
744
    @backend_method
745
    def get_object_public(self, user, account, container, name):
746
        """Return the public id of the object if applicable."""
747

    
748
        logger.debug(
749
            "get_object_public: %s %s %s %s", user, account, container, name)
750
        self._can_read(user, account, container, name)
751
        path = self._lookup_object(account, container, name)[0]
752
        p = self.permissions.public_get(path)
753
        if p is not None:
754
            p += ULTIMATE_ANSWER
755
        return p
756

    
757
    @backend_method
758
    def update_object_public(self, user, account, container, name, public):
759
        """Update the public status of the object."""
760

    
761
        logger.debug("update_object_public: %s %s %s %s %s", user,
762
                     account, container, name, public)
763
        self._can_write(user, account, container, name)
764
        path = self._lookup_object(account, container, name)[0]
765
        if not public:
766
            self.permissions.public_unset(path)
767
        else:
768
            self.permissions.public_set(path)
769

    
770
    @backend_method
771
    def get_object_hashmap(self, user, account, container, name, version=None):
772
        """Return the object's size and a list with partial hashes."""
773

    
774
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
775
                     account, container, name, version)
776
        self._can_read(user, account, container, name)
777
        path, node = self._lookup_object(account, container, name)
778
        props = self._get_version(node, version)
779
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
780
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
781

    
782
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
783
        if permissions is not None and user != account:
784
            raise NotAllowedError
785
        self._can_write(user, account, container, name)
786
        if permissions is not None:
787
            path = '/'.join((account, container, name))
788
            self._check_permissions(path, permissions)
789

    
790
        account_path, account_node = self._lookup_account(account, True)
791
        container_path, container_node = self._lookup_container(
792
            account, container)
793
        path, node = self._put_object_node(
794
            container_path, container_node, name)
795
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
796

    
797
        # Handle meta.
798
        if src_version_id is None:
799
            src_version_id = pre_version_id
800
        self._put_metadata_duplicate(
801
            src_version_id, dest_version_id, domain, meta, replace_meta)
802

    
803
        # Check quota.
804
        del_size = self._apply_versioning(account, container, pre_version_id)
805
        size_delta = size - del_size
806
        if size_delta > 0:
807
            account_quota = long(self._get_policy(account_node)['quota'])
808
            container_quota = long(self._get_policy(container_node)['quota'])
809
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
810
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
811
                # This must be executed in a transaction, so the version is never created if it fails.
812
                raise QuotaError
813
        self._report_size_change(user, account, size_delta,
814
                                                         {'action': 'object update', 'path': path,
815
                                                          'versions': ','.join([str(dest_version_id)])})
816

    
817
        if permissions is not None:
818
            self.permissions.access_set(path, permissions)
819
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
820

    
821
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
822
        return dest_version_id
823

    
824
    @backend_method
825
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
826
        """Create/update an object with the specified size and partial hashes."""
827

    
828
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
829
                     account, container, name, size, type, hashmap, checksum)
830
        if size == 0:  # No such thing as an empty hashmap.
831
            hashmap = [self.put_block('')]
832
        map = HashMap(self.block_size, self.hash_algorithm)
833
        map.extend([binascii.unhexlify(x) for x in hashmap])
834
        missing = self.store.block_search(map)
835
        if missing:
836
            ie = IndexError()
837
            ie.data = [binascii.hexlify(x) for x in missing]
838
            raise ie
839

    
840
        hash = map.hash()
841
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
842
        self.store.map_put(hash, map)
843
        return dest_version_id
844

    
845
    @backend_method
846
    def update_object_checksum(self, user, account, container, name, version, checksum):
847
        """Update an object's checksum."""
848

    
849
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
850
                     user, account, container, name, version, checksum)
851
        # Update objects with greater version and same hashmap and size (fix metadata updates).
852
        self._can_write(user, account, container, name)
853
        path, node = self._lookup_object(account, container, name)
854
        props = self._get_version(node, version)
855
        versions = self.node.node_get_versions(node)
856
        for x in versions:
857
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
858
                self.node.version_put_property(
859
                    x[self.SERIAL], 'checksum', checksum)
860

    
861
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
862
        dest_version_ids = []
863
        self._can_read(user, src_account, src_container, src_name)
864
        path, node = self._lookup_object(src_account, src_container, src_name)
865
        # TODO: Will do another fetch of the properties in duplicate version...
866
        props = self._get_version(
867
            node, src_version)  # Check to see if source exists.
868
        src_version_id = props[self.SERIAL]
869
        hash = props[self.HASH]
870
        size = props[self.SIZE]
871
        is_copy = not is_move and (src_account, src_container, src_name) != (
872
            dest_account, dest_container, dest_name)  # New uuid.
873
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
874
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
875
            self._delete_object(user, src_account, src_container, src_name)
876

    
877
        if delimiter:
878
            prefix = src_name + \
879
                delimiter if not src_name.endswith(delimiter) else src_name
880
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
881
            src_names.sort(key=lambda x: x[2])  # order by nodes
882
            paths = [elem[0] for elem in src_names]
883
            nodes = [elem[2] for elem in src_names]
884
            # TODO: Will do another fetch of the properties in duplicate version...
885
            props = self._get_versions(nodes)  # Check to see if source exists.
886

    
887
            for prop, path, node in zip(props, paths, nodes):
888
                src_version_id = prop[self.SERIAL]
889
                hash = prop[self.HASH]
890
                vtype = prop[self.TYPE]
891
                size = prop[self.SIZE]
892
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
893
                    delimiter) else dest_name
894
                vdest_name = path.replace(prefix, dest_prefix, 1)
895
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
896
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
897
                    self._delete_object(user, src_account, src_container, path)
898
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
899

    
900
    @backend_method
901
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
902
        """Copy an object's data and metadata."""
903

    
904
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
905
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
906
        return dest_version_id
907

    
908
    @backend_method
909
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
910
        """Move an object's data and metadata."""
911

    
912
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
913
        if user != src_account:
914
            raise NotAllowedError
915
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
916
        return dest_version_id
917

    
918
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
919
        if user != account:
920
            raise NotAllowedError
921

    
922
        if until is not None:
923
            path = '/'.join((account, container, name))
924
            node = self.node.node_lookup(path)
925
            if node is None:
926
                return
927
            hashes = []
928
            size = 0
929
            serials = []
930
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
931
            hashes += h
932
            size += s
933
            serials += v
934
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
935
            hashes += h
936
            size += s
937
            serials += v
938
            for h in hashes:
939
                self.store.map_delete(h)
940
            self.node.node_purge(node, until, CLUSTER_DELETED)
941
            try:
942
                props = self._get_version(node)
943
            except NameError:
944
                self.permissions.access_clear(path)
945
            self._report_size_change(user, account, -size,
946
                                                            {'action': 'object purge', 'path': path,
947
                                                             'versions': ','.join(str(i) for i in serials)})
948
            return
949

    
950
        path, node = self._lookup_object(account, container, name)
951
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
952
        del_size = self._apply_versioning(account, container, src_version_id)
953
        if del_size:
954
            self._report_size_change(user, account, -del_size,
955
                                                             {'action': 'object delete', 'path': path,
956
                                                              'versions': ','.join([str(dest_version_id)])})
957
        self._report_object_change(
958
            user, account, path, details={'action': 'object delete'})
959
        self.permissions.access_clear(path)
960

    
961
        if delimiter:
962
            prefix = name + delimiter if not name.endswith(delimiter) else name
963
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
964
            paths = []
965
            for t in src_names:
966
                path = '/'.join((account, container, t[0]))
967
                node = t[2]
968
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
969
                del_size = self._apply_versioning(
970
                    account, container, src_version_id)
971
                if del_size:
972
                    self._report_size_change(user, account, -del_size,
973
                                                                     {'action': 'object delete',
974
                                                                      'path': path,
975
                                                                      'versions': ','.join([str(dest_version_id)])})
976
                self._report_object_change(
977
                    user, account, path, details={'action': 'object delete'})
978
                paths.append(path)
979
            self.permissions.access_clear_bulk(paths)
980

    
981
    @backend_method
982
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
983
        """Delete/purge an object."""
984

    
985
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
986
                     account, container, name, until, prefix, delimiter)
987
        self._delete_object(user, account, container, name, until, delimiter)
988

    
989
    @backend_method
990
    def list_versions(self, user, account, container, name):
991
        """Return a list of all (version, version_timestamp) tuples for an object."""
992

    
993
        logger.debug(
994
            "list_versions: %s %s %s %s", user, account, container, name)
995
        self._can_read(user, account, container, name)
996
        path, node = self._lookup_object(account, container, name)
997
        versions = self.node.node_get_versions(node)
998
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
999

    
1000
    @backend_method
1001
    def get_uuid(self, user, uuid):
1002
        """Return the (account, container, name) for the UUID given."""
1003

    
1004
        logger.debug("get_uuid: %s %s", user, uuid)
1005
        info = self.node.latest_uuid(uuid)
1006
        if info is None:
1007
            raise NameError
1008
        path, serial = info
1009
        account, container, name = path.split('/', 2)
1010
        self._can_read(user, account, container, name)
1011
        return (account, container, name)
1012

    
1013
    @backend_method
1014
    def get_public(self, user, public):
1015
        """Return the (account, container, name) for the public id given."""
1016

    
1017
        logger.debug("get_public: %s %s", user, public)
1018
        if public is None or public < ULTIMATE_ANSWER:
1019
            raise NameError
1020
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1021
        if path is None:
1022
            raise NameError
1023
        account, container, name = path.split('/', 2)
1024
        self._can_read(user, account, container, name)
1025
        return (account, container, name)
1026

    
1027
    @backend_method(autocommit=0)
1028
    def get_block(self, hash):
1029
        """Return a block's data."""
1030

    
1031
        logger.debug("get_block: %s", hash)
1032
        block = self.store.block_get(binascii.unhexlify(hash))
1033
        if not block:
1034
            raise ItemNotExists('Block does not exist')
1035
        return block
1036

    
1037
    @backend_method(autocommit=0)
1038
    def put_block(self, data):
1039
        """Store a block and return the hash."""
1040

    
1041
        logger.debug("put_block: %s", len(data))
1042
        return binascii.hexlify(self.store.block_put(data))
1043

    
1044
    @backend_method(autocommit=0)
1045
    def update_block(self, hash, data, offset=0):
1046
        """Update a known block and return the hash."""
1047

    
1048
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1049
        if offset == 0 and len(data) == self.block_size:
1050
            return self.put_block(data)
1051
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1052
        return binascii.hexlify(h)
1053

    
1054
    # Path functions.
1055

    
1056
    def _generate_uuid(self):
1057
        return str(uuidlib.uuid4())
1058

    
1059
    def _put_object_node(self, path, parent, name):
1060
        path = '/'.join((path, name))
1061
        node = self.node.node_lookup(path)
1062
        if node is None:
1063
            node = self.node.node_create(parent, path)
1064
        return path, node
1065

    
1066
    def _put_path(self, user, parent, path):
1067
        node = self.node.node_create(parent, path)
1068
        self.node.version_create(node, None, 0, '', None, user,
1069
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1070
        return node
1071

    
1072
    def _lookup_account(self, account, create=True):
1073
        node = self.node.node_lookup(account)
1074
        if node is None and create:
1075
            node = self._put_path(
1076
                account, self.ROOTNODE, account)  # User is account.
1077
        return account, node
1078

    
1079
    def _lookup_container(self, account, container):
1080
        path = '/'.join((account, container))
1081
        node = self.node.node_lookup(path)
1082
        if node is None:
1083
            raise ItemNotExists('Container does not exist')
1084
        return path, node
1085

    
1086
    def _lookup_object(self, account, container, name):
1087
        path = '/'.join((account, container, name))
1088
        node = self.node.node_lookup(path)
1089
        if node is None:
1090
            raise ItemNotExists('Object does not exist')
1091
        return path, node
1092

    
1093
    def _lookup_objects(self, paths):
1094
        nodes = self.node.node_lookup_bulk(paths)
1095
        return paths, nodes
1096

    
1097
    def _get_properties(self, node, until=None):
1098
        """Return properties until the timestamp given."""
1099

    
1100
        before = until if until is not None else inf
1101
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1102
        if props is None and until is not None:
1103
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1104
        if props is None:
1105
            raise ItemNotExists('Path does not exist')
1106
        return props
1107

    
1108
    def _get_statistics(self, node, until=None):
1109
        """Return count, sum of size and latest timestamp of everything under node."""
1110

    
1111
        if until is None:
1112
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1113
        else:
1114
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1115
        if stats is None:
1116
            stats = (0, 0, 0)
1117
        return stats
1118

    
1119
    def _get_version(self, node, version=None):
1120
        if version is None:
1121
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1122
            if props is None:
1123
                raise ItemNotExists('Object does not exist')
1124
        else:
1125
            try:
1126
                version = int(version)
1127
            except ValueError:
1128
                raise VersionNotExists('Version does not exist')
1129
            props = self.node.version_get_properties(version)
1130
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1131
                raise VersionNotExists('Version does not exist')
1132
        return props
1133

    
1134
    def _get_versions(self, nodes):
1135
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1136

    
1137
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1138
        """Create a new version of the node."""
1139

    
1140
        props = self.node.version_lookup(
1141
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1142
        if props is not None:
1143
            src_version_id = props[self.SERIAL]
1144
            src_hash = props[self.HASH]
1145
            src_size = props[self.SIZE]
1146
            src_type = props[self.TYPE]
1147
            src_checksum = props[self.CHECKSUM]
1148
        else:
1149
            src_version_id = None
1150
            src_hash = None
1151
            src_size = 0
1152
            src_type = ''
1153
            src_checksum = ''
1154
        if size is None:  # Set metadata.
1155
            hash = src_hash  # This way hash can be set to None (account or container).
1156
            size = src_size
1157
        if type is None:
1158
            type = src_type
1159
        if checksum is None:
1160
            checksum = src_checksum
1161
        uuid = self._generate_uuid(
1162
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1163

    
1164
        if src_node is None:
1165
            pre_version_id = src_version_id
1166
        else:
1167
            pre_version_id = None
1168
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1169
            if props is not None:
1170
                pre_version_id = props[self.SERIAL]
1171
        if pre_version_id is not None:
1172
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1173

    
1174
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1175
        return pre_version_id, dest_version_id
1176

    
1177
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1178
        if src_version_id is not None:
1179
            self.node.attribute_copy(src_version_id, dest_version_id)
1180
        if not replace:
1181
            self.node.attribute_del(dest_version_id, domain, (
1182
                k for k, v in meta.iteritems() if v == ''))
1183
            self.node.attribute_set(dest_version_id, domain, (
1184
                (k, v) for k, v in meta.iteritems() if v != ''))
1185
        else:
1186
            self.node.attribute_del(dest_version_id, domain)
1187
            self.node.attribute_set(dest_version_id, domain, ((
1188
                k, v) for k, v in meta.iteritems()))
1189

    
1190
    def _put_metadata(self, user, node, domain, meta, replace=False):
1191
        """Create a new version and store metadata."""
1192

    
1193
        src_version_id, dest_version_id = self._put_version_duplicate(
1194
            user, node)
1195
        self._put_metadata_duplicate(
1196
            src_version_id, dest_version_id, domain, meta, replace)
1197
        return src_version_id, dest_version_id
1198

    
1199
    def _list_limits(self, listing, marker, limit):
1200
        start = 0
1201
        if marker:
1202
            try:
1203
                start = listing.index(marker) + 1
1204
            except ValueError:
1205
                pass
1206
        if not limit or limit > 10000:
1207
            limit = 10000
1208
        return start, limit
1209

    
1210
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1211
        cont_prefix = path + '/'
1212
        prefix = cont_prefix + prefix
1213
        start = cont_prefix + marker if marker else None
1214
        before = until if until is not None else inf
1215
        filterq = keys if domain else []
1216
        sizeq = size_range
1217

    
1218
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1219
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1220
        objects.sort(key=lambda x: x[0])
1221
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1222
        return objects
1223

    
1224
    # Reporting functions.
1225

    
1226
    def _report_size_change(self, user, account, size, details={}):
1227
        account_node = self._lookup_account(account, True)[1]
1228
        total = self._get_statistics(account_node)[1]
1229
        details.update({'user': user, 'total': total})
1230
        logger.debug(
1231
            "_report_size_change: %s %s %s %s", user, account, size, details)
1232
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1233
                                                  account, QUEUE_INSTANCE_ID, 'diskspace',
1234
                                                  float(size), details))
1235

    
1236
    def _report_object_change(self, user, account, path, details={}):
1237
        details.update({'user': user})
1238
        logger.debug("_report_object_change: %s %s %s %s", user,
1239
                     account, path, details)
1240
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1241
                                                  account, QUEUE_INSTANCE_ID, 'object', path, details))
1242

    
1243
    def _report_sharing_change(self, user, account, path, details={}):
1244
        logger.debug("_report_permissions_change: %s %s %s %s",
1245
                     user, account, path, details)
1246
        details.update({'user': user})
1247
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1248
                                                  account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1249

    
1250
    # Policy functions.
1251

    
1252
    def _check_policy(self, policy):
1253
        for k in policy.keys():
1254
            if policy[k] == '':
1255
                policy[k] = self.default_policy.get(k)
1256
        for k, v in policy.iteritems():
1257
            if k == 'quota':
1258
                q = int(v)  # May raise ValueError.
1259
                if q < 0:
1260
                    raise ValueError
1261
            elif k == 'versioning':
1262
                if v not in ['auto', 'none']:
1263
                    raise ValueError
1264
            else:
1265
                raise ValueError
1266

    
1267
    def _put_policy(self, node, policy, replace):
1268
        if replace:
1269
            for k, v in self.default_policy.iteritems():
1270
                if k not in policy:
1271
                    policy[k] = v
1272
        self.node.policy_set(node, policy)
1273

    
1274
    def _get_policy(self, node):
1275
        policy = self.default_policy.copy()
1276
        policy.update(self.node.policy_get(node))
1277
        return policy
1278

    
1279
    def _apply_versioning(self, account, container, version_id):
1280
        """Delete the provided version if such is the policy.
1281
           Return size of object removed.
1282
        """
1283

    
1284
        if version_id is None:
1285
            return 0
1286
        path, node = self._lookup_container(account, container)
1287
        versioning = self._get_policy(node)['versioning']
1288
        if versioning != 'auto':
1289
            hash, size = self.node.version_remove(version_id)
1290
            self.store.map_delete(hash)
1291
            return size
1292
        return 0
1293

    
1294
    # Access control functions.
1295

    
1296
    def _check_groups(self, groups):
1297
        # raise ValueError('Bad characters in groups')
1298
        pass
1299

    
1300
    def _check_permissions(self, path, permissions):
1301
        # raise ValueError('Bad characters in permissions')
1302
        pass
1303

    
1304
    def _get_formatted_paths(self, paths):
1305
        formatted = []
1306
        for p in paths:
1307
            node = self.node.node_lookup(p)
1308
            if node is not None:
1309
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1310
            if props is not None:
1311
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1312
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1313
                formatted.append((p, self.MATCH_EXACT))
1314
        return formatted
1315

    
1316
    def _get_permissions_path(self, account, container, name):
1317
        path = '/'.join((account, container, name))
1318
        permission_paths = self.permissions.access_inherit(path)
1319
        permission_paths.sort()
1320
        permission_paths.reverse()
1321
        for p in permission_paths:
1322
            if p == path:
1323
                return p
1324
            else:
1325
                if p.count('/') < 2:
1326
                    continue
1327
                node = self.node.node_lookup(p)
1328
                if node is not None:
1329
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1330
                if props is not None:
1331
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1332
                        return p
1333
        return None
1334

    
1335
    def _can_read(self, user, account, container, name):
1336
        if user == account:
1337
            return True
1338
        path = '/'.join((account, container, name))
1339
        if self.permissions.public_get(path) is not None:
1340
            return True
1341
        path = self._get_permissions_path(account, container, name)
1342
        if not path:
1343
            raise NotAllowedError
1344
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1345
            raise NotAllowedError
1346

    
1347
    def _can_write(self, user, account, container, name):
1348
        if user == account:
1349
            return True
1350
        path = '/'.join((account, container, name))
1351
        path = self._get_permissions_path(account, container, name)
1352
        if not path:
1353
            raise NotAllowedError
1354
        if not self.permissions.access_check(path, self.WRITE, user):
1355
            raise NotAllowedError
1356

    
1357
    def _allowed_accounts(self, user):
1358
        allow = set()
1359
        for path in self.permissions.access_list_paths(user):
1360
            allow.add(path.split('/', 1)[0])
1361
        return sorted(allow)
1362

    
1363
    def _allowed_containers(self, user, account):
1364
        allow = set()
1365
        for path in self.permissions.access_list_paths(user, account):
1366
            allow.add(path.split('/', 2)[1])
1367
        return sorted(allow)