Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ d3655326

History | View | Annotate | Download (61.4 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from commissioning.clients.quotaholder import QuotaholderHTTP
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
85
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
86

    
87
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
88
QUEUE_CLIENT_ID = 'pithos'
89
QUEUE_INSTANCE_ID = '1'
90

    
91
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
92

    
93
inf = float('inf')
94

    
95
ULTIMATE_ANSWER = 42
96

    
97

    
98
logger = logging.getLogger(__name__)
99

    
100

    
101
def backend_method(func=None, autocommit=1):
102
    if func is None:
103
        def fn(func):
104
            return backend_method(func, autocommit)
105
        return fn
106

    
107
    if not autocommit:
108
        return func
109

    
110
    def fn(self, *args, **kw):
111
        self.wrapper.execute()
112
        serials = []
113
        self.serials = serials
114
        self.messages = []
115

    
116
        try:
117
            ret = func(self, *args, **kw)
118
            for m in self.messages:
119
                self.queue.send(*m)
120
            if serials:
121
                self.quotaholder.accept_commission(
122
                            context     =   {},
123
                            clientkey   =   'pithos',
124
                            serials     =   serials)
125
            self.wrapper.commit()
126
            return ret
127
        except:
128
            self.quotaholder.reject_commission(
129
                        context     =   {},
130
                        clientkey   =   'pithos',
131
                        serials     =   serials)
132
            self.wrapper.rollback()
133
            raise
134
    return fn
135

    
136

    
137
class ModularBackend(BaseBackend):
138
    """A modular backend.
139

140
    Uses modules for SQL functions and storage.
141
    """
142

    
143
    def __init__(self, db_module=None, db_connection=None,
144
                 block_module=None, block_path=None, block_umask=None,
145
                 queue_module=None, queue_hosts=None,
146
                 queue_exchange=None, quotaholder_url=None,
147
                 free_versioning=True):
148
        db_module = db_module or DEFAULT_DB_MODULE
149
        db_connection = db_connection or DEFAULT_DB_CONNECTION
150
        block_module = block_module or DEFAULT_BLOCK_MODULE
151
        block_path = block_path or DEFAULT_BLOCK_PATH
152
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
153
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
154
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
155
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
156
        
157
        self.hash_algorithm = 'sha256'
158
        self.block_size = 4 * 1024 * 1024  # 4MB
159
        self.free_versioning = free_versioning
160

    
161
        self.default_policy = {'quota': DEFAULT_QUOTA,
162
                               'versioning': DEFAULT_VERSIONING}
163

    
164
        def load_module(m):
165
            __import__(m)
166
            return sys.modules[m]
167

    
168
        self.db_module = load_module(db_module)
169
        self.wrapper = self.db_module.DBWrapper(db_connection)
170
        params = {'wrapper': self.wrapper}
171
        self.permissions = self.db_module.Permissions(**params)
172
        self.config = self.db_module.Config(**params)
173
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
174
        for x in ['READ', 'WRITE']:
175
            setattr(self, x, getattr(self.db_module, x))
176
        self.node = self.db_module.Node(**params)
177
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
178
            setattr(self, x, getattr(self.db_module, x))
179

    
180
        self.block_module = load_module(block_module)
181
        params = {'path': block_path,
182
                  'block_size': self.block_size,
183
                  'hash_algorithm': self.hash_algorithm,
184
                  'umask': block_umask}
185
        self.store = self.block_module.Store(**params)
186

    
187
        if queue_module and queue_hosts:
188
            self.queue_module = load_module(queue_module)
189
            params = {'hosts': queue_hosts,
190
                              'exchange': queue_exchange,
191
                      'client_id': QUEUE_CLIENT_ID}
192
            self.queue = self.queue_module.Queue(**params)
193
        else:
194
            class NoQueue:
195
                def send(self, *args):
196
                    pass
197

    
198
                def close(self):
199
                    pass
200

    
201
            self.queue = NoQueue()
202

    
203
        self.quotaholder_url = quotaholder_url
204
        self.quotaholder = QuotaholderHTTP(quotaholder_url)
205
        self.serials = []
206
        self.messages = []
207

    
208
    def close(self):
209
        self.wrapper.close()
210
        self.queue.close()
211

    
212
    @backend_method
213
    def list_accounts(self, user, marker=None, limit=10000):
214
        """Return a list of accounts the user can access."""
215

    
216
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
217
        allowed = self._allowed_accounts(user)
218
        start, limit = self._list_limits(allowed, marker, limit)
219
        return allowed[start:start + limit]
220

    
221
    @backend_method
222
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
223
        """Return a dictionary with the account metadata for the domain."""
224

    
225
        logger.debug(
226
            "get_account_meta: %s %s %s %s", user, account, domain, until)
227
        path, node = self._lookup_account(account, user == account)
228
        if user != account:
229
            if until or node is None or account not in self._allowed_accounts(user):
230
                raise NotAllowedError
231
        try:
232
            props = self._get_properties(node, until)
233
            mtime = props[self.MTIME]
234
        except NameError:
235
            props = None
236
            mtime = until
237
        count, bytes, tstamp = self._get_statistics(node, until)
238
        tstamp = max(tstamp, mtime)
239
        if until is None:
240
            modified = tstamp
241
        else:
242
            modified = self._get_statistics(
243
                node)[2]  # Overall last modification.
244
            modified = max(modified, mtime)
245

    
246
        if user != account:
247
            meta = {'name': account}
248
        else:
249
            meta = {}
250
            if props is not None and include_user_defined:
251
                meta.update(
252
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
253
            if until is not None:
254
                meta.update({'until_timestamp': tstamp})
255
            meta.update({'name': account, 'count': count, 'bytes': bytes})
256
        meta.update({'modified': modified})
257
        return meta
258

    
259
    @backend_method
260
    def update_account_meta(self, user, account, domain, meta, replace=False):
261
        """Update the metadata associated with the account for the domain."""
262

    
263
        logger.debug("update_account_meta: %s %s %s %s %s", user,
264
                     account, domain, meta, replace)
265
        if user != account:
266
            raise NotAllowedError
267
        path, node = self._lookup_account(account, True)
268
        self._put_metadata(user, node, domain, meta, replace)
269

    
270
    @backend_method
271
    def get_account_groups(self, user, account):
272
        """Return a dictionary with the user groups defined for this account."""
273

    
274
        logger.debug("get_account_groups: %s %s", user, account)
275
        if user != account:
276
            if account not in self._allowed_accounts(user):
277
                raise NotAllowedError
278
            return {}
279
        self._lookup_account(account, True)
280
        return self.permissions.group_dict(account)
281

    
282
    @backend_method
283
    def update_account_groups(self, user, account, groups, replace=False):
284
        """Update the groups associated with the account."""
285

    
286
        logger.debug("update_account_groups: %s %s %s %s", user,
287
                     account, groups, replace)
288
        if user != account:
289
            raise NotAllowedError
290
        self._lookup_account(account, True)
291
        self._check_groups(groups)
292
        if replace:
293
            self.permissions.group_destroy(account)
294
        for k, v in groups.iteritems():
295
            if not replace:  # If not already deleted.
296
                self.permissions.group_delete(account, k)
297
            if v:
298
                self.permissions.group_addmany(account, k, v)
299

    
300
    @backend_method
301
    def get_account_policy(self, user, account):
302
        """Return a dictionary with the account policy."""
303

    
304
        logger.debug("get_account_policy: %s %s", user, account)
305
        if user != account:
306
            if account not in self._allowed_accounts(user):
307
                raise NotAllowedError
308
            return {}
309
        path, node = self._lookup_account(account, True)
310
        return self._get_policy(node)
311

    
312
    @backend_method
313
    def update_account_policy(self, user, account, policy, replace=False):
314
        """Update the policy associated with the account."""
315

    
316
        logger.debug("update_account_policy: %s %s %s %s", user,
317
                     account, policy, replace)
318
        if user != account:
319
            raise NotAllowedError
320
        path, node = self._lookup_account(account, True)
321
        self._check_policy(policy)
322
        self._put_policy(node, policy, replace)
323

    
324
    @backend_method
325
    def put_account(self, user, account, policy={}):
326
        """Create a new account with the given name."""
327

    
328
        logger.debug("put_account: %s %s %s", user, account, policy)
329
        if user != account:
330
            raise NotAllowedError
331
        node = self.node.node_lookup(account)
332
        if node is not None:
333
            raise AccountExists('Account already exists')
334
        if policy:
335
            self._check_policy(policy)
336
        node = self._put_path(user, self.ROOTNODE, account)
337
        self._put_policy(node, policy, True)
338

    
339
    @backend_method
340
    def delete_account(self, user, account):
341
        """Delete the account with the given name."""
342

    
343
        logger.debug("delete_account: %s %s", user, account)
344
        if user != account:
345
            raise NotAllowedError
346
        node = self.node.node_lookup(account)
347
        if node is None:
348
            return
349
        if not self.node.node_remove(node):
350
            raise AccountNotEmpty('Account is not empty')
351
        self.permissions.group_destroy(account)
352

    
353
    @backend_method
354
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
355
        """Return a list of containers existing under an account."""
356

    
357
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
358
                     account, marker, limit, shared, until, public)
359
        if user != account:
360
            if until or account not in self._allowed_accounts(user):
361
                raise NotAllowedError
362
            allowed = self._allowed_containers(user, account)
363
            start, limit = self._list_limits(allowed, marker, limit)
364
            return allowed[start:start + limit]
365
        if shared or public:
366
            allowed = set()
367
            if shared:
368
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
369
            if public:
370
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
371
            allowed = sorted(allowed)
372
            start, limit = self._list_limits(allowed, marker, limit)
373
            return allowed[start:start + limit]
374
        node = self.node.node_lookup(account)
375
        containers = [x[0] for x in self._list_object_properties(
376
            node, account, '', '/', marker, limit, False, None, [], until)]
377
        start, limit = self._list_limits(
378
            [x[0] for x in containers], marker, limit)
379
        return containers[start:start + limit]
380

    
381
    @backend_method
382
    def list_container_meta(self, user, account, container, domain, until=None):
383
        """Return a list with all the container's object meta keys for the domain."""
384

    
385
        logger.debug("list_container_meta: %s %s %s %s %s", user,
386
                     account, container, domain, until)
387
        allowed = []
388
        if user != account:
389
            if until:
390
                raise NotAllowedError
391
            allowed = self.permissions.access_list_paths(
392
                user, '/'.join((account, container)))
393
            if not allowed:
394
                raise NotAllowedError
395
        path, node = self._lookup_container(account, container)
396
        before = until if until is not None else inf
397
        allowed = self._get_formatted_paths(allowed)
398
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
399

    
400
    @backend_method
401
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
402
        """Return a dictionary with the container metadata for the domain."""
403

    
404
        logger.debug("get_container_meta: %s %s %s %s %s", user,
405
                     account, container, domain, until)
406
        if user != account:
407
            if until or container not in self._allowed_containers(user, account):
408
                raise NotAllowedError
409
        path, node = self._lookup_container(account, container)
410
        props = self._get_properties(node, until)
411
        mtime = props[self.MTIME]
412
        count, bytes, tstamp = self._get_statistics(node, until)
413
        tstamp = max(tstamp, mtime)
414
        if until is None:
415
            modified = tstamp
416
        else:
417
            modified = self._get_statistics(
418
                node)[2]  # Overall last modification.
419
            modified = max(modified, mtime)
420

    
421
        if user != account:
422
            meta = {'name': container}
423
        else:
424
            meta = {}
425
            if include_user_defined:
426
                meta.update(
427
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
428
            if until is not None:
429
                meta.update({'until_timestamp': tstamp})
430
            meta.update({'name': container, 'count': count, 'bytes': bytes})
431
        meta.update({'modified': modified})
432
        return meta
433

    
434
    @backend_method
435
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
436
        """Update the metadata associated with the container for the domain."""
437

    
438
        logger.debug("update_container_meta: %s %s %s %s %s %s",
439
                     user, account, container, domain, meta, replace)
440
        if user != account:
441
            raise NotAllowedError
442
        path, node = self._lookup_container(account, container)
443
        src_version_id, dest_version_id = self._put_metadata(
444
            user, node, domain, meta, replace)
445
        if src_version_id is not None:
446
            versioning = self._get_policy(node)['versioning']
447
            if versioning != 'auto':
448
                self.node.version_remove(src_version_id)
449

    
450
    @backend_method
451
    def get_container_policy(self, user, account, container):
452
        """Return a dictionary with the container policy."""
453

    
454
        logger.debug(
455
            "get_container_policy: %s %s %s", user, account, container)
456
        if user != account:
457
            if container not in self._allowed_containers(user, account):
458
                raise NotAllowedError
459
            return {}
460
        path, node = self._lookup_container(account, container)
461
        return self._get_policy(node)
462

    
463
    @backend_method
464
    def update_container_policy(self, user, account, container, policy, replace=False):
465
        """Update the policy associated with the container."""
466

    
467
        logger.debug("update_container_policy: %s %s %s %s %s",
468
                     user, account, container, policy, replace)
469
        if user != account:
470
            raise NotAllowedError
471
        path, node = self._lookup_container(account, container)
472
        self._check_policy(policy)
473
        self._put_policy(node, policy, replace)
474

    
475
    @backend_method
476
    def put_container(self, user, account, container, policy={}):
477
        """Create a new container with the given name."""
478

    
479
        logger.debug(
480
            "put_container: %s %s %s %s", user, account, container, policy)
481
        if user != account:
482
            raise NotAllowedError
483
        try:
484
            path, node = self._lookup_container(account, container)
485
        except NameError:
486
            pass
487
        else:
488
            raise ContainerExists('Container already exists')
489
        if policy:
490
            self._check_policy(policy)
491
        path = '/'.join((account, container))
492
        node = self._put_path(
493
            user, self._lookup_account(account, True)[1], path)
494
        self._put_policy(node, policy, True)
495

    
496
    @backend_method
497
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
498
        """Delete/purge the container with the given name."""
499

    
500
        logger.debug("delete_container: %s %s %s %s %s %s", user,
501
                     account, container, until, prefix, delimiter)
502
        if user != account:
503
            raise NotAllowedError
504
        path, node = self._lookup_container(account, container)
505

    
506
        if until is not None:
507
            hashes, size, serials = self.node.node_purge_children(
508
                node, until, CLUSTER_HISTORY)
509
            for h in hashes:
510
                self.store.map_delete(h)
511
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
512
            self._report_size_change(user, account, -size,
513
                                     {'action':'container purge', 'path': path,
514
                                      'versions': ','.join(str(i) for i in serials)})
515
            return
516

    
517
        if not delimiter:
518
            if self._get_statistics(node)[0] > 0:
519
                raise ContainerNotEmpty('Container is not empty')
520
            hashes, size, serials = self.node.node_purge_children(
521
                node, inf, CLUSTER_HISTORY)
522
            for h in hashes:
523
                self.store.map_delete(h)
524
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
525
            self.node.node_remove(node)
526
            self._report_size_change(user, account, -size,
527
                                     {'action': 'container delete',
528
                                      'path': path,
529
                                      'versions': ','.join(str(i) for i in serials)})
530
        else:
531
            # remove only contents
532
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
533
            paths = []
534
            for t in src_names:
535
                path = '/'.join((account, container, t[0]))
536
                node = t[2]
537
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
538
                del_size = self._apply_versioning(
539
                    account, container, src_version_id)
540
                self._report_size_change(
541
                        user, account, -del_size, {
542
                                'action': 'object delete',
543
                                'path': path,
544
                             'versions': ','.join([str(dest_version_id)])
545
                     }
546
                )
547
                self._report_object_change(
548
                    user, account, path, details={'action': 'object delete'})
549
                paths.append(path)
550
            self.permissions.access_clear_bulk(paths)
551

    
552
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
553
        if user != account and until:
554
            raise NotAllowedError
555
        if shared and public:
556
            # get shared first
557
            shared = self._list_object_permissions(
558
                user, account, container, prefix, shared=True, public=False)
559
            objects = set()
560
            if shared:
561
                path, node = self._lookup_container(account, container)
562
                shared = self._get_formatted_paths(shared)
563
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
564

    
565
            # get public
566
            objects |= set(self._list_public_object_properties(
567
                user, account, container, prefix, all_props))
568
            objects = list(objects)
569

    
570
            objects.sort(key=lambda x: x[0])
571
            start, limit = self._list_limits(
572
                [x[0] for x in objects], marker, limit)
573
            return objects[start:start + limit]
574
        elif public:
575
            objects = self._list_public_object_properties(
576
                user, account, container, prefix, all_props)
577
            start, limit = self._list_limits(
578
                [x[0] for x in objects], marker, limit)
579
            return objects[start:start + limit]
580

    
581
        allowed = self._list_object_permissions(
582
            user, account, container, prefix, shared, public)
583
        if shared and not allowed:
584
            return []
585
        path, node = self._lookup_container(account, container)
586
        allowed = self._get_formatted_paths(allowed)
587
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
588
        start, limit = self._list_limits(
589
            [x[0] for x in objects], marker, limit)
590
        return objects[start:start + limit]
591

    
592
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
593
        public = self._list_object_permissions(
594
            user, account, container, prefix, shared=False, public=True)
595
        paths, nodes = self._lookup_objects(public)
596
        path = '/'.join((account, container))
597
        cont_prefix = path + '/'
598
        paths = [x[len(cont_prefix):] for x in paths]
599
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
600
        objects = [(path,) + props for path, props in zip(paths, props)]
601
        return objects
602

    
603
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
604
        objects = []
605
        while True:
606
            marker = objects[-1] if objects else None
607
            limit = 10000
608
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
609
            objects.extend(l)
610
            if not l or len(l) < limit:
611
                break
612
        return objects
613

    
614
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
615
        allowed = []
616
        path = '/'.join((account, container, prefix)).rstrip('/')
617
        if user != account:
618
            allowed = self.permissions.access_list_paths(user, path)
619
            if not allowed:
620
                raise NotAllowedError
621
        else:
622
            allowed = set()
623
            if shared:
624
                allowed.update(self.permissions.access_list_shared(path))
625
            if public:
626
                allowed.update(
627
                    [x[0] for x in self.permissions.public_list(path)])
628
            allowed = sorted(allowed)
629
            if not allowed:
630
                return []
631
        return allowed
632

    
633
    @backend_method
634
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
635
        """Return a list of object (name, version_id) tuples existing under a container."""
636

    
637
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
638
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
639

    
640
    @backend_method
641
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
642
        """Return a list of object metadata dicts existing under a container."""
643

    
644
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
645
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
646
        objects = []
647
        for p in props:
648
            if len(p) == 2:
649
                objects.append({'subdir': p[0]})
650
            else:
651
                objects.append({'name': p[0],
652
                                'bytes': p[self.SIZE + 1],
653
                                'type': p[self.TYPE + 1],
654
                                'hash': p[self.HASH + 1],
655
                                'version': p[self.SERIAL + 1],
656
                                'version_timestamp': p[self.MTIME + 1],
657
                                'modified': p[self.MTIME + 1] if until is None else None,
658
                                'modified_by': p[self.MUSER + 1],
659
                                'uuid': p[self.UUID + 1],
660
                                'checksum': p[self.CHECKSUM + 1]})
661
        return objects
662

    
663
    @backend_method
664
    def list_object_permissions(self, user, account, container, prefix=''):
665
        """Return a list of paths that enforce permissions under a container."""
666

    
667
        logger.debug("list_object_permissions: %s %s %s %s", user,
668
                     account, container, prefix)
669
        return self._list_object_permissions(user, account, container, prefix, True, False)
670

    
671
    @backend_method
672
    def list_object_public(self, user, account, container, prefix=''):
673
        """Return a dict mapping paths to public ids for objects that are public under a container."""
674

    
675
        logger.debug("list_object_public: %s %s %s %s", user,
676
                     account, container, prefix)
677
        public = {}
678
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
679
            public[path] = p + ULTIMATE_ANSWER
680
        return public
681

    
682
    @backend_method
683
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
684
        """Return a dictionary with the object metadata for the domain."""
685

    
686
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
687
                     account, container, name, domain, version)
688
        self._can_read(user, account, container, name)
689
        path, node = self._lookup_object(account, container, name)
690
        props = self._get_version(node, version)
691
        if version is None:
692
            modified = props[self.MTIME]
693
        else:
694
            try:
695
                modified = self._get_version(
696
                    node)[self.MTIME]  # Overall last modification.
697
            except NameError:  # Object may be deleted.
698
                del_props = self.node.version_lookup(
699
                    node, inf, CLUSTER_DELETED)
700
                if del_props is None:
701
                    raise ItemNotExists('Object does not exist')
702
                modified = del_props[self.MTIME]
703

    
704
        meta = {}
705
        if include_user_defined:
706
            meta.update(
707
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
708
        meta.update({'name': name,
709
                     'bytes': props[self.SIZE],
710
                     'type': props[self.TYPE],
711
                     'hash': props[self.HASH],
712
                     'version': props[self.SERIAL],
713
                     'version_timestamp': props[self.MTIME],
714
                     'modified': modified,
715
                     'modified_by': props[self.MUSER],
716
                     'uuid': props[self.UUID],
717
                     'checksum': props[self.CHECKSUM]})
718
        return meta
719

    
720
    @backend_method
721
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
722
        """Update the metadata associated with the object for the domain and return the new version."""
723

    
724
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
725
                     user, account, container, name, domain, meta, replace)
726
        self._can_write(user, account, container, name)
727
        path, node = self._lookup_object(account, container, name)
728
        src_version_id, dest_version_id = self._put_metadata(
729
            user, node, domain, meta, replace)
730
        self._apply_versioning(account, container, src_version_id)
731
        return dest_version_id
732

    
733
    @backend_method
734
    def get_object_permissions(self, user, account, container, name):
735
        """Return the action allowed on the object, the path
736
        from which the object gets its permissions from,
737
        along with a dictionary containing the permissions."""
738

    
739
        logger.debug("get_object_permissions: %s %s %s %s", user,
740
                     account, container, name)
741
        allowed = 'write'
742
        permissions_path = self._get_permissions_path(account, container, name)
743
        if user != account:
744
            if self.permissions.access_check(permissions_path, self.WRITE, user):
745
                allowed = 'write'
746
            elif self.permissions.access_check(permissions_path, self.READ, user):
747
                allowed = 'read'
748
            else:
749
                raise NotAllowedError
750
        self._lookup_object(account, container, name)
751
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
752

    
753
    @backend_method
754
    def update_object_permissions(self, user, account, container, name, permissions):
755
        """Update the permissions associated with the object."""
756

    
757
        logger.debug("update_object_permissions: %s %s %s %s %s",
758
                     user, account, container, name, permissions)
759
        if user != account:
760
            raise NotAllowedError
761
        path = self._lookup_object(account, container, name)[0]
762
        self._check_permissions(path, permissions)
763
        self.permissions.access_set(path, permissions)
764
        self._report_sharing_change(user, account, path, {'members':
765
                                    self.permissions.access_members(path)})
766

    
767
    @backend_method
768
    def get_object_public(self, user, account, container, name):
769
        """Return the public id of the object if applicable."""
770

    
771
        logger.debug(
772
            "get_object_public: %s %s %s %s", user, account, container, name)
773
        self._can_read(user, account, container, name)
774
        path = self._lookup_object(account, container, name)[0]
775
        p = self.permissions.public_get(path)
776
        if p is not None:
777
            p += ULTIMATE_ANSWER
778
        return p
779

    
780
    @backend_method
781
    def update_object_public(self, user, account, container, name, public):
782
        """Update the public status of the object."""
783

    
784
        logger.debug("update_object_public: %s %s %s %s %s", user,
785
                     account, container, name, public)
786
        self._can_write(user, account, container, name)
787
        path = self._lookup_object(account, container, name)[0]
788
        if not public:
789
            self.permissions.public_unset(path)
790
        else:
791
            self.permissions.public_set(path)
792

    
793
    @backend_method
794
    def get_object_hashmap(self, user, account, container, name, version=None):
795
        """Return the object's size and a list with partial hashes."""
796

    
797
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
798
                     account, container, name, version)
799
        self._can_read(user, account, container, name)
800
        path, node = self._lookup_object(account, container, name)
801
        props = self._get_version(node, version)
802
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
803
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
804

    
805
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
806
        if permissions is not None and user != account:
807
            raise NotAllowedError
808
        self._can_write(user, account, container, name)
809
        if permissions is not None:
810
            path = '/'.join((account, container, name))
811
            self._check_permissions(path, permissions)
812

    
813
        account_path, account_node = self._lookup_account(account, True)
814
        container_path, container_node = self._lookup_container(
815
            account, container)
816
        path, node = self._put_object_node(
817
            container_path, container_node, name)
818
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
819

    
820
        # Handle meta.
821
        if src_version_id is None:
822
            src_version_id = pre_version_id
823
        self._put_metadata_duplicate(
824
            src_version_id, dest_version_id, domain, meta, replace_meta)
825

    
826
        del_size = self._apply_versioning(account, container, pre_version_id)
827
        size_delta = size - del_size
828
        if not self.quotaholder_url: # Check quota.
829
            if size_delta > 0:
830
                account_quota = long(self._get_policy(account_node)['quota'])
831
                account_usage = self._get_statistics(account_node)[1] + size_delta
832
                container_quota = long(self._get_policy(container_node)['quota'])
833
                container_usage = self._get_statistics(container_node)[1] + size_delta
834
                if (account_quota > 0 and account_usage > account_quota):
835
                    logger.error('account_quota: %s, account_usage: %s' % (
836
                        account_quota, account_usage
837
                    ))
838
                    raise QuotaError
839
                if (container_quota > 0 and container_usage > container_quota):
840
                    # This must be executed in a transaction, so the version is
841
                    # never created if it fails.
842
                    logger.error('container_quota: %s, container_usage: %s' % (
843
                        container_quota, container_usage
844
                    ))
845
                    raise QuotaError
846
        self._report_size_change(user, account, size_delta,
847
                                 {'action': 'object update', 'path': path,
848
                                  'versions': ','.join([str(dest_version_id)])})
849
        if permissions is not None:
850
            self.permissions.access_set(path, permissions)
851
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
852

    
853
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
854
        return dest_version_id
855

    
856
    @backend_method
857
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
858
        """Create/update an object with the specified size and partial hashes."""
859

    
860
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
861
                     account, container, name, size, type, hashmap, checksum)
862
        if size == 0:  # No such thing as an empty hashmap.
863
            hashmap = [self.put_block('')]
864
        map = HashMap(self.block_size, self.hash_algorithm)
865
        map.extend([binascii.unhexlify(x) for x in hashmap])
866
        missing = self.store.block_search(map)
867
        if missing:
868
            ie = IndexError()
869
            ie.data = [binascii.hexlify(x) for x in missing]
870
            raise ie
871

    
872
        hash = map.hash()
873
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
874
        self.store.map_put(hash, map)
875
        return dest_version_id
876

    
877
    @backend_method
878
    def update_object_checksum(self, user, account, container, name, version, checksum):
879
        """Update an object's checksum."""
880

    
881
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
882
                     user, account, container, name, version, checksum)
883
        # Update objects with greater version and same hashmap and size (fix metadata updates).
884
        self._can_write(user, account, container, name)
885
        path, node = self._lookup_object(account, container, name)
886
        props = self._get_version(node, version)
887
        versions = self.node.node_get_versions(node)
888
        for x in versions:
889
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
890
                self.node.version_put_property(
891
                    x[self.SERIAL], 'checksum', checksum)
892

    
893
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
894
        dest_version_ids = []
895
        self._can_read(user, src_account, src_container, src_name)
896
        path, node = self._lookup_object(src_account, src_container, src_name)
897
        # TODO: Will do another fetch of the properties in duplicate version...
898
        props = self._get_version(
899
            node, src_version)  # Check to see if source exists.
900
        src_version_id = props[self.SERIAL]
901
        hash = props[self.HASH]
902
        size = props[self.SIZE]
903
        is_copy = not is_move and (src_account, src_container, src_name) != (
904
            dest_account, dest_container, dest_name)  # New uuid.
905
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
906
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
907
            self._delete_object(user, src_account, src_container, src_name)
908

    
909
        if delimiter:
910
            prefix = src_name + \
911
                delimiter if not src_name.endswith(delimiter) else src_name
912
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
913
            src_names.sort(key=lambda x: x[2])  # order by nodes
914
            paths = [elem[0] for elem in src_names]
915
            nodes = [elem[2] for elem in src_names]
916
            # TODO: Will do another fetch of the properties in duplicate version...
917
            props = self._get_versions(nodes)  # Check to see if source exists.
918

    
919
            for prop, path, node in zip(props, paths, nodes):
920
                src_version_id = prop[self.SERIAL]
921
                hash = prop[self.HASH]
922
                vtype = prop[self.TYPE]
923
                size = prop[self.SIZE]
924
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
925
                    delimiter) else dest_name
926
                vdest_name = path.replace(prefix, dest_prefix, 1)
927
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
928
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
929
                    self._delete_object(user, src_account, src_container, path)
930
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
931

    
932
    @backend_method
933
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
934
        """Copy an object's data and metadata."""
935

    
936
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
937
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
938
        return dest_version_id
939

    
940
    @backend_method
941
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
942
        """Move an object's data and metadata."""
943

    
944
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
945
        if user != src_account:
946
            raise NotAllowedError
947
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
948
        return dest_version_id
949

    
950
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
951
        if user != account:
952
            raise NotAllowedError
953

    
954
        if until is not None:
955
            path = '/'.join((account, container, name))
956
            node = self.node.node_lookup(path)
957
            if node is None:
958
                return
959
            hashes = []
960
            size = 0
961
            serials = []
962
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
963
            hashes += h
964
            size += s
965
            serials += v
966
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
967
            hashes += h
968
            size += s
969
            serials += v
970
            for h in hashes:
971
                self.store.map_delete(h)
972
            self.node.node_purge(node, until, CLUSTER_DELETED)
973
            try:
974
                props = self._get_version(node)
975
            except NameError:
976
                self.permissions.access_clear(path)
977
            self._report_size_change(user, account, -size,
978
                                    {'action': 'object purge', 'path': path,
979
                                     'versions': ','.join(str(i) for i in serials)})
980
            return
981

    
982
        path, node = self._lookup_object(account, container, name)
983
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
984
        del_size = self._apply_versioning(account, container, src_version_id)
985
        self._report_size_change(user, account, -del_size,
986
                                 {'action': 'object delete', 'path': path,
987
                                  'versions': ','.join([str(dest_version_id)])})
988
        self._report_object_change(
989
            user, account, path, details={'action': 'object delete'})
990
        self.permissions.access_clear(path)
991

    
992
        if delimiter:
993
            prefix = name + delimiter if not name.endswith(delimiter) else name
994
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
995
            paths = []
996
            for t in src_names:
997
                path = '/'.join((account, container, t[0]))
998
                node = t[2]
999
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1000
                del_size = self._apply_versioning(
1001
                    account, container, src_version_id)
1002
                self._report_size_change(user, account, -del_size,
1003
                                         {'action': 'object delete',
1004
                                          'path': path,
1005
                                          'versions': ','.join([str(dest_version_id)])})
1006
                self._report_object_change(
1007
                    user, account, path, details={'action': 'object delete'})
1008
                paths.append(path)
1009
            self.permissions.access_clear_bulk(paths)
1010

    
1011
    @backend_method
1012
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1013
        """Delete/purge an object."""
1014

    
1015
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1016
                     account, container, name, until, prefix, delimiter)
1017
        self._delete_object(user, account, container, name, until, delimiter)
1018

    
1019
    @backend_method
1020
    def list_versions(self, user, account, container, name):
1021
        """Return a list of all (version, version_timestamp) tuples for an object."""
1022

    
1023
        logger.debug(
1024
            "list_versions: %s %s %s %s", user, account, container, name)
1025
        self._can_read(user, account, container, name)
1026
        path, node = self._lookup_object(account, container, name)
1027
        versions = self.node.node_get_versions(node)
1028
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1029

    
1030
    @backend_method
1031
    def get_uuid(self, user, uuid):
1032
        """Return the (account, container, name) for the UUID given."""
1033

    
1034
        logger.debug("get_uuid: %s %s", user, uuid)
1035
        info = self.node.latest_uuid(uuid)
1036
        if info is None:
1037
            raise NameError
1038
        path, serial = info
1039
        account, container, name = path.split('/', 2)
1040
        self._can_read(user, account, container, name)
1041
        return (account, container, name)
1042

    
1043
    @backend_method
1044
    def get_public(self, user, public):
1045
        """Return the (account, container, name) for the public id given."""
1046

    
1047
        logger.debug("get_public: %s %s", user, public)
1048
        if public is None or public < ULTIMATE_ANSWER:
1049
            raise NameError
1050
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1051
        if path is None:
1052
            raise NameError
1053
        account, container, name = path.split('/', 2)
1054
        self._can_read(user, account, container, name)
1055
        return (account, container, name)
1056

    
1057
    @backend_method(autocommit=0)
1058
    def get_block(self, hash):
1059
        """Return a block's data."""
1060

    
1061
        logger.debug("get_block: %s", hash)
1062
        block = self.store.block_get(binascii.unhexlify(hash))
1063
        if not block:
1064
            raise ItemNotExists('Block does not exist')
1065
        return block
1066

    
1067
    @backend_method(autocommit=0)
1068
    def put_block(self, data):
1069
        """Store a block and return the hash."""
1070

    
1071
        logger.debug("put_block: %s", len(data))
1072
        return binascii.hexlify(self.store.block_put(data))
1073

    
1074
    @backend_method(autocommit=0)
1075
    def update_block(self, hash, data, offset=0):
1076
        """Update a known block and return the hash."""
1077

    
1078
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1079
        if offset == 0 and len(data) == self.block_size:
1080
            return self.put_block(data)
1081
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1082
        return binascii.hexlify(h)
1083

    
1084
    # Path functions.
1085

    
1086
    def _generate_uuid(self):
1087
        return str(uuidlib.uuid4())
1088

    
1089
    def _put_object_node(self, path, parent, name):
1090
        path = '/'.join((path, name))
1091
        node = self.node.node_lookup(path)
1092
        if node is None:
1093
            node = self.node.node_create(parent, path)
1094
        return path, node
1095

    
1096
    def _put_path(self, user, parent, path):
1097
        node = self.node.node_create(parent, path)
1098
        self.node.version_create(node, None, 0, '', None, user,
1099
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1100
        return node
1101

    
1102
    def _lookup_account(self, account, create=True):
1103
        node = self.node.node_lookup(account)
1104
        if node is None and create:
1105
            node = self._put_path(
1106
                account, self.ROOTNODE, account)  # User is account.
1107
        return account, node
1108

    
1109
    def _lookup_container(self, account, container):
1110
        path = '/'.join((account, container))
1111
        node = self.node.node_lookup(path)
1112
        if node is None:
1113
            raise ItemNotExists('Container does not exist')
1114
        return path, node
1115

    
1116
    def _lookup_object(self, account, container, name):
1117
        path = '/'.join((account, container, name))
1118
        node = self.node.node_lookup(path)
1119
        if node is None:
1120
            raise ItemNotExists('Object does not exist')
1121
        return path, node
1122

    
1123
    def _lookup_objects(self, paths):
1124
        nodes = self.node.node_lookup_bulk(paths)
1125
        return paths, nodes
1126

    
1127
    def _get_properties(self, node, until=None):
1128
        """Return properties until the timestamp given."""
1129

    
1130
        before = until if until is not None else inf
1131
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1132
        if props is None and until is not None:
1133
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1134
        if props is None:
1135
            raise ItemNotExists('Path does not exist')
1136
        return props
1137

    
1138
    def _get_statistics(self, node, until=None):
1139
        """Return count, sum of size and latest timestamp of everything under node."""
1140

    
1141
        if until is None:
1142
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1143
        else:
1144
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1145
        if stats is None:
1146
            stats = (0, 0, 0)
1147
        return stats
1148

    
1149
    def _get_version(self, node, version=None):
1150
        if version is None:
1151
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1152
            if props is None:
1153
                raise ItemNotExists('Object does not exist')
1154
        else:
1155
            try:
1156
                version = int(version)
1157
            except ValueError:
1158
                raise VersionNotExists('Version does not exist')
1159
            props = self.node.version_get_properties(version)
1160
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1161
                raise VersionNotExists('Version does not exist')
1162
        return props
1163

    
1164
    def _get_versions(self, nodes):
1165
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1166

    
1167
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1168
        """Create a new version of the node."""
1169

    
1170
        props = self.node.version_lookup(
1171
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1172
        if props is not None:
1173
            src_version_id = props[self.SERIAL]
1174
            src_hash = props[self.HASH]
1175
            src_size = props[self.SIZE]
1176
            src_type = props[self.TYPE]
1177
            src_checksum = props[self.CHECKSUM]
1178
        else:
1179
            src_version_id = None
1180
            src_hash = None
1181
            src_size = 0
1182
            src_type = ''
1183
            src_checksum = ''
1184
        if size is None:  # Set metadata.
1185
            hash = src_hash  # This way hash can be set to None (account or container).
1186
            size = src_size
1187
        if type is None:
1188
            type = src_type
1189
        if checksum is None:
1190
            checksum = src_checksum
1191
        uuid = self._generate_uuid(
1192
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1193

    
1194
        if src_node is None:
1195
            pre_version_id = src_version_id
1196
        else:
1197
            pre_version_id = None
1198
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1199
            if props is not None:
1200
                pre_version_id = props[self.SERIAL]
1201
        if pre_version_id is not None:
1202
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1203

    
1204
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1205
        return pre_version_id, dest_version_id
1206

    
1207
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1208
        if src_version_id is not None:
1209
            self.node.attribute_copy(src_version_id, dest_version_id)
1210
        if not replace:
1211
            self.node.attribute_del(dest_version_id, domain, (
1212
                k for k, v in meta.iteritems() if v == ''))
1213
            self.node.attribute_set(dest_version_id, domain, (
1214
                (k, v) for k, v in meta.iteritems() if v != ''))
1215
        else:
1216
            self.node.attribute_del(dest_version_id, domain)
1217
            self.node.attribute_set(dest_version_id, domain, ((
1218
                k, v) for k, v in meta.iteritems()))
1219

    
1220
    def _put_metadata(self, user, node, domain, meta, replace=False):
1221
        """Create a new version and store metadata."""
1222

    
1223
        src_version_id, dest_version_id = self._put_version_duplicate(
1224
            user, node)
1225
        self._put_metadata_duplicate(
1226
            src_version_id, dest_version_id, domain, meta, replace)
1227
        return src_version_id, dest_version_id
1228

    
1229
    def _list_limits(self, listing, marker, limit):
1230
        start = 0
1231
        if marker:
1232
            try:
1233
                start = listing.index(marker) + 1
1234
            except ValueError:
1235
                pass
1236
        if not limit or limit > 10000:
1237
            limit = 10000
1238
        return start, limit
1239

    
1240
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1241
        cont_prefix = path + '/'
1242
        prefix = cont_prefix + prefix
1243
        start = cont_prefix + marker if marker else None
1244
        before = until if until is not None else inf
1245
        filterq = keys if domain else []
1246
        sizeq = size_range
1247

    
1248
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1249
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1250
        objects.sort(key=lambda x: x[0])
1251
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1252
        return objects
1253

    
1254
    # Reporting functions.
1255

    
1256
    def _report_size_change(self, user, account, size, details={}):
1257
        account_node = self._lookup_account(account, True)[1]
1258
        total = self._get_statistics(account_node)[1]
1259
        details.update({'user': user, 'total': total})
1260
        logger.debug(
1261
            "_report_size_change: %s %s %s %s", user, account, size, details)
1262
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1263
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1264
                              float(size), details))
1265

    
1266
        serial = self.quotaholder.issue_commission(
1267
                context     =   {},
1268
                target      =   account,
1269
                key         =   '1',
1270
                clientkey   =   'pithos',
1271
                ownerkey    =   '',
1272
                name        =   details['path'] if 'path' in details else '',
1273
                provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1274
        )
1275
        self.serials.append(serial)
1276

    
1277
    def _report_object_change(self, user, account, path, details={}):
1278
        details.update({'user': user})
1279
        logger.debug("_report_object_change: %s %s %s %s", user,
1280
                     account, path, details)
1281
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1282
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1283

    
1284
    def _report_sharing_change(self, user, account, path, details={}):
1285
        logger.debug("_report_permissions_change: %s %s %s %s",
1286
                     user, account, path, details)
1287
        details.update({'user': user})
1288
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1289
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1290

    
1291
    # Policy functions.
1292

    
1293
    def _check_policy(self, policy):
1294
        for k in policy.keys():
1295
            if policy[k] == '':
1296
                policy[k] = self.default_policy.get(k)
1297
        for k, v in policy.iteritems():
1298
            if k == 'quota':
1299
                q = int(v)  # May raise ValueError.
1300
                if q < 0:
1301
                    raise ValueError
1302
            elif k == 'versioning':
1303
                if v not in ['auto', 'none']:
1304
                    raise ValueError
1305
            else:
1306
                raise ValueError
1307

    
1308
    def _put_policy(self, node, policy, replace):
1309
        if replace:
1310
            for k, v in self.default_policy.iteritems():
1311
                if k not in policy:
1312
                    policy[k] = v
1313
        self.node.policy_set(node, policy)
1314

    
1315
    def _get_policy(self, node):
1316
        policy = self.default_policy.copy()
1317
        policy.update(self.node.policy_get(node))
1318
        return policy
1319

    
1320
    def _apply_versioning(self, account, container, version_id):
1321
        """Delete the provided version if such is the policy.
1322
           Return size of object removed.
1323
        """
1324

    
1325
        if version_id is None:
1326
            return 0
1327
        path, node = self._lookup_container(account, container)
1328
        versioning = self._get_policy(node)['versioning']
1329
        if versioning != 'auto' or self.free_versioning:
1330
            hash, size = self.node.version_remove(version_id)
1331
            self.store.map_delete(hash)
1332
            return size
1333
        return 0
1334

    
1335
    # Access control functions.
1336

    
1337
    def _check_groups(self, groups):
1338
        # raise ValueError('Bad characters in groups')
1339
        pass
1340

    
1341
    def _check_permissions(self, path, permissions):
1342
        # raise ValueError('Bad characters in permissions')
1343
        pass
1344

    
1345
    def _get_formatted_paths(self, paths):
1346
        formatted = []
1347
        for p in paths:
1348
            node = self.node.node_lookup(p)
1349
            props = None
1350
            if node is not None:
1351
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1352
            if props is not None:
1353
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1354
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1355
                formatted.append((p, self.MATCH_EXACT))
1356
        return formatted
1357

    
1358
    def _get_permissions_path(self, account, container, name):
1359
        path = '/'.join((account, container, name))
1360
        permission_paths = self.permissions.access_inherit(path)
1361
        permission_paths.sort()
1362
        permission_paths.reverse()
1363
        for p in permission_paths:
1364
            if p == path:
1365
                return p
1366
            else:
1367
                if p.count('/') < 2:
1368
                    continue
1369
                node = self.node.node_lookup(p)
1370
                if node is not None:
1371
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1372
                if props is not None:
1373
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1374
                        return p
1375
        return None
1376

    
1377
    def _can_read(self, user, account, container, name):
1378
        if user == account:
1379
            return True
1380
        path = '/'.join((account, container, name))
1381
        if self.permissions.public_get(path) is not None:
1382
            return True
1383
        path = self._get_permissions_path(account, container, name)
1384
        if not path:
1385
            raise NotAllowedError
1386
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1387
            raise NotAllowedError
1388

    
1389
    def _can_write(self, user, account, container, name):
1390
        if user == account:
1391
            return True
1392
        path = '/'.join((account, container, name))
1393
        path = self._get_permissions_path(account, container, name)
1394
        if not path:
1395
            raise NotAllowedError
1396
        if not self.permissions.access_check(path, self.WRITE, user):
1397
            raise NotAllowedError
1398

    
1399
    def _allowed_accounts(self, user):
1400
        allow = set()
1401
        for path in self.permissions.access_list_paths(user):
1402
            allow.add(path.split('/', 1)[0])
1403
        return sorted(allow)
1404

    
1405
    def _allowed_containers(self, user, account):
1406
        allow = set()
1407
        for path in self.permissions.access_list_paths(user, account):
1408
            allow.add(path.split('/', 2)[1])
1409
        return sorted(allow)