Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 0307b47f

History | View | Annotate | Download (59.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from commissioning.clients.quotaholder import QuotaholderHTTP
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
85
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
86

    
87
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
88
QUEUE_CLIENT_ID = 'pithos'
89
QUEUE_INSTANCE_ID = '1'
90

    
91
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
92

    
93
inf = float('inf')
94

    
95
ULTIMATE_ANSWER = 42
96

    
97

    
98
logger = logging.getLogger(__name__)
99

    
100

    
101
def backend_method(func=None, autocommit=1):
102
    if func is None:
103
        def fn(func):
104
            return backend_method(func, autocommit)
105
        return fn
106

    
107
    if not autocommit:
108
        return func
109

    
110
    def fn(self, *args, **kw):
111
        self.wrapper.execute()
112
        try:
113
            self.messages = []
114
            ret = func(self, *args, **kw)
115
            for m in self.messages:
116
                self.queue.send(*m)
117
            self.wrapper.commit()
118
            return ret
119
        except:
120
            self.wrapper.rollback()
121
            raise
122
    return fn
123

    
124

    
125
class ModularBackend(BaseBackend):
126
    """A modular backend.
127

128
    Uses modules for SQL functions and storage.
129
    """
130

    
131
    def __init__(self, db_module=None, db_connection=None,
132
                 block_module=None, block_path=None, block_umask=None,
133
                 queue_module=None, queue_hosts=None,
134
                 queue_exchange=None, quotaholder_url=None):
135
        db_module = db_module or DEFAULT_DB_MODULE
136
        db_connection = db_connection or DEFAULT_DB_CONNECTION
137
        block_module = block_module or DEFAULT_BLOCK_MODULE
138
        block_path = block_path or DEFAULT_BLOCK_PATH
139
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
140
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
141
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
142
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
143
                
144
        self.hash_algorithm = 'sha256'
145
        self.block_size = 4 * 1024 * 1024  # 4MB
146

    
147
        self.default_policy = {'quota': DEFAULT_QUOTA,
148
                               'versioning': DEFAULT_VERSIONING}
149

    
150
        def load_module(m):
151
            __import__(m)
152
            return sys.modules[m]
153

    
154
        self.db_module = load_module(db_module)
155
        self.wrapper = self.db_module.DBWrapper(db_connection)
156
        params = {'wrapper': self.wrapper}
157
        self.permissions = self.db_module.Permissions(**params)
158
        self.config = self.db_module.Config(**params)
159
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
160
        for x in ['READ', 'WRITE']:
161
            setattr(self, x, getattr(self.db_module, x))
162
        self.node = self.db_module.Node(**params)
163
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
164
            setattr(self, x, getattr(self.db_module, x))
165

    
166
        self.block_module = load_module(block_module)
167
        params = {'path': block_path,
168
                  'block_size': self.block_size,
169
                  'hash_algorithm': self.hash_algorithm,
170
                  'umask': block_umask}
171
        self.store = self.block_module.Store(**params)
172

    
173
        if queue_module and queue_hosts:
174
            self.queue_module = load_module(queue_module)
175
            params = {'hosts': queue_hosts,
176
                          'exchange': queue_exchange,
177
                      'client_id': QUEUE_CLIENT_ID}
178
            self.queue = self.queue_module.Queue(**params)
179
        else:
180
            class NoQueue:
181
                def send(self, *args):
182
                    pass
183

    
184
                def close(self):
185
                    pass
186

    
187
            self.queue = NoQueue()
188

    
189
        self.quotaholder = QuotaholderHTTP('http://127.0.0.1/api/quotaholder/v')
190

    
191
    def close(self):
192
        self.wrapper.close()
193
        self.queue.close()
194

    
195
    @backend_method
196
    def list_accounts(self, user, marker=None, limit=10000):
197
        """Return a list of accounts the user can access."""
198

    
199
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
200
        allowed = self._allowed_accounts(user)
201
        start, limit = self._list_limits(allowed, marker, limit)
202
        return allowed[start:start + limit]
203

    
204
    @backend_method
205
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
206
        """Return a dictionary with the account metadata for the domain."""
207

    
208
        logger.debug(
209
            "get_account_meta: %s %s %s %s", user, account, domain, until)
210
        path, node = self._lookup_account(account, user == account)
211
        if user != account:
212
            if until or node is None or account not in self._allowed_accounts(user):
213
                raise NotAllowedError
214
        try:
215
            props = self._get_properties(node, until)
216
            mtime = props[self.MTIME]
217
        except NameError:
218
            props = None
219
            mtime = until
220
        count, bytes, tstamp = self._get_statistics(node, until)
221
        tstamp = max(tstamp, mtime)
222
        if until is None:
223
            modified = tstamp
224
        else:
225
            modified = self._get_statistics(
226
                node)[2]  # Overall last modification.
227
            modified = max(modified, mtime)
228

    
229
        if user != account:
230
            meta = {'name': account}
231
        else:
232
            meta = {}
233
            if props is not None and include_user_defined:
234
                meta.update(
235
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
236
            if until is not None:
237
                meta.update({'until_timestamp': tstamp})
238
            meta.update({'name': account, 'count': count, 'bytes': bytes})
239
        meta.update({'modified': modified})
240
        return meta
241

    
242
    @backend_method
243
    def update_account_meta(self, user, account, domain, meta, replace=False):
244
        """Update the metadata associated with the account for the domain."""
245

    
246
        logger.debug("update_account_meta: %s %s %s %s %s", user,
247
                     account, domain, meta, replace)
248
        if user != account:
249
            raise NotAllowedError
250
        path, node = self._lookup_account(account, True)
251
        self._put_metadata(user, node, domain, meta, replace)
252

    
253
    @backend_method
254
    def get_account_groups(self, user, account):
255
        """Return a dictionary with the user groups defined for this account."""
256

    
257
        logger.debug("get_account_groups: %s %s", user, account)
258
        if user != account:
259
            if account not in self._allowed_accounts(user):
260
                raise NotAllowedError
261
            return {}
262
        self._lookup_account(account, True)
263
        return self.permissions.group_dict(account)
264

    
265
    @backend_method
266
    def update_account_groups(self, user, account, groups, replace=False):
267
        """Update the groups associated with the account."""
268

    
269
        logger.debug("update_account_groups: %s %s %s %s", user,
270
                     account, groups, replace)
271
        if user != account:
272
            raise NotAllowedError
273
        self._lookup_account(account, True)
274
        self._check_groups(groups)
275
        if replace:
276
            self.permissions.group_destroy(account)
277
        for k, v in groups.iteritems():
278
            if not replace:  # If not already deleted.
279
                self.permissions.group_delete(account, k)
280
            if v:
281
                self.permissions.group_addmany(account, k, v)
282

    
283
    @backend_method
284
    def get_account_policy(self, user, account):
285
        """Return a dictionary with the account policy."""
286

    
287
        logger.debug("get_account_policy: %s %s", user, account)
288
        if user != account:
289
            if account not in self._allowed_accounts(user):
290
                raise NotAllowedError
291
            return {}
292
        path, node = self._lookup_account(account, True)
293
        return self._get_policy(node)
294

    
295
    @backend_method
296
    def update_account_policy(self, user, account, policy, replace=False):
297
        """Update the policy associated with the account."""
298

    
299
        logger.debug("update_account_policy: %s %s %s %s", user,
300
                     account, policy, replace)
301
        if user != account:
302
            raise NotAllowedError
303
        path, node = self._lookup_account(account, True)
304
        self._check_policy(policy)
305
        self._put_policy(node, policy, replace)
306

    
307
    @backend_method
308
    def put_account(self, user, account, policy={}):
309
        """Create a new account with the given name."""
310

    
311
        logger.debug("put_account: %s %s %s", user, account, policy)
312
        if user != account:
313
            raise NotAllowedError
314
        node = self.node.node_lookup(account)
315
        if node is not None:
316
            raise AccountExists('Account already exists')
317
        if policy:
318
            self._check_policy(policy)
319
        node = self._put_path(user, self.ROOTNODE, account)
320
        self._put_policy(node, policy, True)
321

    
322
    @backend_method
323
    def delete_account(self, user, account):
324
        """Delete the account with the given name."""
325

    
326
        logger.debug("delete_account: %s %s", user, account)
327
        if user != account:
328
            raise NotAllowedError
329
        node = self.node.node_lookup(account)
330
        if node is None:
331
            return
332
        if not self.node.node_remove(node):
333
            raise AccountNotEmpty('Account is not empty')
334
        self.permissions.group_destroy(account)
335

    
336
    @backend_method
337
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
338
        """Return a list of containers existing under an account."""
339

    
340
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
341
                     account, marker, limit, shared, until, public)
342
        if user != account:
343
            if until or account not in self._allowed_accounts(user):
344
                raise NotAllowedError
345
            allowed = self._allowed_containers(user, account)
346
            start, limit = self._list_limits(allowed, marker, limit)
347
            return allowed[start:start + limit]
348
        if shared or public:
349
            allowed = set()
350
            if shared:
351
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
352
            if public:
353
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
354
            allowed = sorted(allowed)
355
            start, limit = self._list_limits(allowed, marker, limit)
356
            return allowed[start:start + limit]
357
        node = self.node.node_lookup(account)
358
        containers = [x[0] for x in self._list_object_properties(
359
            node, account, '', '/', marker, limit, False, None, [], until)]
360
        start, limit = self._list_limits(
361
            [x[0] for x in containers], marker, limit)
362
        return containers[start:start + limit]
363

    
364
    @backend_method
365
    def list_container_meta(self, user, account, container, domain, until=None):
366
        """Return a list with all the container's object meta keys for the domain."""
367

    
368
        logger.debug("list_container_meta: %s %s %s %s %s", user,
369
                     account, container, domain, until)
370
        allowed = []
371
        if user != account:
372
            if until:
373
                raise NotAllowedError
374
            allowed = self.permissions.access_list_paths(
375
                user, '/'.join((account, container)))
376
            if not allowed:
377
                raise NotAllowedError
378
        path, node = self._lookup_container(account, container)
379
        before = until if until is not None else inf
380
        allowed = self._get_formatted_paths(allowed)
381
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
382

    
383
    @backend_method
384
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
385
        """Return a dictionary with the container metadata for the domain."""
386

    
387
        logger.debug("get_container_meta: %s %s %s %s %s", user,
388
                     account, container, domain, until)
389
        if user != account:
390
            if until or container not in self._allowed_containers(user, account):
391
                raise NotAllowedError
392
        path, node = self._lookup_container(account, container)
393
        props = self._get_properties(node, until)
394
        mtime = props[self.MTIME]
395
        count, bytes, tstamp = self._get_statistics(node, until)
396
        tstamp = max(tstamp, mtime)
397
        if until is None:
398
            modified = tstamp
399
        else:
400
            modified = self._get_statistics(
401
                node)[2]  # Overall last modification.
402
            modified = max(modified, mtime)
403

    
404
        if user != account:
405
            meta = {'name': container}
406
        else:
407
            meta = {}
408
            if include_user_defined:
409
                meta.update(
410
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
411
            if until is not None:
412
                meta.update({'until_timestamp': tstamp})
413
            meta.update({'name': container, 'count': count, 'bytes': bytes})
414
        meta.update({'modified': modified})
415
        return meta
416

    
417
    @backend_method
418
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
419
        """Update the metadata associated with the container for the domain."""
420

    
421
        logger.debug("update_container_meta: %s %s %s %s %s %s",
422
                     user, account, container, domain, meta, replace)
423
        if user != account:
424
            raise NotAllowedError
425
        path, node = self._lookup_container(account, container)
426
        src_version_id, dest_version_id = self._put_metadata(
427
            user, node, domain, meta, replace)
428
        if src_version_id is not None:
429
            versioning = self._get_policy(node)['versioning']
430
            if versioning != 'auto':
431
                self.node.version_remove(src_version_id)
432

    
433
    @backend_method
434
    def get_container_policy(self, user, account, container):
435
        """Return a dictionary with the container policy."""
436

    
437
        logger.debug(
438
            "get_container_policy: %s %s %s", user, account, container)
439
        if user != account:
440
            if container not in self._allowed_containers(user, account):
441
                raise NotAllowedError
442
            return {}
443
        path, node = self._lookup_container(account, container)
444
        return self._get_policy(node)
445

    
446
    @backend_method
447
    def update_container_policy(self, user, account, container, policy, replace=False):
448
        """Update the policy associated with the container."""
449

    
450
        logger.debug("update_container_policy: %s %s %s %s %s",
451
                     user, account, container, policy, replace)
452
        if user != account:
453
            raise NotAllowedError
454
        path, node = self._lookup_container(account, container)
455
        self._check_policy(policy)
456
        self._put_policy(node, policy, replace)
457

    
458
    @backend_method
459
    def put_container(self, user, account, container, policy={}):
460
        """Create a new container with the given name."""
461

    
462
        logger.debug(
463
            "put_container: %s %s %s %s", user, account, container, policy)
464
        if user != account:
465
            raise NotAllowedError
466
        try:
467
            path, node = self._lookup_container(account, container)
468
        except NameError:
469
            pass
470
        else:
471
            raise ContainerExists('Container already exists')
472
        if policy:
473
            self._check_policy(policy)
474
        path = '/'.join((account, container))
475
        node = self._put_path(
476
            user, self._lookup_account(account, True)[1], path)
477
        self._put_policy(node, policy, True)
478

    
479
    @backend_method
480
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
481
        """Delete/purge the container with the given name."""
482

    
483
        logger.debug("delete_container: %s %s %s %s %s %s", user,
484
                     account, container, until, prefix, delimiter)
485
        if user != account:
486
            raise NotAllowedError
487
        path, node = self._lookup_container(account, container)
488

    
489
        if until is not None:
490
            hashes, size, serials = self.node.node_purge_children(
491
                node, until, CLUSTER_HISTORY)
492
            for h in hashes:
493
                self.store.map_delete(h)
494
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
495
            self._report_size_change(user, account, -size,
496
                                                             {'action':'container purge', 'path': path,
497
                                                              'versions': ','.join(str(i) for i in serials)})
498
            return
499

    
500
        if not delimiter:
501
            if self._get_statistics(node)[0] > 0:
502
                raise ContainerNotEmpty('Container is not empty')
503
            hashes, size, serials = self.node.node_purge_children(
504
                node, inf, CLUSTER_HISTORY)
505
            for h in hashes:
506
                self.store.map_delete(h)
507
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
508
            self.node.node_remove(node)
509
            self._report_size_change(user, account, -size,
510
                                                             {'action': 'container delete',
511
                                                              'path': path,
512
                                                              'versions': ','.join(str(i) for i in serials)})
513
        else:
514
                # remove only contents
515
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
516
            paths = []
517
            for t in src_names:
518
                path = '/'.join((account, container, t[0]))
519
                node = t[2]
520
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
521
                del_size = self._apply_versioning(
522
                    account, container, src_version_id)
523
                if del_size:
524
                    self._report_size_change(user, account, -del_size,
525
                                                                     {'action': 'object delete',
526
                                                                      'path': path,
527
                                                                      'versions': ','.join([str(dest_version_id)])})
528
                self._report_object_change(
529
                    user, account, path, details={'action': 'object delete'})
530
                paths.append(path)
531
            self.permissions.access_clear_bulk(paths)
532

    
533
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
534
        if user != account and until:
535
            raise NotAllowedError
536
        if shared and public:
537
            # get shared first
538
            shared = self._list_object_permissions(
539
                user, account, container, prefix, shared=True, public=False)
540
            objects = set()
541
            if shared:
542
                path, node = self._lookup_container(account, container)
543
                shared = self._get_formatted_paths(shared)
544
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
545

    
546
            # get public
547
            objects |= set(self._list_public_object_properties(
548
                user, account, container, prefix, all_props))
549
            objects = list(objects)
550

    
551
            objects.sort(key=lambda x: x[0])
552
            start, limit = self._list_limits(
553
                [x[0] for x in objects], marker, limit)
554
            return objects[start:start + limit]
555
        elif public:
556
            objects = self._list_public_object_properties(
557
                user, account, container, prefix, all_props)
558
            start, limit = self._list_limits(
559
                [x[0] for x in objects], marker, limit)
560
            return objects[start:start + limit]
561

    
562
        allowed = self._list_object_permissions(
563
            user, account, container, prefix, shared, public)
564
        if shared and not allowed:
565
            return []
566
        path, node = self._lookup_container(account, container)
567
        allowed = self._get_formatted_paths(allowed)
568
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
569
        start, limit = self._list_limits(
570
            [x[0] for x in objects], marker, limit)
571
        return objects[start:start + limit]
572

    
573
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
574
        public = self._list_object_permissions(
575
            user, account, container, prefix, shared=False, public=True)
576
        paths, nodes = self._lookup_objects(public)
577
        path = '/'.join((account, container))
578
        cont_prefix = path + '/'
579
        paths = [x[len(cont_prefix):] for x in paths]
580
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
581
        objects = [(path,) + props for path, props in zip(paths, props)]
582
        return objects
583

    
584
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
585
        objects = []
586
        while True:
587
            marker = objects[-1] if objects else None
588
            limit = 10000
589
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
590
            objects.extend(l)
591
            if not l or len(l) < limit:
592
                break
593
        return objects
594

    
595
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
596
        allowed = []
597
        path = '/'.join((account, container, prefix)).rstrip('/')
598
        if user != account:
599
            allowed = self.permissions.access_list_paths(user, path)
600
            if not allowed:
601
                raise NotAllowedError
602
        else:
603
            allowed = set()
604
            if shared:
605
                allowed.update(self.permissions.access_list_shared(path))
606
            if public:
607
                allowed.update(
608
                    [x[0] for x in self.permissions.public_list(path)])
609
            allowed = sorted(allowed)
610
            if not allowed:
611
                return []
612
        return allowed
613

    
614
    @backend_method
615
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
616
        """Return a list of object (name, version_id) tuples existing under a container."""
617

    
618
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
619
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
620

    
621
    @backend_method
622
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
623
        """Return a list of object metadata dicts existing under a container."""
624

    
625
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
626
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
627
        objects = []
628
        for p in props:
629
            if len(p) == 2:
630
                objects.append({'subdir': p[0]})
631
            else:
632
                objects.append({'name': p[0],
633
                                'bytes': p[self.SIZE + 1],
634
                                'type': p[self.TYPE + 1],
635
                                'hash': p[self.HASH + 1],
636
                                'version': p[self.SERIAL + 1],
637
                                'version_timestamp': p[self.MTIME + 1],
638
                                'modified': p[self.MTIME + 1] if until is None else None,
639
                                'modified_by': p[self.MUSER + 1],
640
                                'uuid': p[self.UUID + 1],
641
                                'checksum': p[self.CHECKSUM + 1]})
642
        return objects
643

    
644
    @backend_method
645
    def list_object_permissions(self, user, account, container, prefix=''):
646
        """Return a list of paths that enforce permissions under a container."""
647

    
648
        logger.debug("list_object_permissions: %s %s %s %s", user,
649
                     account, container, prefix)
650
        return self._list_object_permissions(user, account, container, prefix, True, False)
651

    
652
    @backend_method
653
    def list_object_public(self, user, account, container, prefix=''):
654
        """Return a dict mapping paths to public ids for objects that are public under a container."""
655

    
656
        logger.debug("list_object_public: %s %s %s %s", user,
657
                     account, container, prefix)
658
        public = {}
659
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
660
            public[path] = p + ULTIMATE_ANSWER
661
        return public
662

    
663
    @backend_method
664
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
665
        """Return a dictionary with the object metadata for the domain."""
666

    
667
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
668
                     account, container, name, domain, version)
669
        self._can_read(user, account, container, name)
670
        path, node = self._lookup_object(account, container, name)
671
        props = self._get_version(node, version)
672
        if version is None:
673
            modified = props[self.MTIME]
674
        else:
675
            try:
676
                modified = self._get_version(
677
                    node)[self.MTIME]  # Overall last modification.
678
            except NameError:  # Object may be deleted.
679
                del_props = self.node.version_lookup(
680
                    node, inf, CLUSTER_DELETED)
681
                if del_props is None:
682
                    raise ItemNotExists('Object does not exist')
683
                modified = del_props[self.MTIME]
684

    
685
        meta = {}
686
        if include_user_defined:
687
            meta.update(
688
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
689
        meta.update({'name': name,
690
                     'bytes': props[self.SIZE],
691
                     'type': props[self.TYPE],
692
                     'hash': props[self.HASH],
693
                     'version': props[self.SERIAL],
694
                     'version_timestamp': props[self.MTIME],
695
                     'modified': modified,
696
                     'modified_by': props[self.MUSER],
697
                     'uuid': props[self.UUID],
698
                     'checksum': props[self.CHECKSUM]})
699
        return meta
700

    
701
    @backend_method
702
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
703
        """Update the metadata associated with the object for the domain and return the new version."""
704

    
705
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
706
                     user, account, container, name, domain, meta, replace)
707
        self._can_write(user, account, container, name)
708
        path, node = self._lookup_object(account, container, name)
709
        src_version_id, dest_version_id = self._put_metadata(
710
            user, node, domain, meta, replace)
711
        self._apply_versioning(account, container, src_version_id)
712
        return dest_version_id
713

    
714
    @backend_method
715
    def get_object_permissions(self, user, account, container, name):
716
        """Return the action allowed on the object, the path
717
        from which the object gets its permissions from,
718
        along with a dictionary containing the permissions."""
719

    
720
        logger.debug("get_object_permissions: %s %s %s %s", user,
721
                     account, container, name)
722
        allowed = 'write'
723
        permissions_path = self._get_permissions_path(account, container, name)
724
        if user != account:
725
            if self.permissions.access_check(permissions_path, self.WRITE, user):
726
                allowed = 'write'
727
            elif self.permissions.access_check(permissions_path, self.READ, user):
728
                allowed = 'read'
729
            else:
730
                raise NotAllowedError
731
        self._lookup_object(account, container, name)
732
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
733

    
734
    @backend_method
735
    def update_object_permissions(self, user, account, container, name, permissions):
736
        """Update the permissions associated with the object."""
737

    
738
        logger.debug("update_object_permissions: %s %s %s %s %s",
739
                     user, account, container, name, permissions)
740
        if user != account:
741
            raise NotAllowedError
742
        path = self._lookup_object(account, container, name)[0]
743
        self._check_permissions(path, permissions)
744
        self.permissions.access_set(path, permissions)
745
        self._report_sharing_change(user, account, path, {'members':
746
                                    self.permissions.access_members(path)})
747

    
748
    @backend_method
749
    def get_object_public(self, user, account, container, name):
750
        """Return the public id of the object if applicable."""
751

    
752
        logger.debug(
753
            "get_object_public: %s %s %s %s", user, account, container, name)
754
        self._can_read(user, account, container, name)
755
        path = self._lookup_object(account, container, name)[0]
756
        p = self.permissions.public_get(path)
757
        if p is not None:
758
            p += ULTIMATE_ANSWER
759
        return p
760

    
761
    @backend_method
762
    def update_object_public(self, user, account, container, name, public):
763
        """Update the public status of the object."""
764

    
765
        logger.debug("update_object_public: %s %s %s %s %s", user,
766
                     account, container, name, public)
767
        self._can_write(user, account, container, name)
768
        path = self._lookup_object(account, container, name)[0]
769
        if not public:
770
            self.permissions.public_unset(path)
771
        else:
772
            self.permissions.public_set(path)
773

    
774
    @backend_method
775
    def get_object_hashmap(self, user, account, container, name, version=None):
776
        """Return the object's size and a list with partial hashes."""
777

    
778
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
779
                     account, container, name, version)
780
        self._can_read(user, account, container, name)
781
        path, node = self._lookup_object(account, container, name)
782
        props = self._get_version(node, version)
783
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
784
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
785

    
786
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
787
        if permissions is not None and user != account:
788
            raise NotAllowedError
789
        self._can_write(user, account, container, name)
790
        if permissions is not None:
791
            path = '/'.join((account, container, name))
792
            self._check_permissions(path, permissions)
793

    
794
        account_path, account_node = self._lookup_account(account, True)
795
        container_path, container_node = self._lookup_container(
796
            account, container)
797
        path, node = self._put_object_node(
798
            container_path, container_node, name)
799
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
800

    
801
        # Handle meta.
802
        if src_version_id is None:
803
            src_version_id = pre_version_id
804
        self._put_metadata_duplicate(
805
            src_version_id, dest_version_id, domain, meta, replace_meta)
806

    
807
        # Check quota.
808
        del_size = self._apply_versioning(account, container, pre_version_id)
809
        size_delta = size - del_size
810
        if size_delta > 0:
811
            account_quota = long(self._get_policy(account_node)['quota'])
812
            container_quota = long(self._get_policy(container_node)['quota'])
813
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
814
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
815
                # This must be executed in a transaction, so the version is never created if it fails.
816
                raise QuotaError
817
        self._report_size_change(user, account, size_delta,
818
                                                         {'action': 'object update', 'path': path,
819
                                                          'versions': ','.join([str(dest_version_id)])})
820

    
821
        if permissions is not None:
822
            self.permissions.access_set(path, permissions)
823
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
824

    
825
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
826
        return dest_version_id
827

    
828
    @backend_method
829
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
830
        """Create/update an object with the specified size and partial hashes."""
831

    
832
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
833
                     account, container, name, size, type, hashmap, checksum)
834
        if size == 0:  # No such thing as an empty hashmap.
835
            hashmap = [self.put_block('')]
836
        map = HashMap(self.block_size, self.hash_algorithm)
837
        map.extend([binascii.unhexlify(x) for x in hashmap])
838
        missing = self.store.block_search(map)
839
        if missing:
840
            ie = IndexError()
841
            ie.data = [binascii.hexlify(x) for x in missing]
842
            raise ie
843

    
844
        hash = map.hash()
845
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
846
        self.store.map_put(hash, map)
847
        return dest_version_id
848

    
849
    @backend_method
850
    def update_object_checksum(self, user, account, container, name, version, checksum):
851
        """Update an object's checksum."""
852

    
853
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
854
                     user, account, container, name, version, checksum)
855
        # Update objects with greater version and same hashmap and size (fix metadata updates).
856
        self._can_write(user, account, container, name)
857
        path, node = self._lookup_object(account, container, name)
858
        props = self._get_version(node, version)
859
        versions = self.node.node_get_versions(node)
860
        for x in versions:
861
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
862
                self.node.version_put_property(
863
                    x[self.SERIAL], 'checksum', checksum)
864

    
865
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
866
        dest_version_ids = []
867
        self._can_read(user, src_account, src_container, src_name)
868
        path, node = self._lookup_object(src_account, src_container, src_name)
869
        # TODO: Will do another fetch of the properties in duplicate version...
870
        props = self._get_version(
871
            node, src_version)  # Check to see if source exists.
872
        src_version_id = props[self.SERIAL]
873
        hash = props[self.HASH]
874
        size = props[self.SIZE]
875
        is_copy = not is_move and (src_account, src_container, src_name) != (
876
            dest_account, dest_container, dest_name)  # New uuid.
877
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
878
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
879
            self._delete_object(user, src_account, src_container, src_name)
880

    
881
        if delimiter:
882
            prefix = src_name + \
883
                delimiter if not src_name.endswith(delimiter) else src_name
884
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
885
            src_names.sort(key=lambda x: x[2])  # order by nodes
886
            paths = [elem[0] for elem in src_names]
887
            nodes = [elem[2] for elem in src_names]
888
            # TODO: Will do another fetch of the properties in duplicate version...
889
            props = self._get_versions(nodes)  # Check to see if source exists.
890

    
891
            for prop, path, node in zip(props, paths, nodes):
892
                src_version_id = prop[self.SERIAL]
893
                hash = prop[self.HASH]
894
                vtype = prop[self.TYPE]
895
                size = prop[self.SIZE]
896
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
897
                    delimiter) else dest_name
898
                vdest_name = path.replace(prefix, dest_prefix, 1)
899
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
900
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
901
                    self._delete_object(user, src_account, src_container, path)
902
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
903

    
904
    @backend_method
905
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
906
        """Copy an object's data and metadata."""
907

    
908
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
909
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
910
        return dest_version_id
911

    
912
    @backend_method
913
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
914
        """Move an object's data and metadata."""
915

    
916
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
917
        if user != src_account:
918
            raise NotAllowedError
919
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
920
        return dest_version_id
921

    
922
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
923
        if user != account:
924
            raise NotAllowedError
925

    
926
        if until is not None:
927
            path = '/'.join((account, container, name))
928
            node = self.node.node_lookup(path)
929
            if node is None:
930
                return
931
            hashes = []
932
            size = 0
933
            serials = []
934
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
935
            hashes += h
936
            size += s
937
            serials += v
938
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
939
            hashes += h
940
            size += s
941
            serials += v
942
            for h in hashes:
943
                self.store.map_delete(h)
944
            self.node.node_purge(node, until, CLUSTER_DELETED)
945
            try:
946
                props = self._get_version(node)
947
            except NameError:
948
                self.permissions.access_clear(path)
949
            self._report_size_change(user, account, -size,
950
                                                            {'action': 'object purge', 'path': path,
951
                                                             'versions': ','.join(str(i) for i in serials)})
952
            return
953

    
954
        path, node = self._lookup_object(account, container, name)
955
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
956
        del_size = self._apply_versioning(account, container, src_version_id)
957
        if del_size:
958
            self._report_size_change(user, account, -del_size,
959
                                                             {'action': 'object delete', 'path': path,
960
                                                              'versions': ','.join([str(dest_version_id)])})
961
        self._report_object_change(
962
            user, account, path, details={'action': 'object delete'})
963
        self.permissions.access_clear(path)
964

    
965
        if delimiter:
966
            prefix = name + delimiter if not name.endswith(delimiter) else name
967
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
968
            paths = []
969
            for t in src_names:
970
                path = '/'.join((account, container, t[0]))
971
                node = t[2]
972
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
973
                del_size = self._apply_versioning(
974
                    account, container, src_version_id)
975
                if del_size:
976
                    self._report_size_change(user, account, -del_size,
977
                                                                     {'action': 'object delete',
978
                                                                      'path': path,
979
                                                                      'versions': ','.join([str(dest_version_id)])})
980
                self._report_object_change(
981
                    user, account, path, details={'action': 'object delete'})
982
                paths.append(path)
983
            self.permissions.access_clear_bulk(paths)
984

    
985
    @backend_method
986
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
987
        """Delete/purge an object."""
988

    
989
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
990
                     account, container, name, until, prefix, delimiter)
991
        self._delete_object(user, account, container, name, until, delimiter)
992

    
993
    @backend_method
994
    def list_versions(self, user, account, container, name):
995
        """Return a list of all (version, version_timestamp) tuples for an object."""
996

    
997
        logger.debug(
998
            "list_versions: %s %s %s %s", user, account, container, name)
999
        self._can_read(user, account, container, name)
1000
        path, node = self._lookup_object(account, container, name)
1001
        versions = self.node.node_get_versions(node)
1002
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1003

    
1004
    @backend_method
1005
    def get_uuid(self, user, uuid):
1006
        """Return the (account, container, name) for the UUID given."""
1007

    
1008
        logger.debug("get_uuid: %s %s", user, uuid)
1009
        info = self.node.latest_uuid(uuid)
1010
        if info is None:
1011
            raise NameError
1012
        path, serial = info
1013
        account, container, name = path.split('/', 2)
1014
        self._can_read(user, account, container, name)
1015
        return (account, container, name)
1016

    
1017
    @backend_method
1018
    def get_public(self, user, public):
1019
        """Return the (account, container, name) for the public id given."""
1020

    
1021
        logger.debug("get_public: %s %s", user, public)
1022
        if public is None or public < ULTIMATE_ANSWER:
1023
            raise NameError
1024
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1025
        if path is None:
1026
            raise NameError
1027
        account, container, name = path.split('/', 2)
1028
        self._can_read(user, account, container, name)
1029
        return (account, container, name)
1030

    
1031
    @backend_method(autocommit=0)
1032
    def get_block(self, hash):
1033
        """Return a block's data."""
1034

    
1035
        logger.debug("get_block: %s", hash)
1036
        block = self.store.block_get(binascii.unhexlify(hash))
1037
        if not block:
1038
            raise ItemNotExists('Block does not exist')
1039
        return block
1040

    
1041
    @backend_method(autocommit=0)
1042
    def put_block(self, data):
1043
        """Store a block and return the hash."""
1044

    
1045
        logger.debug("put_block: %s", len(data))
1046
        return binascii.hexlify(self.store.block_put(data))
1047

    
1048
    @backend_method(autocommit=0)
1049
    def update_block(self, hash, data, offset=0):
1050
        """Update a known block and return the hash."""
1051

    
1052
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1053
        if offset == 0 and len(data) == self.block_size:
1054
            return self.put_block(data)
1055
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1056
        return binascii.hexlify(h)
1057

    
1058
    # Path functions.
1059

    
1060
    def _generate_uuid(self):
1061
        return str(uuidlib.uuid4())
1062

    
1063
    def _put_object_node(self, path, parent, name):
1064
        path = '/'.join((path, name))
1065
        node = self.node.node_lookup(path)
1066
        if node is None:
1067
            node = self.node.node_create(parent, path)
1068
        return path, node
1069

    
1070
    def _put_path(self, user, parent, path):
1071
        node = self.node.node_create(parent, path)
1072
        self.node.version_create(node, None, 0, '', None, user,
1073
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1074
        return node
1075

    
1076
    def _lookup_account(self, account, create=True):
1077
        node = self.node.node_lookup(account)
1078
        if node is None and create:
1079
            node = self._put_path(
1080
                account, self.ROOTNODE, account)  # User is account.
1081
        return account, node
1082

    
1083
    def _lookup_container(self, account, container):
1084
        path = '/'.join((account, container))
1085
        node = self.node.node_lookup(path)
1086
        if node is None:
1087
            raise ItemNotExists('Container does not exist')
1088
        return path, node
1089

    
1090
    def _lookup_object(self, account, container, name):
1091
        path = '/'.join((account, container, name))
1092
        node = self.node.node_lookup(path)
1093
        if node is None:
1094
            raise ItemNotExists('Object does not exist')
1095
        return path, node
1096

    
1097
    def _lookup_objects(self, paths):
1098
        nodes = self.node.node_lookup_bulk(paths)
1099
        return paths, nodes
1100

    
1101
    def _get_properties(self, node, until=None):
1102
        """Return properties until the timestamp given."""
1103

    
1104
        before = until if until is not None else inf
1105
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1106
        if props is None and until is not None:
1107
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1108
        if props is None:
1109
            raise ItemNotExists('Path does not exist')
1110
        return props
1111

    
1112
    def _get_statistics(self, node, until=None):
1113
        """Return count, sum of size and latest timestamp of everything under node."""
1114

    
1115
        if until is None:
1116
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1117
        else:
1118
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1119
        if stats is None:
1120
            stats = (0, 0, 0)
1121
        return stats
1122

    
1123
    def _get_version(self, node, version=None):
1124
        if version is None:
1125
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1126
            if props is None:
1127
                raise ItemNotExists('Object does not exist')
1128
        else:
1129
            try:
1130
                version = int(version)
1131
            except ValueError:
1132
                raise VersionNotExists('Version does not exist')
1133
            props = self.node.version_get_properties(version)
1134
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1135
                raise VersionNotExists('Version does not exist')
1136
        return props
1137

    
1138
    def _get_versions(self, nodes):
1139
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1140

    
1141
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1142
        """Create a new version of the node."""
1143

    
1144
        props = self.node.version_lookup(
1145
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1146
        if props is not None:
1147
            src_version_id = props[self.SERIAL]
1148
            src_hash = props[self.HASH]
1149
            src_size = props[self.SIZE]
1150
            src_type = props[self.TYPE]
1151
            src_checksum = props[self.CHECKSUM]
1152
        else:
1153
            src_version_id = None
1154
            src_hash = None
1155
            src_size = 0
1156
            src_type = ''
1157
            src_checksum = ''
1158
        if size is None:  # Set metadata.
1159
            hash = src_hash  # This way hash can be set to None (account or container).
1160
            size = src_size
1161
        if type is None:
1162
            type = src_type
1163
        if checksum is None:
1164
            checksum = src_checksum
1165
        uuid = self._generate_uuid(
1166
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1167

    
1168
        if src_node is None:
1169
            pre_version_id = src_version_id
1170
        else:
1171
            pre_version_id = None
1172
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1173
            if props is not None:
1174
                pre_version_id = props[self.SERIAL]
1175
        if pre_version_id is not None:
1176
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1177

    
1178
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1179
        return pre_version_id, dest_version_id
1180

    
1181
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1182
        if src_version_id is not None:
1183
            self.node.attribute_copy(src_version_id, dest_version_id)
1184
        if not replace:
1185
            self.node.attribute_del(dest_version_id, domain, (
1186
                k for k, v in meta.iteritems() if v == ''))
1187
            self.node.attribute_set(dest_version_id, domain, (
1188
                (k, v) for k, v in meta.iteritems() if v != ''))
1189
        else:
1190
            self.node.attribute_del(dest_version_id, domain)
1191
            self.node.attribute_set(dest_version_id, domain, ((
1192
                k, v) for k, v in meta.iteritems()))
1193

    
1194
    def _put_metadata(self, user, node, domain, meta, replace=False):
1195
        """Create a new version and store metadata."""
1196

    
1197
        src_version_id, dest_version_id = self._put_version_duplicate(
1198
            user, node)
1199
        self._put_metadata_duplicate(
1200
            src_version_id, dest_version_id, domain, meta, replace)
1201
        return src_version_id, dest_version_id
1202

    
1203
    def _list_limits(self, listing, marker, limit):
1204
        start = 0
1205
        if marker:
1206
            try:
1207
                start = listing.index(marker) + 1
1208
            except ValueError:
1209
                pass
1210
        if not limit or limit > 10000:
1211
            limit = 10000
1212
        return start, limit
1213

    
1214
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1215
        cont_prefix = path + '/'
1216
        prefix = cont_prefix + prefix
1217
        start = cont_prefix + marker if marker else None
1218
        before = until if until is not None else inf
1219
        filterq = keys if domain else []
1220
        sizeq = size_range
1221

    
1222
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1223
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1224
        objects.sort(key=lambda x: x[0])
1225
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1226
        return objects
1227

    
1228
    # Reporting functions.
1229

    
1230
    def _report_size_change(self, user, account, size, details={}):
1231
        account_node = self._lookup_account(account, True)[1]
1232
        total = self._get_statistics(account_node)[1]
1233
        details.update({'user': user, 'total': total})
1234
        logger.debug(
1235
            "_report_size_change: %s %s %s %s", user, account, size, details)
1236
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1237
                                                  account, QUEUE_INSTANCE_ID, 'diskspace',
1238
                                                  float(size), details))
1239

    
1240
        serial = self.quotaholder.issue_provision()
1241
        self.serial.append(serial)
1242

    
1243
    def _report_object_change(self, user, account, path, details={}):
1244
        details.update({'user': user})
1245
        logger.debug("_report_object_change: %s %s %s %s", user,
1246
                     account, path, details)
1247
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1248
                                                  account, QUEUE_INSTANCE_ID, 'object', path, details))
1249

    
1250
    def _report_sharing_change(self, user, account, path, details={}):
1251
        logger.debug("_report_permissions_change: %s %s %s %s",
1252
                     user, account, path, details)
1253
        details.update({'user': user})
1254
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1255
                                                  account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1256

    
1257
    # Policy functions.
1258

    
1259
    def _check_policy(self, policy):
1260
        for k in policy.keys():
1261
            if policy[k] == '':
1262
                policy[k] = self.default_policy.get(k)
1263
        for k, v in policy.iteritems():
1264
            if k == 'quota':
1265
                q = int(v)  # May raise ValueError.
1266
                if q < 0:
1267
                    raise ValueError
1268
            elif k == 'versioning':
1269
                if v not in ['auto', 'none']:
1270
                    raise ValueError
1271
            else:
1272
                raise ValueError
1273

    
1274
    def _put_policy(self, node, policy, replace):
1275
        if replace:
1276
            for k, v in self.default_policy.iteritems():
1277
                if k not in policy:
1278
                    policy[k] = v
1279
        self.node.policy_set(node, policy)
1280

    
1281
    def _get_policy(self, node):
1282
        policy = self.default_policy.copy()
1283
        policy.update(self.node.policy_get(node))
1284
        return policy
1285

    
1286
    def _apply_versioning(self, account, container, version_id):
1287
        """Delete the provided version if such is the policy.
1288
           Return size of object removed.
1289
        """
1290

    
1291
        if version_id is None:
1292
            return 0
1293
        path, node = self._lookup_container(account, container)
1294
        versioning = self._get_policy(node)['versioning']
1295
        if versioning != 'auto':
1296
            hash, size = self.node.version_remove(version_id)
1297
            self.store.map_delete(hash)
1298
            return size
1299
        return 0
1300

    
1301
    # Access control functions.
1302

    
1303
    def _check_groups(self, groups):
1304
        # raise ValueError('Bad characters in groups')
1305
        pass
1306

    
1307
    def _check_permissions(self, path, permissions):
1308
        # raise ValueError('Bad characters in permissions')
1309
        pass
1310

    
1311
    def _get_formatted_paths(self, paths):
1312
        formatted = []
1313
        for p in paths:
1314
            node = self.node.node_lookup(p)
1315
            if node is not None:
1316
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1317
            if props is not None:
1318
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1319
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1320
                formatted.append((p, self.MATCH_EXACT))
1321
        return formatted
1322

    
1323
    def _get_permissions_path(self, account, container, name):
1324
        path = '/'.join((account, container, name))
1325
        permission_paths = self.permissions.access_inherit(path)
1326
        permission_paths.sort()
1327
        permission_paths.reverse()
1328
        for p in permission_paths:
1329
            if p == path:
1330
                return p
1331
            else:
1332
                if p.count('/') < 2:
1333
                    continue
1334
                node = self.node.node_lookup(p)
1335
                if node is not None:
1336
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1337
                if props is not None:
1338
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1339
                        return p
1340
        return None
1341

    
1342
    def _can_read(self, user, account, container, name):
1343
        if user == account:
1344
            return True
1345
        path = '/'.join((account, container, name))
1346
        if self.permissions.public_get(path) is not None:
1347
            return True
1348
        path = self._get_permissions_path(account, container, name)
1349
        if not path:
1350
            raise NotAllowedError
1351
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1352
            raise NotAllowedError
1353

    
1354
    def _can_write(self, user, account, container, name):
1355
        if user == account:
1356
            return True
1357
        path = '/'.join((account, container, name))
1358
        path = self._get_permissions_path(account, container, name)
1359
        if not path:
1360
            raise NotAllowedError
1361
        if not self.permissions.access_check(path, self.WRITE, user):
1362
            raise NotAllowedError
1363

    
1364
    def _allowed_accounts(self, user):
1365
        allow = set()
1366
        for path in self.permissions.access_list_paths(user):
1367
            allow.add(path.split('/', 1)[0])
1368
        return sorted(allow)
1369

    
1370
    def _allowed_containers(self, user, account):
1371
        allow = set()
1372
        for path in self.permissions.access_list_paths(user, account):
1373
            allow.add(path.split('/', 2)[1])
1374
        return sorted(allow)