Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 388ea25f

History | View | Annotate | Download (59.2 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
44

    
45
# Stripped-down version of the HashMap class found in tools.
46

    
47

    
48
class HashMap(list):
49

    
50
    def __init__(self, blocksize, blockhash):
51
        super(HashMap, self).__init__()
52
        self.blocksize = blocksize
53
        self.blockhash = blockhash
54

    
55
    def _hash_raw(self, v):
56
        h = hashlib.new(self.blockhash)
57
        h.update(v)
58
        return h.digest()
59

    
60
    def hash(self):
61
        if len(self) == 0:
62
            return self._hash_raw('')
63
        if len(self) == 1:
64
            return self.__getitem__(0)
65

    
66
        h = list(self)
67
        s = 2
68
        while s < len(h):
69
            s = s * 2
70
        h += [('\x00' * len(h[0]))] * (s - len(h))
71
        while len(h) > 1:
72
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
73
        return h[0]
74

    
75
# Default modules and settings.
76
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
77
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
78
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
79
DEFAULT_BLOCK_PATH = 'data/'
80
DEFAULT_BLOCK_UMASK = 0o022
81
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
82
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
83
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
84

    
85
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
86
QUEUE_CLIENT_ID = 'pithos'
87
QUEUE_INSTANCE_ID = '1'
88

    
89
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
90

    
91
inf = float('inf')
92

    
93
ULTIMATE_ANSWER = 42
94

    
95

    
96
logger = logging.getLogger(__name__)
97

    
98

    
99
def backend_method(func=None, autocommit=1):
100
    if func is None:
101
        def fn(func):
102
            return backend_method(func, autocommit)
103
        return fn
104

    
105
    if not autocommit:
106
        return func
107

    
108
    def fn(self, *args, **kw):
109
        self.wrapper.execute()
110
        try:
111
            self.messages = []
112
            ret = func(self, *args, **kw)
113
            for m in self.messages:
114
                self.queue.send(*m)
115
            self.wrapper.commit()
116
            return ret
117
        except:
118
            self.wrapper.rollback()
119
            raise
120
    return fn
121

    
122

    
123
class ModularBackend(BaseBackend):
124
    """A modular backend.
125

126
    Uses modules for SQL functions and storage.
127
    """
128

    
129
    def __init__(self, db_module=None, db_connection=None,
130
                 block_module=None, block_path=None, block_umask=None,
131
                 queue_module=None, queue_hosts=None,
132
                 queue_exchange=None):
133
        db_module = db_module or DEFAULT_DB_MODULE
134
        db_connection = db_connection or DEFAULT_DB_CONNECTION
135
        block_module = block_module or DEFAULT_BLOCK_MODULE
136
        block_path = block_path or DEFAULT_BLOCK_PATH
137
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
138
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
139
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
140
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
141
                
142
        self.hash_algorithm = 'sha256'
143
        self.block_size = 4 * 1024 * 1024  # 4MB
144

    
145
        self.default_policy = {'quota': DEFAULT_QUOTA,
146
                               'versioning': DEFAULT_VERSIONING}
147

    
148
        def load_module(m):
149
            __import__(m)
150
            return sys.modules[m]
151

    
152
        self.db_module = load_module(db_module)
153
        self.wrapper = self.db_module.DBWrapper(db_connection)
154
        params = {'wrapper': self.wrapper}
155
        self.permissions = self.db_module.Permissions(**params)
156
        self.config = self.db_module.Config(**params)
157
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
158
        for x in ['READ', 'WRITE']:
159
            setattr(self, x, getattr(self.db_module, x))
160
        self.node = self.db_module.Node(**params)
161
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
162
            setattr(self, x, getattr(self.db_module, x))
163

    
164
        self.block_module = load_module(block_module)
165
        params = {'path': block_path,
166
                  'block_size': self.block_size,
167
                  'hash_algorithm': self.hash_algorithm,
168
                  'umask': block_umask}
169
        self.store = self.block_module.Store(**params)
170

    
171
        if queue_module and queue_hosts:
172
            self.queue_module = load_module(queue_module)
173
            params = {'hosts': queue_hosts,
174
                              'exchange': queue_exchange,
175
                      'client_id': QUEUE_CLIENT_ID}
176
            self.queue = self.queue_module.Queue(**params)
177
        else:
178
            class NoQueue:
179
                def send(self, *args):
180
                    pass
181

    
182
                def close(self):
183
                    pass
184

    
185
            self.queue = NoQueue()
186

    
187
    def close(self):
188
        self.wrapper.close()
189
        self.queue.close()
190

    
191
    @backend_method
192
    def list_accounts(self, user, marker=None, limit=10000):
193
        """Return a list of accounts the user can access."""
194

    
195
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
196
        allowed = self._allowed_accounts(user)
197
        start, limit = self._list_limits(allowed, marker, limit)
198
        return allowed[start:start + limit]
199

    
200
    @backend_method
201
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
202
        """Return a dictionary with the account metadata for the domain."""
203

    
204
        logger.debug(
205
            "get_account_meta: %s %s %s %s", user, account, domain, until)
206
        path, node = self._lookup_account(account, user == account)
207
        if user != account:
208
            if until or node is None or account not in self._allowed_accounts(user):
209
                raise NotAllowedError
210
        try:
211
            props = self._get_properties(node, until)
212
            mtime = props[self.MTIME]
213
        except NameError:
214
            props = None
215
            mtime = until
216
        count, bytes, tstamp = self._get_statistics(node, until)
217
        tstamp = max(tstamp, mtime)
218
        if until is None:
219
            modified = tstamp
220
        else:
221
            modified = self._get_statistics(
222
                node)[2]  # Overall last modification.
223
            modified = max(modified, mtime)
224

    
225
        if user != account:
226
            meta = {'name': account}
227
        else:
228
            meta = {}
229
            if props is not None and include_user_defined:
230
                meta.update(
231
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
232
            if until is not None:
233
                meta.update({'until_timestamp': tstamp})
234
            meta.update({'name': account, 'count': count, 'bytes': bytes})
235
        meta.update({'modified': modified})
236
        return meta
237

    
238
    @backend_method
239
    def update_account_meta(self, user, account, domain, meta, replace=False):
240
        """Update the metadata associated with the account for the domain."""
241

    
242
        logger.debug("update_account_meta: %s %s %s %s %s", user,
243
                     account, domain, meta, replace)
244
        if user != account:
245
            raise NotAllowedError
246
        path, node = self._lookup_account(account, True)
247
        self._put_metadata(user, node, domain, meta, replace)
248

    
249
    @backend_method
250
    def get_account_groups(self, user, account):
251
        """Return a dictionary with the user groups defined for this account."""
252

    
253
        logger.debug("get_account_groups: %s %s", user, account)
254
        if user != account:
255
            if account not in self._allowed_accounts(user):
256
                raise NotAllowedError
257
            return {}
258
        self._lookup_account(account, True)
259
        return self.permissions.group_dict(account)
260

    
261
    @backend_method
262
    def update_account_groups(self, user, account, groups, replace=False):
263
        """Update the groups associated with the account."""
264

    
265
        logger.debug("update_account_groups: %s %s %s %s", user,
266
                     account, groups, replace)
267
        if user != account:
268
            raise NotAllowedError
269
        self._lookup_account(account, True)
270
        self._check_groups(groups)
271
        if replace:
272
            self.permissions.group_destroy(account)
273
        for k, v in groups.iteritems():
274
            if not replace:  # If not already deleted.
275
                self.permissions.group_delete(account, k)
276
            if v:
277
                self.permissions.group_addmany(account, k, v)
278

    
279
    @backend_method
280
    def get_account_policy(self, user, account):
281
        """Return a dictionary with the account policy."""
282

    
283
        logger.debug("get_account_policy: %s %s", user, account)
284
        if user != account:
285
            if account not in self._allowed_accounts(user):
286
                raise NotAllowedError
287
            return {}
288
        path, node = self._lookup_account(account, True)
289
        return self._get_policy(node)
290

    
291
    @backend_method
292
    def update_account_policy(self, user, account, policy, replace=False):
293
        """Update the policy associated with the account."""
294

    
295
        logger.debug("update_account_policy: %s %s %s %s", user,
296
                     account, policy, replace)
297
        if user != account:
298
            raise NotAllowedError
299
        path, node = self._lookup_account(account, True)
300
        self._check_policy(policy)
301
        self._put_policy(node, policy, replace)
302

    
303
    @backend_method
304
    def put_account(self, user, account, policy={}):
305
        """Create a new account with the given name."""
306

    
307
        logger.debug("put_account: %s %s %s", user, account, policy)
308
        if user != account:
309
            raise NotAllowedError
310
        node = self.node.node_lookup(account)
311
        if node is not None:
312
            raise AccountExists('Account already exists')
313
        if policy:
314
            self._check_policy(policy)
315
        node = self._put_path(user, self.ROOTNODE, account)
316
        self._put_policy(node, policy, True)
317

    
318
    @backend_method
319
    def delete_account(self, user, account):
320
        """Delete the account with the given name."""
321

    
322
        logger.debug("delete_account: %s %s", user, account)
323
        if user != account:
324
            raise NotAllowedError
325
        node = self.node.node_lookup(account)
326
        if node is None:
327
            return
328
        if not self.node.node_remove(node):
329
            raise AccountNotEmpty('Account is not empty')
330
        self.permissions.group_destroy(account)
331

    
332
    @backend_method
333
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
334
        """Return a list of containers existing under an account."""
335

    
336
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
337
                     account, marker, limit, shared, until, public)
338
        if user != account:
339
            if until or account not in self._allowed_accounts(user):
340
                raise NotAllowedError
341
            allowed = self._allowed_containers(user, account)
342
            start, limit = self._list_limits(allowed, marker, limit)
343
            return allowed[start:start + limit]
344
        if shared or public:
345
            allowed = set()
346
            if shared:
347
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
348
            if public:
349
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
350
            allowed = sorted(allowed)
351
            start, limit = self._list_limits(allowed, marker, limit)
352
            return allowed[start:start + limit]
353
        node = self.node.node_lookup(account)
354
        containers = [x[0] for x in self._list_object_properties(
355
            node, account, '', '/', marker, limit, False, None, [], until)]
356
        start, limit = self._list_limits(
357
            [x[0] for x in containers], marker, limit)
358
        return containers[start:start + limit]
359

    
360
    @backend_method
361
    def list_container_meta(self, user, account, container, domain, until=None):
362
        """Return a list with all the container's object meta keys for the domain."""
363

    
364
        logger.debug("list_container_meta: %s %s %s %s %s", user,
365
                     account, container, domain, until)
366
        allowed = []
367
        if user != account:
368
            if until:
369
                raise NotAllowedError
370
            allowed = self.permissions.access_list_paths(
371
                user, '/'.join((account, container)))
372
            if not allowed:
373
                raise NotAllowedError
374
        path, node = self._lookup_container(account, container)
375
        before = until if until is not None else inf
376
        allowed = self._get_formatted_paths(allowed)
377
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
378

    
379
    @backend_method
380
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
381
        """Return a dictionary with the container metadata for the domain."""
382

    
383
        logger.debug("get_container_meta: %s %s %s %s %s", user,
384
                     account, container, domain, until)
385
        if user != account:
386
            if until or container not in self._allowed_containers(user, account):
387
                raise NotAllowedError
388
        path, node = self._lookup_container(account, container)
389
        props = self._get_properties(node, until)
390
        mtime = props[self.MTIME]
391
        count, bytes, tstamp = self._get_statistics(node, until)
392
        tstamp = max(tstamp, mtime)
393
        if until is None:
394
            modified = tstamp
395
        else:
396
            modified = self._get_statistics(
397
                node)[2]  # Overall last modification.
398
            modified = max(modified, mtime)
399

    
400
        if user != account:
401
            meta = {'name': container}
402
        else:
403
            meta = {}
404
            if include_user_defined:
405
                meta.update(
406
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
407
            if until is not None:
408
                meta.update({'until_timestamp': tstamp})
409
            meta.update({'name': container, 'count': count, 'bytes': bytes})
410
        meta.update({'modified': modified})
411
        return meta
412

    
413
    @backend_method
414
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
415
        """Update the metadata associated with the container for the domain."""
416

    
417
        logger.debug("update_container_meta: %s %s %s %s %s %s",
418
                     user, account, container, domain, meta, replace)
419
        if user != account:
420
            raise NotAllowedError
421
        path, node = self._lookup_container(account, container)
422
        src_version_id, dest_version_id = self._put_metadata(
423
            user, node, domain, meta, replace)
424
        if src_version_id is not None:
425
            versioning = self._get_policy(node)['versioning']
426
            if versioning != 'auto':
427
                self.node.version_remove(src_version_id)
428

    
429
    @backend_method
430
    def get_container_policy(self, user, account, container):
431
        """Return a dictionary with the container policy."""
432

    
433
        logger.debug(
434
            "get_container_policy: %s %s %s", user, account, container)
435
        if user != account:
436
            if container not in self._allowed_containers(user, account):
437
                raise NotAllowedError
438
            return {}
439
        path, node = self._lookup_container(account, container)
440
        return self._get_policy(node)
441

    
442
    @backend_method
443
    def update_container_policy(self, user, account, container, policy, replace=False):
444
        """Update the policy associated with the container."""
445

    
446
        logger.debug("update_container_policy: %s %s %s %s %s",
447
                     user, account, container, policy, replace)
448
        if user != account:
449
            raise NotAllowedError
450
        path, node = self._lookup_container(account, container)
451
        self._check_policy(policy)
452
        self._put_policy(node, policy, replace)
453

    
454
    @backend_method
455
    def put_container(self, user, account, container, policy={}):
456
        """Create a new container with the given name."""
457

    
458
        logger.debug(
459
            "put_container: %s %s %s %s", user, account, container, policy)
460
        if user != account:
461
            raise NotAllowedError
462
        try:
463
            path, node = self._lookup_container(account, container)
464
        except NameError:
465
            pass
466
        else:
467
            raise ContainerExists('Container already exists')
468
        if policy:
469
            self._check_policy(policy)
470
        path = '/'.join((account, container))
471
        node = self._put_path(
472
            user, self._lookup_account(account, True)[1], path)
473
        self._put_policy(node, policy, True)
474

    
475
    @backend_method
476
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
477
        """Delete/purge the container with the given name."""
478

    
479
        logger.debug("delete_container: %s %s %s %s %s %s", user,
480
                     account, container, until, prefix, delimiter)
481
        if user != account:
482
            raise NotAllowedError
483
        path, node = self._lookup_container(account, container)
484

    
485
        if until is not None:
486
            hashes, size, serials = self.node.node_purge_children(
487
                node, until, CLUSTER_HISTORY)
488
            for h in hashes:
489
                self.store.map_delete(h)
490
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
491
            self._report_size_change(user, account, -size,
492
                                                             {'action':'container purge', 'path': path,
493
                                                              'versions': serials})
494
            return
495

    
496
        if not delimiter:
497
            if self._get_statistics(node)[0] > 0:
498
                raise ContainerNotEmpty('Container is not empty')
499
            hashes, size, serials = self.node.node_purge_children(
500
                node, inf, CLUSTER_HISTORY)
501
            for h in hashes:
502
                self.store.map_delete(h)
503
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
504
            self.node.node_remove(node)
505
            self._report_size_change(user, account, -size,
506
                                                             {'action': 'container delete',
507
                                                              'path': path,
508
                                                              'versions': serials})
509
        else:
510
                # remove only contents
511
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
512
            paths = []
513
            for t in src_names:
514
                path = '/'.join((account, container, t[0]))
515
                node = t[2]
516
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
517
                del_size = self._apply_versioning(
518
                    account, container, src_version_id)
519
                if del_size:
520
                    self._report_size_change(user, account, -del_size,
521
                                                                     {'action': 'object delete',
522
                                                                      'path': path, 'versions': [dest_version_id]})
523
                self._report_object_change(
524
                    user, account, path, details={'action': 'object delete'})
525
                paths.append(path)
526
            self.permissions.access_clear_bulk(paths)
527

    
528
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
529
        if user != account and until:
530
            raise NotAllowedError
531
        if shared and public:
532
            # get shared first
533
            shared = self._list_object_permissions(
534
                user, account, container, prefix, shared=True, public=False)
535
            objects = set()
536
            if shared:
537
                path, node = self._lookup_container(account, container)
538
                shared = self._get_formatted_paths(shared)
539
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
540

    
541
            # get public
542
            objects |= set(self._list_public_object_properties(
543
                user, account, container, prefix, all_props))
544
            objects = list(objects)
545

    
546
            objects.sort(key=lambda x: x[0])
547
            start, limit = self._list_limits(
548
                [x[0] for x in objects], marker, limit)
549
            return objects[start:start + limit]
550
        elif public:
551
            objects = self._list_public_object_properties(
552
                user, account, container, prefix, all_props)
553
            start, limit = self._list_limits(
554
                [x[0] for x in objects], marker, limit)
555
            return objects[start:start + limit]
556

    
557
        allowed = self._list_object_permissions(
558
            user, account, container, prefix, shared, public)
559
        if shared and not allowed:
560
            return []
561
        path, node = self._lookup_container(account, container)
562
        allowed = self._get_formatted_paths(allowed)
563
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
564
        start, limit = self._list_limits(
565
            [x[0] for x in objects], marker, limit)
566
        return objects[start:start + limit]
567

    
568
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
569
        public = self._list_object_permissions(
570
            user, account, container, prefix, shared=False, public=True)
571
        paths, nodes = self._lookup_objects(public)
572
        path = '/'.join((account, container))
573
        cont_prefix = path + '/'
574
        paths = [x[len(cont_prefix):] for x in paths]
575
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
576
        objects = [(path,) + props for path, props in zip(paths, props)]
577
        return objects
578

    
579
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
580
        objects = []
581
        while True:
582
            marker = objects[-1] if objects else None
583
            limit = 10000
584
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
585
            objects.extend(l)
586
            if not l or len(l) < limit:
587
                break
588
        return objects
589

    
590
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
591
        allowed = []
592
        path = '/'.join((account, container, prefix)).rstrip('/')
593
        if user != account:
594
            allowed = self.permissions.access_list_paths(user, path)
595
            if not allowed:
596
                raise NotAllowedError
597
        else:
598
            allowed = set()
599
            if shared:
600
                allowed.update(self.permissions.access_list_shared(path))
601
            if public:
602
                allowed.update(
603
                    [x[0] for x in self.permissions.public_list(path)])
604
            allowed = sorted(allowed)
605
            if not allowed:
606
                return []
607
        return allowed
608

    
609
    @backend_method
610
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
611
        """Return a list of object (name, version_id) tuples existing under a container."""
612

    
613
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
614
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
615

    
616
    @backend_method
617
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
618
        """Return a list of object metadata dicts existing under a container."""
619

    
620
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
621
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
622
        objects = []
623
        for p in props:
624
            if len(p) == 2:
625
                objects.append({'subdir': p[0]})
626
            else:
627
                objects.append({'name': p[0],
628
                                'bytes': p[self.SIZE + 1],
629
                                'type': p[self.TYPE + 1],
630
                                'hash': p[self.HASH + 1],
631
                                'version': p[self.SERIAL + 1],
632
                                'version_timestamp': p[self.MTIME + 1],
633
                                'modified': p[self.MTIME + 1] if until is None else None,
634
                                'modified_by': p[self.MUSER + 1],
635
                                'uuid': p[self.UUID + 1],
636
                                'checksum': p[self.CHECKSUM + 1]})
637
        return objects
638

    
639
    @backend_method
640
    def list_object_permissions(self, user, account, container, prefix=''):
641
        """Return a list of paths that enforce permissions under a container."""
642

    
643
        logger.debug("list_object_permissions: %s %s %s %s", user,
644
                     account, container, prefix)
645
        return self._list_object_permissions(user, account, container, prefix, True, False)
646

    
647
    @backend_method
648
    def list_object_public(self, user, account, container, prefix=''):
649
        """Return a dict mapping paths to public ids for objects that are public under a container."""
650

    
651
        logger.debug("list_object_public: %s %s %s %s", user,
652
                     account, container, prefix)
653
        public = {}
654
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
655
            public[path] = p + ULTIMATE_ANSWER
656
        return public
657

    
658
    @backend_method
659
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
660
        """Return a dictionary with the object metadata for the domain."""
661

    
662
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
663
                     account, container, name, domain, version)
664
        self._can_read(user, account, container, name)
665
        path, node = self._lookup_object(account, container, name)
666
        props = self._get_version(node, version)
667
        if version is None:
668
            modified = props[self.MTIME]
669
        else:
670
            try:
671
                modified = self._get_version(
672
                    node)[self.MTIME]  # Overall last modification.
673
            except NameError:  # Object may be deleted.
674
                del_props = self.node.version_lookup(
675
                    node, inf, CLUSTER_DELETED)
676
                if del_props is None:
677
                    raise ItemNotExists('Object does not exist')
678
                modified = del_props[self.MTIME]
679

    
680
        meta = {}
681
        if include_user_defined:
682
            meta.update(
683
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
684
        meta.update({'name': name,
685
                     'bytes': props[self.SIZE],
686
                     'type': props[self.TYPE],
687
                     'hash': props[self.HASH],
688
                     'version': props[self.SERIAL],
689
                     'version_timestamp': props[self.MTIME],
690
                     'modified': modified,
691
                     'modified_by': props[self.MUSER],
692
                     'uuid': props[self.UUID],
693
                     'checksum': props[self.CHECKSUM]})
694
        return meta
695

    
696
    @backend_method
697
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
698
        """Update the metadata associated with the object for the domain and return the new version."""
699

    
700
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
701
                     user, account, container, name, domain, meta, replace)
702
        self._can_write(user, account, container, name)
703
        path, node = self._lookup_object(account, container, name)
704
        src_version_id, dest_version_id = self._put_metadata(
705
            user, node, domain, meta, replace)
706
        self._apply_versioning(account, container, src_version_id)
707
        return dest_version_id
708

    
709
    @backend_method
710
    def get_object_permissions(self, user, account, container, name):
711
        """Return the action allowed on the object, the path
712
        from which the object gets its permissions from,
713
        along with a dictionary containing the permissions."""
714

    
715
        logger.debug("get_object_permissions: %s %s %s %s", user,
716
                     account, container, name)
717
        allowed = 'write'
718
        permissions_path = self._get_permissions_path(account, container, name)
719
        if user != account:
720
            if self.permissions.access_check(permissions_path, self.WRITE, user):
721
                allowed = 'write'
722
            elif self.permissions.access_check(permissions_path, self.READ, user):
723
                allowed = 'read'
724
            else:
725
                raise NotAllowedError
726
        self._lookup_object(account, container, name)
727
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
728

    
729
    @backend_method
730
    def update_object_permissions(self, user, account, container, name, permissions):
731
        """Update the permissions associated with the object."""
732

    
733
        logger.debug("update_object_permissions: %s %s %s %s %s",
734
                     user, account, container, name, permissions)
735
        if user != account:
736
            raise NotAllowedError
737
        path = self._lookup_object(account, container, name)[0]
738
        self._check_permissions(path, permissions)
739
        self.permissions.access_set(path, permissions)
740
        self._report_sharing_change(user, account, path, {'members':
741
                                    self.permissions.access_members(path)})
742

    
743
    @backend_method
744
    def get_object_public(self, user, account, container, name):
745
        """Return the public id of the object if applicable."""
746

    
747
        logger.debug(
748
            "get_object_public: %s %s %s %s", user, account, container, name)
749
        self._can_read(user, account, container, name)
750
        path = self._lookup_object(account, container, name)[0]
751
        p = self.permissions.public_get(path)
752
        if p is not None:
753
            p += ULTIMATE_ANSWER
754
        return p
755

    
756
    @backend_method
757
    def update_object_public(self, user, account, container, name, public):
758
        """Update the public status of the object."""
759

    
760
        logger.debug("update_object_public: %s %s %s %s %s", user,
761
                     account, container, name, public)
762
        self._can_write(user, account, container, name)
763
        path = self._lookup_object(account, container, name)[0]
764
        if not public:
765
            self.permissions.public_unset(path)
766
        else:
767
            self.permissions.public_set(path)
768

    
769
    @backend_method
770
    def get_object_hashmap(self, user, account, container, name, version=None):
771
        """Return the object's size and a list with partial hashes."""
772

    
773
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
774
                     account, container, name, version)
775
        self._can_read(user, account, container, name)
776
        path, node = self._lookup_object(account, container, name)
777
        props = self._get_version(node, version)
778
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
779
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
780

    
781
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
782
        if permissions is not None and user != account:
783
            raise NotAllowedError
784
        self._can_write(user, account, container, name)
785
        if permissions is not None:
786
            path = '/'.join((account, container, name))
787
            self._check_permissions(path, permissions)
788

    
789
        account_path, account_node = self._lookup_account(account, True)
790
        container_path, container_node = self._lookup_container(
791
            account, container)
792
        path, node = self._put_object_node(
793
            container_path, container_node, name)
794
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
795

    
796
        # Handle meta.
797
        if src_version_id is None:
798
            src_version_id = pre_version_id
799
        self._put_metadata_duplicate(
800
            src_version_id, dest_version_id, domain, meta, replace_meta)
801

    
802
        # Check quota.
803
        del_size = self._apply_versioning(account, container, pre_version_id)
804
        size_delta = size - del_size
805
        if size_delta > 0:
806
            account_quota = long(self._get_policy(account_node)['quota'])
807
            container_quota = long(self._get_policy(container_node)['quota'])
808
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
809
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
810
                # This must be executed in a transaction, so the version is never created if it fails.
811
                raise QuotaError
812
        self._report_size_change(user, account, size_delta,
813
                                                         {'action': 'object update', 'path': path,
814
                                                          'versions': [dest_version_id]})
815

    
816
        if permissions is not None:
817
            self.permissions.access_set(path, permissions)
818
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
819

    
820
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
821
        return dest_version_id
822

    
823
    @backend_method
824
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
825
        """Create/update an object with the specified size and partial hashes."""
826

    
827
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
828
                     account, container, name, size, type, hashmap, checksum)
829
        if size == 0:  # No such thing as an empty hashmap.
830
            hashmap = [self.put_block('')]
831
        map = HashMap(self.block_size, self.hash_algorithm)
832
        map.extend([binascii.unhexlify(x) for x in hashmap])
833
        missing = self.store.block_search(map)
834
        if missing:
835
            ie = IndexError()
836
            ie.data = [binascii.hexlify(x) for x in missing]
837
            raise ie
838

    
839
        hash = map.hash()
840
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
841
        self.store.map_put(hash, map)
842
        return dest_version_id
843

    
844
    @backend_method
845
    def update_object_checksum(self, user, account, container, name, version, checksum):
846
        """Update an object's checksum."""
847

    
848
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
849
                     user, account, container, name, version, checksum)
850
        # Update objects with greater version and same hashmap and size (fix metadata updates).
851
        self._can_write(user, account, container, name)
852
        path, node = self._lookup_object(account, container, name)
853
        props = self._get_version(node, version)
854
        versions = self.node.node_get_versions(node)
855
        for x in versions:
856
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
857
                self.node.version_put_property(
858
                    x[self.SERIAL], 'checksum', checksum)
859

    
860
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
861
        dest_version_ids = []
862
        self._can_read(user, src_account, src_container, src_name)
863
        path, node = self._lookup_object(src_account, src_container, src_name)
864
        # TODO: Will do another fetch of the properties in duplicate version...
865
        props = self._get_version(
866
            node, src_version)  # Check to see if source exists.
867
        src_version_id = props[self.SERIAL]
868
        hash = props[self.HASH]
869
        size = props[self.SIZE]
870
        is_copy = not is_move and (src_account, src_container, src_name) != (
871
            dest_account, dest_container, dest_name)  # New uuid.
872
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
873
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
874
            self._delete_object(user, src_account, src_container, src_name)
875

    
876
        if delimiter:
877
            prefix = src_name + \
878
                delimiter if not src_name.endswith(delimiter) else src_name
879
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
880
            src_names.sort(key=lambda x: x[2])  # order by nodes
881
            paths = [elem[0] for elem in src_names]
882
            nodes = [elem[2] for elem in src_names]
883
            # TODO: Will do another fetch of the properties in duplicate version...
884
            props = self._get_versions(nodes)  # Check to see if source exists.
885

    
886
            for prop, path, node in zip(props, paths, nodes):
887
                src_version_id = prop[self.SERIAL]
888
                hash = prop[self.HASH]
889
                vtype = prop[self.TYPE]
890
                size = prop[self.SIZE]
891
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
892
                    delimiter) else dest_name
893
                vdest_name = path.replace(prefix, dest_prefix, 1)
894
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
895
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
896
                    self._delete_object(user, src_account, src_container, path)
897
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
898

    
899
    @backend_method
900
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
901
        """Copy an object's data and metadata."""
902

    
903
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
904
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
905
        return dest_version_id
906

    
907
    @backend_method
908
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
909
        """Move an object's data and metadata."""
910

    
911
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
912
        if user != src_account:
913
            raise NotAllowedError
914
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
915
        return dest_version_id
916

    
917
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
918
        if user != account:
919
            raise NotAllowedError
920

    
921
        if until is not None:
922
            path = '/'.join((account, container, name))
923
            node = self.node.node_lookup(path)
924
            if node is None:
925
                return
926
            hashes = []
927
            size = 0
928
            serials = []
929
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
930
            hashes += h
931
            size += s
932
            serials += v
933
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
934
            hashes += h
935
            size += s
936
            serials += v
937
            for h in hashes:
938
                self.store.map_delete(h)
939
            self.node.node_purge(node, until, CLUSTER_DELETED)
940
            try:
941
                props = self._get_version(node)
942
            except NameError:
943
                self.permissions.access_clear(path)
944
            self._report_size_change(user, account, -size,
945
                                                            {'action': 'object purge', 'path': path,
946
                                                             'versions': serials})
947
            return
948

    
949
        path, node = self._lookup_object(account, container, name)
950
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
951
        del_size = self._apply_versioning(account, container, src_version_id)
952
        if del_size:
953
            self._report_size_change(user, account, -del_size,
954
                                                             {'action': 'object delete', 'path': path,
955
                                                              'versions': [dest_version_id]})
956
        self._report_object_change(
957
            user, account, path, details={'action': 'object delete'})
958
        self.permissions.access_clear(path)
959

    
960
        if delimiter:
961
            prefix = name + delimiter if not name.endswith(delimiter) else name
962
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
963
            paths = []
964
            for t in src_names:
965
                path = '/'.join((account, container, t[0]))
966
                node = t[2]
967
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
968
                del_size = self._apply_versioning(
969
                    account, container, src_version_id)
970
                if del_size:
971
                    self._report_size_change(user, account, -del_size,
972
                                                                     {'action': 'object delete',
973
                                                                      'path': path,
974
                                                                      'versions': [dest_version_id]})
975
                self._report_object_change(
976
                    user, account, path, details={'action': 'object delete'})
977
                paths.append(path)
978
            self.permissions.access_clear_bulk(paths)
979

    
980
    @backend_method
981
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
982
        """Delete/purge an object."""
983

    
984
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
985
                     account, container, name, until, prefix, delimiter)
986
        self._delete_object(user, account, container, name, until, delimiter)
987

    
988
    @backend_method
989
    def list_versions(self, user, account, container, name):
990
        """Return a list of all (version, version_timestamp) tuples for an object."""
991

    
992
        logger.debug(
993
            "list_versions: %s %s %s %s", user, account, container, name)
994
        self._can_read(user, account, container, name)
995
        path, node = self._lookup_object(account, container, name)
996
        versions = self.node.node_get_versions(node)
997
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
998

    
999
    @backend_method
1000
    def get_uuid(self, user, uuid):
1001
        """Return the (account, container, name) for the UUID given."""
1002

    
1003
        logger.debug("get_uuid: %s %s", user, uuid)
1004
        info = self.node.latest_uuid(uuid)
1005
        if info is None:
1006
            raise NameError
1007
        path, serial = info
1008
        account, container, name = path.split('/', 2)
1009
        self._can_read(user, account, container, name)
1010
        return (account, container, name)
1011

    
1012
    @backend_method
1013
    def get_public(self, user, public):
1014
        """Return the (account, container, name) for the public id given."""
1015

    
1016
        logger.debug("get_public: %s %s", user, public)
1017
        if public is None or public < ULTIMATE_ANSWER:
1018
            raise NameError
1019
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1020
        if path is None:
1021
            raise NameError
1022
        account, container, name = path.split('/', 2)
1023
        self._can_read(user, account, container, name)
1024
        return (account, container, name)
1025

    
1026
    @backend_method(autocommit=0)
1027
    def get_block(self, hash):
1028
        """Return a block's data."""
1029

    
1030
        logger.debug("get_block: %s", hash)
1031
        block = self.store.block_get(binascii.unhexlify(hash))
1032
        if not block:
1033
            raise ItemNotExists('Block does not exist')
1034
        return block
1035

    
1036
    @backend_method(autocommit=0)
1037
    def put_block(self, data):
1038
        """Store a block and return the hash."""
1039

    
1040
        logger.debug("put_block: %s", len(data))
1041
        return binascii.hexlify(self.store.block_put(data))
1042

    
1043
    @backend_method(autocommit=0)
1044
    def update_block(self, hash, data, offset=0):
1045
        """Update a known block and return the hash."""
1046

    
1047
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1048
        if offset == 0 and len(data) == self.block_size:
1049
            return self.put_block(data)
1050
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1051
        return binascii.hexlify(h)
1052

    
1053
    # Path functions.
1054

    
1055
    def _generate_uuid(self):
1056
        return str(uuidlib.uuid4())
1057

    
1058
    def _put_object_node(self, path, parent, name):
1059
        path = '/'.join((path, name))
1060
        node = self.node.node_lookup(path)
1061
        if node is None:
1062
            node = self.node.node_create(parent, path)
1063
        return path, node
1064

    
1065
    def _put_path(self, user, parent, path):
1066
        node = self.node.node_create(parent, path)
1067
        self.node.version_create(node, None, 0, '', None, user,
1068
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1069
        return node
1070

    
1071
    def _lookup_account(self, account, create=True):
1072
        node = self.node.node_lookup(account)
1073
        if node is None and create:
1074
            node = self._put_path(
1075
                account, self.ROOTNODE, account)  # User is account.
1076
        return account, node
1077

    
1078
    def _lookup_container(self, account, container):
1079
        path = '/'.join((account, container))
1080
        node = self.node.node_lookup(path)
1081
        if node is None:
1082
            raise ItemNotExists('Container does not exist')
1083
        return path, node
1084

    
1085
    def _lookup_object(self, account, container, name):
1086
        path = '/'.join((account, container, name))
1087
        node = self.node.node_lookup(path)
1088
        if node is None:
1089
            raise ItemNotExists('Object does not exist')
1090
        return path, node
1091

    
1092
    def _lookup_objects(self, paths):
1093
        nodes = self.node.node_lookup_bulk(paths)
1094
        return paths, nodes
1095

    
1096
    def _get_properties(self, node, until=None):
1097
        """Return properties until the timestamp given."""
1098

    
1099
        before = until if until is not None else inf
1100
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1101
        if props is None and until is not None:
1102
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1103
        if props is None:
1104
            raise ItemNotExists('Path does not exist')
1105
        return props
1106

    
1107
    def _get_statistics(self, node, until=None):
1108
        """Return count, sum of size and latest timestamp of everything under node."""
1109

    
1110
        if until is None:
1111
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1112
        else:
1113
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1114
        if stats is None:
1115
            stats = (0, 0, 0)
1116
        return stats
1117

    
1118
    def _get_version(self, node, version=None):
1119
        if version is None:
1120
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1121
            if props is None:
1122
                raise ItemNotExists('Object does not exist')
1123
        else:
1124
            try:
1125
                version = int(version)
1126
            except ValueError:
1127
                raise VersionNotExists('Version does not exist')
1128
            props = self.node.version_get_properties(version)
1129
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1130
                raise VersionNotExists('Version does not exist')
1131
        return props
1132

    
1133
    def _get_versions(self, nodes):
1134
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1135

    
1136
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1137
        """Create a new version of the node."""
1138

    
1139
        props = self.node.version_lookup(
1140
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1141
        if props is not None:
1142
            src_version_id = props[self.SERIAL]
1143
            src_hash = props[self.HASH]
1144
            src_size = props[self.SIZE]
1145
            src_type = props[self.TYPE]
1146
            src_checksum = props[self.CHECKSUM]
1147
        else:
1148
            src_version_id = None
1149
            src_hash = None
1150
            src_size = 0
1151
            src_type = ''
1152
            src_checksum = ''
1153
        if size is None:  # Set metadata.
1154
            hash = src_hash  # This way hash can be set to None (account or container).
1155
            size = src_size
1156
        if type is None:
1157
            type = src_type
1158
        if checksum is None:
1159
            checksum = src_checksum
1160
        uuid = self._generate_uuid(
1161
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1162

    
1163
        if src_node is None:
1164
            pre_version_id = src_version_id
1165
        else:
1166
            pre_version_id = None
1167
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1168
            if props is not None:
1169
                pre_version_id = props[self.SERIAL]
1170
        if pre_version_id is not None:
1171
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1172

    
1173
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1174
        return pre_version_id, dest_version_id
1175

    
1176
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1177
        if src_version_id is not None:
1178
            self.node.attribute_copy(src_version_id, dest_version_id)
1179
        if not replace:
1180
            self.node.attribute_del(dest_version_id, domain, (
1181
                k for k, v in meta.iteritems() if v == ''))
1182
            self.node.attribute_set(dest_version_id, domain, (
1183
                (k, v) for k, v in meta.iteritems() if v != ''))
1184
        else:
1185
            self.node.attribute_del(dest_version_id, domain)
1186
            self.node.attribute_set(dest_version_id, domain, ((
1187
                k, v) for k, v in meta.iteritems()))
1188

    
1189
    def _put_metadata(self, user, node, domain, meta, replace=False):
1190
        """Create a new version and store metadata."""
1191

    
1192
        src_version_id, dest_version_id = self._put_version_duplicate(
1193
            user, node)
1194
        self._put_metadata_duplicate(
1195
            src_version_id, dest_version_id, domain, meta, replace)
1196
        return src_version_id, dest_version_id
1197

    
1198
    def _list_limits(self, listing, marker, limit):
1199
        start = 0
1200
        if marker:
1201
            try:
1202
                start = listing.index(marker) + 1
1203
            except ValueError:
1204
                pass
1205
        if not limit or limit > 10000:
1206
            limit = 10000
1207
        return start, limit
1208

    
1209
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1210
        cont_prefix = path + '/'
1211
        prefix = cont_prefix + prefix
1212
        start = cont_prefix + marker if marker else None
1213
        before = until if until is not None else inf
1214
        filterq = keys if domain else []
1215
        sizeq = size_range
1216

    
1217
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1218
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1219
        objects.sort(key=lambda x: x[0])
1220
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1221
        return objects
1222

    
1223
    # Reporting functions.
1224

    
1225
    def _report_size_change(self, user, account, size, details={}):
1226
        account_node = self._lookup_account(account, True)[1]
1227
        total = self._get_statistics(account_node)[1]
1228
        details.update({'user': user, 'total': total})
1229
        logger.debug(
1230
            "_report_size_change: %s %s %s %s", user, account, size, details)
1231
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1232
                                                  account, QUEUE_INSTANCE_ID, 'diskspace',
1233
                                                  float(size), details))
1234

    
1235
    def _report_object_change(self, user, account, path, details={}):
1236
        details.update({'user': user})
1237
        logger.debug("_report_object_change: %s %s %s %s", user,
1238
                     account, path, details)
1239
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1240
                                                  account, QUEUE_INSTANCE_ID, 'object', path, details))
1241

    
1242
    def _report_sharing_change(self, user, account, path, details={}):
1243
        logger.debug("_report_permissions_change: %s %s %s %s",
1244
                     user, account, path, details)
1245
        details.update({'user': user})
1246
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1247
                                                  account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1248

    
1249
    # Policy functions.
1250

    
1251
    def _check_policy(self, policy):
1252
        for k in policy.keys():
1253
            if policy[k] == '':
1254
                policy[k] = self.default_policy.get(k)
1255
        for k, v in policy.iteritems():
1256
            if k == 'quota':
1257
                q = int(v)  # May raise ValueError.
1258
                if q < 0:
1259
                    raise ValueError
1260
            elif k == 'versioning':
1261
                if v not in ['auto', 'none']:
1262
                    raise ValueError
1263
            else:
1264
                raise ValueError
1265

    
1266
    def _put_policy(self, node, policy, replace):
1267
        if replace:
1268
            for k, v in self.default_policy.iteritems():
1269
                if k not in policy:
1270
                    policy[k] = v
1271
        self.node.policy_set(node, policy)
1272

    
1273
    def _get_policy(self, node):
1274
        policy = self.default_policy.copy()
1275
        policy.update(self.node.policy_get(node))
1276
        return policy
1277

    
1278
    def _apply_versioning(self, account, container, version_id):
1279
        """Delete the provided version if such is the policy.
1280
           Return size of object removed.
1281
        """
1282

    
1283
        if version_id is None:
1284
            return 0
1285
        path, node = self._lookup_container(account, container)
1286
        versioning = self._get_policy(node)['versioning']
1287
        if versioning != 'auto':
1288
            hash, size = self.node.version_remove(version_id)
1289
            self.store.map_delete(hash)
1290
            return size
1291
        return 0
1292

    
1293
    # Access control functions.
1294

    
1295
    def _check_groups(self, groups):
1296
        # raise ValueError('Bad characters in groups')
1297
        pass
1298

    
1299
    def _check_permissions(self, path, permissions):
1300
        # raise ValueError('Bad characters in permissions')
1301
        pass
1302

    
1303
    def _get_formatted_paths(self, paths):
1304
        formatted = []
1305
        for p in paths:
1306
            node = self.node.node_lookup(p)
1307
            if node is not None:
1308
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1309
            if props is not None:
1310
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1311
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1312
                formatted.append((p, self.MATCH_EXACT))
1313
        return formatted
1314

    
1315
    def _get_permissions_path(self, account, container, name):
1316
        path = '/'.join((account, container, name))
1317
        permission_paths = self.permissions.access_inherit(path)
1318
        permission_paths.sort()
1319
        permission_paths.reverse()
1320
        for p in permission_paths:
1321
            if p == path:
1322
                return p
1323
            else:
1324
                if p.count('/') < 2:
1325
                    continue
1326
                node = self.node.node_lookup(p)
1327
                if node is not None:
1328
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1329
                if props is not None:
1330
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1331
                        return p
1332
        return None
1333

    
1334
    def _can_read(self, user, account, container, name):
1335
        if user == account:
1336
            return True
1337
        path = '/'.join((account, container, name))
1338
        if self.permissions.public_get(path) is not None:
1339
            return True
1340
        path = self._get_permissions_path(account, container, name)
1341
        if not path:
1342
            raise NotAllowedError
1343
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1344
            raise NotAllowedError
1345

    
1346
    def _can_write(self, user, account, container, name):
1347
        if user == account:
1348
            return True
1349
        path = '/'.join((account, container, name))
1350
        path = self._get_permissions_path(account, container, name)
1351
        if not path:
1352
            raise NotAllowedError
1353
        if not self.permissions.access_check(path, self.WRITE, user):
1354
            raise NotAllowedError
1355

    
1356
    def _allowed_accounts(self, user):
1357
        allow = set()
1358
        for path in self.permissions.access_list_paths(user):
1359
            allow.add(path.split('/', 1)[0])
1360
        return sorted(allow)
1361

    
1362
    def _allowed_containers(self, user, account):
1363
        allow = set()
1364
        for path in self.permissions.access_list_paths(user, account):
1365
            allow.add(path.split('/', 2)[1])
1366
        return sorted(allow)