Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 0ed09c7c

History | View | Annotate | Download (62.1 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from kamaki.clients.quotaholder import QuotaholderClient
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
85
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
86

    
87
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
88
QUEUE_CLIENT_ID = 'pithos'
89
QUEUE_INSTANCE_ID = '1'
90

    
91
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
92

    
93
inf = float('inf')
94

    
95
ULTIMATE_ANSWER = 42
96

    
97

    
98
logger = logging.getLogger(__name__)
99

    
100

    
101
def backend_method(func=None, autocommit=1):
102
    if func is None:
103
        def fn(func):
104
            return backend_method(func, autocommit)
105
        return fn
106

    
107
    if not autocommit:
108
        return func
109

    
110
    def fn(self, *args, **kw):
111
        self.wrapper.execute()
112
        serials = []
113
        self.serials = serials
114
        self.messages = []
115

    
116
        try:
117
            ret = func(self, *args, **kw)
118
            for m in self.messages:
119
                self.queue.send(*m)
120
            if serials:
121
                self.quotaholder.accept_commission(
122
                            context     =   {},
123
                            clientkey   =   'pithos',
124
                            serials     =   serials)
125
            self.wrapper.commit()
126
            return ret
127
        except:
128
            if serials:
129
                self.quotaholder.reject_commission(
130
                            context     =   {},
131
                            clientkey   =   'pithos',
132
                            serials     =   serials)
133
            self.wrapper.rollback()
134
            raise
135
    return fn
136

    
137

    
138
class ModularBackend(BaseBackend):
139
    """A modular backend.
140

141
    Uses modules for SQL functions and storage.
142
    """
143

    
144
    def __init__(self, db_module=None, db_connection=None,
145
                 block_module=None, block_path=None, block_umask=None,
146
                 queue_module=None, queue_hosts=None, queue_exchange=None,
147
                 quotaholder_url=None, quotaholder_token=None,
148
                 free_versioning=True):
149
        db_module = db_module or DEFAULT_DB_MODULE
150
        db_connection = db_connection or DEFAULT_DB_CONNECTION
151
        block_module = block_module or DEFAULT_BLOCK_MODULE
152
        block_path = block_path or DEFAULT_BLOCK_PATH
153
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
154
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
155
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
156
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
157
        
158
        self.hash_algorithm = 'sha256'
159
        self.block_size = 4 * 1024 * 1024  # 4MB
160
        self.free_versioning = free_versioning
161

    
162
        self.default_policy = {'quota': DEFAULT_QUOTA,
163
                               'versioning': DEFAULT_VERSIONING}
164

    
165
        def load_module(m):
166
            __import__(m)
167
            return sys.modules[m]
168

    
169
        self.db_module = load_module(db_module)
170
        self.wrapper = self.db_module.DBWrapper(db_connection)
171
        params = {'wrapper': self.wrapper}
172
        self.permissions = self.db_module.Permissions(**params)
173
        self.config = self.db_module.Config(**params)
174
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
175
        for x in ['READ', 'WRITE']:
176
            setattr(self, x, getattr(self.db_module, x))
177
        self.node = self.db_module.Node(**params)
178
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
179
            setattr(self, x, getattr(self.db_module, x))
180

    
181
        self.block_module = load_module(block_module)
182
        params = {'path': block_path,
183
                  'block_size': self.block_size,
184
                  'hash_algorithm': self.hash_algorithm,
185
                  'umask': block_umask}
186
        self.store = self.block_module.Store(**params)
187

    
188
        if queue_module and queue_hosts:
189
            self.queue_module = load_module(queue_module)
190
            params = {'hosts': queue_hosts,
191
                              'exchange': queue_exchange,
192
                      'client_id': QUEUE_CLIENT_ID}
193
            self.queue = self.queue_module.Queue(**params)
194
        else:
195
            class NoQueue:
196
                def send(self, *args):
197
                    pass
198

    
199
                def close(self):
200
                    pass
201

    
202
            self.queue = NoQueue()
203

    
204
        self.quotaholder_url = quotaholder_url
205
        self.quotaholder_token = quotaholder_token
206
        self.quotaholder = QuotaholderClient(quotaholder_url, quotaholder_token)
207
        self.serials = []
208
        self.messages = []
209

    
210
    def close(self):
211
        self.wrapper.close()
212
        self.queue.close()
213

    
214
    @property
215
    def using_external_quotaholder(self):
216
        if self.quotaholder_url:
217
            return True
218
        return False
219

    
220
    @backend_method
221
    def list_accounts(self, user, marker=None, limit=10000):
222
        """Return a list of accounts the user can access."""
223

    
224
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
225
        allowed = self._allowed_accounts(user)
226
        start, limit = self._list_limits(allowed, marker, limit)
227
        return allowed[start:start + limit]
228

    
229
    @backend_method
230
    def get_account_meta(
231
            self, user, account, domain, until=None, include_user_defined=True,
232
            external_quota=None):
233
        """Return a dictionary with the account metadata for the domain."""
234

    
235
        external_quota = external_quota or {}
236
        logger.debug(
237
            "get_account_meta: %s %s %s %s", user, account, domain, until)
238
        path, node = self._lookup_account(account, user == account)
239
        if user != account:
240
            if until or node is None or account not in self._allowed_accounts(user):
241
                raise NotAllowedError
242
        try:
243
            props = self._get_properties(node, until)
244
            mtime = props[self.MTIME]
245
        except NameError:
246
            props = None
247
            mtime = until
248
        count, bytes, tstamp = self._get_statistics(node, until)
249
        tstamp = max(tstamp, mtime)
250
        if until is None:
251
            modified = tstamp
252
        else:
253
            modified = self._get_statistics(
254
                node)[2]  # Overall last modification.
255
            modified = max(modified, mtime)
256

    
257
        if user != account:
258
            meta = {'name': account}
259
        else:
260
            meta = {}
261
            if props is not None and include_user_defined:
262
                meta.update(
263
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
264
            if until is not None:
265
                meta.update({'until_timestamp': tstamp})
266
            meta.update({'name': account, 'count': count, 'bytes': bytes})
267
            if self.using_external_quotaholder:
268
                meta['bytes'] = external_quota.get('currValue', 0)
269
        meta.update({'modified': modified})
270
        return meta
271

    
272
    @backend_method
273
    def update_account_meta(self, user, account, domain, meta, replace=False):
274
        """Update the metadata associated with the account for the domain."""
275

    
276
        logger.debug("update_account_meta: %s %s %s %s %s", user,
277
                     account, domain, meta, replace)
278
        if user != account:
279
            raise NotAllowedError
280
        path, node = self._lookup_account(account, True)
281
        self._put_metadata(user, node, domain, meta, replace)
282

    
283
    @backend_method
284
    def get_account_groups(self, user, account):
285
        """Return a dictionary with the user groups defined for this account."""
286

    
287
        logger.debug("get_account_groups: %s %s", user, account)
288
        if user != account:
289
            if account not in self._allowed_accounts(user):
290
                raise NotAllowedError
291
            return {}
292
        self._lookup_account(account, True)
293
        return self.permissions.group_dict(account)
294

    
295
    @backend_method
296
    def update_account_groups(self, user, account, groups, replace=False):
297
        """Update the groups associated with the account."""
298

    
299
        logger.debug("update_account_groups: %s %s %s %s", user,
300
                     account, groups, replace)
301
        if user != account:
302
            raise NotAllowedError
303
        self._lookup_account(account, True)
304
        self._check_groups(groups)
305
        if replace:
306
            self.permissions.group_destroy(account)
307
        for k, v in groups.iteritems():
308
            if not replace:  # If not already deleted.
309
                self.permissions.group_delete(account, k)
310
            if v:
311
                self.permissions.group_addmany(account, k, v)
312

    
313
    @backend_method
314
    def get_account_policy(self, user, account, external_quota=None):
315
        """Return a dictionary with the account policy."""
316

    
317
        logger.debug("get_account_policy: %s %s", user, account)
318
        if user != account:
319
            if account not in self._allowed_accounts(user):
320
                raise NotAllowedError
321
            return {}
322
        path, node = self._lookup_account(account, True)
323
        policy = self._get_policy(node)
324
        return policy
325

    
326
    @backend_method
327
    def update_account_policy(self, user, account, policy, replace=False):
328
        """Update the policy associated with the account."""
329

    
330
        logger.debug("update_account_policy: %s %s %s %s", user,
331
                     account, policy, replace)
332
        if user != account:
333
            raise NotAllowedError
334
        path, node = self._lookup_account(account, True)
335
        self._check_policy(policy)
336
        self._put_policy(node, policy, replace)
337

    
338
    @backend_method
339
    def put_account(self, user, account, policy={}):
340
        """Create a new account with the given name."""
341

    
342
        logger.debug("put_account: %s %s %s", user, account, policy)
343
        if user != account:
344
            raise NotAllowedError
345
        node = self.node.node_lookup(account)
346
        if node is not None:
347
            raise AccountExists('Account already exists')
348
        if policy:
349
            self._check_policy(policy)
350
        node = self._put_path(user, self.ROOTNODE, account)
351
        self._put_policy(node, policy, True)
352

    
353
    @backend_method
354
    def delete_account(self, user, account):
355
        """Delete the account with the given name."""
356

    
357
        logger.debug("delete_account: %s %s", user, account)
358
        if user != account:
359
            raise NotAllowedError
360
        node = self.node.node_lookup(account)
361
        if node is None:
362
            return
363
        if not self.node.node_remove(node):
364
            raise AccountNotEmpty('Account is not empty')
365
        self.permissions.group_destroy(account)
366

    
367
    @backend_method
368
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
369
        """Return a list of containers existing under an account."""
370

    
371
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
372
                     account, marker, limit, shared, until, public)
373
        if user != account:
374
            if until or account not in self._allowed_accounts(user):
375
                raise NotAllowedError
376
            allowed = self._allowed_containers(user, account)
377
            start, limit = self._list_limits(allowed, marker, limit)
378
            return allowed[start:start + limit]
379
        if shared or public:
380
            allowed = set()
381
            if shared:
382
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
383
            if public:
384
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
385
            allowed = sorted(allowed)
386
            start, limit = self._list_limits(allowed, marker, limit)
387
            return allowed[start:start + limit]
388
        node = self.node.node_lookup(account)
389
        containers = [x[0] for x in self._list_object_properties(
390
            node, account, '', '/', marker, limit, False, None, [], until)]
391
        start, limit = self._list_limits(
392
            [x[0] for x in containers], marker, limit)
393
        return containers[start:start + limit]
394

    
395
    @backend_method
396
    def list_container_meta(self, user, account, container, domain, until=None):
397
        """Return a list with all the container's object meta keys for the domain."""
398

    
399
        logger.debug("list_container_meta: %s %s %s %s %s", user,
400
                     account, container, domain, until)
401
        allowed = []
402
        if user != account:
403
            if until:
404
                raise NotAllowedError
405
            allowed = self.permissions.access_list_paths(
406
                user, '/'.join((account, container)))
407
            if not allowed:
408
                raise NotAllowedError
409
        path, node = self._lookup_container(account, container)
410
        before = until if until is not None else inf
411
        allowed = self._get_formatted_paths(allowed)
412
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
413

    
414
    @backend_method
415
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
416
        """Return a dictionary with the container metadata for the domain."""
417

    
418
        logger.debug("get_container_meta: %s %s %s %s %s", user,
419
                     account, container, domain, until)
420
        if user != account:
421
            if until or container not in self._allowed_containers(user, account):
422
                raise NotAllowedError
423
        path, node = self._lookup_container(account, container)
424
        props = self._get_properties(node, until)
425
        mtime = props[self.MTIME]
426
        count, bytes, tstamp = self._get_statistics(node, until)
427
        tstamp = max(tstamp, mtime)
428
        if until is None:
429
            modified = tstamp
430
        else:
431
            modified = self._get_statistics(
432
                node)[2]  # Overall last modification.
433
            modified = max(modified, mtime)
434

    
435
        if user != account:
436
            meta = {'name': container}
437
        else:
438
            meta = {}
439
            if include_user_defined:
440
                meta.update(
441
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
442
            if until is not None:
443
                meta.update({'until_timestamp': tstamp})
444
            meta.update({'name': container, 'count': count, 'bytes': bytes})
445
        meta.update({'modified': modified})
446
        return meta
447

    
448
    @backend_method
449
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
450
        """Update the metadata associated with the container for the domain."""
451

    
452
        logger.debug("update_container_meta: %s %s %s %s %s %s",
453
                     user, account, container, domain, meta, replace)
454
        if user != account:
455
            raise NotAllowedError
456
        path, node = self._lookup_container(account, container)
457
        src_version_id, dest_version_id = self._put_metadata(
458
            user, node, domain, meta, replace)
459
        if src_version_id is not None:
460
            versioning = self._get_policy(node)['versioning']
461
            if versioning != 'auto':
462
                self.node.version_remove(src_version_id)
463

    
464
    @backend_method
465
    def get_container_policy(self, user, account, container):
466
        """Return a dictionary with the container policy."""
467

    
468
        logger.debug(
469
            "get_container_policy: %s %s %s", user, account, container)
470
        if user != account:
471
            if container not in self._allowed_containers(user, account):
472
                raise NotAllowedError
473
            return {}
474
        path, node = self._lookup_container(account, container)
475
        return self._get_policy(node)
476

    
477
    @backend_method
478
    def update_container_policy(self, user, account, container, policy, replace=False):
479
        """Update the policy associated with the container."""
480

    
481
        logger.debug("update_container_policy: %s %s %s %s %s",
482
                     user, account, container, policy, replace)
483
        if user != account:
484
            raise NotAllowedError
485
        path, node = self._lookup_container(account, container)
486
        self._check_policy(policy)
487
        self._put_policy(node, policy, replace)
488

    
489
    @backend_method
490
    def put_container(self, user, account, container, policy={}):
491
        """Create a new container with the given name."""
492

    
493
        logger.debug(
494
            "put_container: %s %s %s %s", user, account, container, policy)
495
        if user != account:
496
            raise NotAllowedError
497
        try:
498
            path, node = self._lookup_container(account, container)
499
        except NameError:
500
            pass
501
        else:
502
            raise ContainerExists('Container already exists')
503
        if policy:
504
            self._check_policy(policy)
505
        path = '/'.join((account, container))
506
        node = self._put_path(
507
            user, self._lookup_account(account, True)[1], path)
508
        self._put_policy(node, policy, True)
509

    
510
    @backend_method
511
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
512
        """Delete/purge the container with the given name."""
513

    
514
        logger.debug("delete_container: %s %s %s %s %s %s", user,
515
                     account, container, until, prefix, delimiter)
516
        if user != account:
517
            raise NotAllowedError
518
        path, node = self._lookup_container(account, container)
519

    
520
        if until is not None:
521
            hashes, size, serials = self.node.node_purge_children(
522
                node, until, CLUSTER_HISTORY)
523
            for h in hashes:
524
                self.store.map_delete(h)
525
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
526
            self._report_size_change(user, account, -size,
527
                                     {'action':'container purge', 'path': path,
528
                                      'versions': ','.join(str(i) for i in serials)})
529
            return
530

    
531
        if not delimiter:
532
            if self._get_statistics(node)[0] > 0:
533
                raise ContainerNotEmpty('Container is not empty')
534
            hashes, size, serials = self.node.node_purge_children(
535
                node, inf, CLUSTER_HISTORY)
536
            for h in hashes:
537
                self.store.map_delete(h)
538
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
539
            self.node.node_remove(node)
540
            self._report_size_change(user, account, -size,
541
                                     {'action': 'container delete',
542
                                      'path': path,
543
                                      'versions': ','.join(str(i) for i in serials)})
544
        else:
545
            # remove only contents
546
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
547
            paths = []
548
            for t in src_names:
549
                path = '/'.join((account, container, t[0]))
550
                node = t[2]
551
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
552
                del_size = self._apply_versioning(
553
                    account, container, src_version_id)
554
                self._report_size_change(
555
                        user, account, -del_size, {
556
                                'action': 'object delete',
557
                                'path': path,
558
                             'versions': ','.join([str(dest_version_id)])
559
                     }
560
                )
561
                self._report_object_change(
562
                    user, account, path, details={'action': 'object delete'})
563
                paths.append(path)
564
            self.permissions.access_clear_bulk(paths)
565

    
566
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
567
        if user != account and until:
568
            raise NotAllowedError
569
        if shared and public:
570
            # get shared first
571
            shared = self._list_object_permissions(
572
                user, account, container, prefix, shared=True, public=False)
573
            objects = set()
574
            if shared:
575
                path, node = self._lookup_container(account, container)
576
                shared = self._get_formatted_paths(shared)
577
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
578

    
579
            # get public
580
            objects |= set(self._list_public_object_properties(
581
                user, account, container, prefix, all_props))
582
            objects = list(objects)
583

    
584
            objects.sort(key=lambda x: x[0])
585
            start, limit = self._list_limits(
586
                [x[0] for x in objects], marker, limit)
587
            return objects[start:start + limit]
588
        elif public:
589
            objects = self._list_public_object_properties(
590
                user, account, container, prefix, all_props)
591
            start, limit = self._list_limits(
592
                [x[0] for x in objects], marker, limit)
593
            return objects[start:start + limit]
594

    
595
        allowed = self._list_object_permissions(
596
            user, account, container, prefix, shared, public)
597
        if shared and not allowed:
598
            return []
599
        path, node = self._lookup_container(account, container)
600
        allowed = self._get_formatted_paths(allowed)
601
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
602
        start, limit = self._list_limits(
603
            [x[0] for x in objects], marker, limit)
604
        return objects[start:start + limit]
605

    
606
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
607
        public = self._list_object_permissions(
608
            user, account, container, prefix, shared=False, public=True)
609
        paths, nodes = self._lookup_objects(public)
610
        path = '/'.join((account, container))
611
        cont_prefix = path + '/'
612
        paths = [x[len(cont_prefix):] for x in paths]
613
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
614
        objects = [(path,) + props for path, props in zip(paths, props)]
615
        return objects
616

    
617
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
618
        objects = []
619
        while True:
620
            marker = objects[-1] if objects else None
621
            limit = 10000
622
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
623
            objects.extend(l)
624
            if not l or len(l) < limit:
625
                break
626
        return objects
627

    
628
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
629
        allowed = []
630
        path = '/'.join((account, container, prefix)).rstrip('/')
631
        if user != account:
632
            allowed = self.permissions.access_list_paths(user, path)
633
            if not allowed:
634
                raise NotAllowedError
635
        else:
636
            allowed = set()
637
            if shared:
638
                allowed.update(self.permissions.access_list_shared(path))
639
            if public:
640
                allowed.update(
641
                    [x[0] for x in self.permissions.public_list(path)])
642
            allowed = sorted(allowed)
643
            if not allowed:
644
                return []
645
        return allowed
646

    
647
    @backend_method
648
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
649
        """Return a list of object (name, version_id) tuples existing under a container."""
650

    
651
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
652
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
653

    
654
    @backend_method
655
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
656
        """Return a list of object metadata dicts existing under a container."""
657

    
658
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
659
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
660
        objects = []
661
        for p in props:
662
            if len(p) == 2:
663
                objects.append({'subdir': p[0]})
664
            else:
665
                objects.append({'name': p[0],
666
                                'bytes': p[self.SIZE + 1],
667
                                'type': p[self.TYPE + 1],
668
                                'hash': p[self.HASH + 1],
669
                                'version': p[self.SERIAL + 1],
670
                                'version_timestamp': p[self.MTIME + 1],
671
                                'modified': p[self.MTIME + 1] if until is None else None,
672
                                'modified_by': p[self.MUSER + 1],
673
                                'uuid': p[self.UUID + 1],
674
                                'checksum': p[self.CHECKSUM + 1]})
675
        return objects
676

    
677
    @backend_method
678
    def list_object_permissions(self, user, account, container, prefix=''):
679
        """Return a list of paths that enforce permissions under a container."""
680

    
681
        logger.debug("list_object_permissions: %s %s %s %s", user,
682
                     account, container, prefix)
683
        return self._list_object_permissions(user, account, container, prefix, True, False)
684

    
685
    @backend_method
686
    def list_object_public(self, user, account, container, prefix=''):
687
        """Return a dict mapping paths to public ids for objects that are public under a container."""
688

    
689
        logger.debug("list_object_public: %s %s %s %s", user,
690
                     account, container, prefix)
691
        public = {}
692
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
693
            public[path] = p + ULTIMATE_ANSWER
694
        return public
695

    
696
    @backend_method
697
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
698
        """Return a dictionary with the object metadata for the domain."""
699

    
700
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
701
                     account, container, name, domain, version)
702
        self._can_read(user, account, container, name)
703
        path, node = self._lookup_object(account, container, name)
704
        props = self._get_version(node, version)
705
        if version is None:
706
            modified = props[self.MTIME]
707
        else:
708
            try:
709
                modified = self._get_version(
710
                    node)[self.MTIME]  # Overall last modification.
711
            except NameError:  # Object may be deleted.
712
                del_props = self.node.version_lookup(
713
                    node, inf, CLUSTER_DELETED)
714
                if del_props is None:
715
                    raise ItemNotExists('Object does not exist')
716
                modified = del_props[self.MTIME]
717

    
718
        meta = {}
719
        if include_user_defined:
720
            meta.update(
721
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
722
        meta.update({'name': name,
723
                     'bytes': props[self.SIZE],
724
                     'type': props[self.TYPE],
725
                     'hash': props[self.HASH],
726
                     'version': props[self.SERIAL],
727
                     'version_timestamp': props[self.MTIME],
728
                     'modified': modified,
729
                     'modified_by': props[self.MUSER],
730
                     'uuid': props[self.UUID],
731
                     'checksum': props[self.CHECKSUM]})
732
        return meta
733

    
734
    @backend_method
735
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
736
        """Update the metadata associated with the object for the domain and return the new version."""
737

    
738
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
739
                     user, account, container, name, domain, meta, replace)
740
        self._can_write(user, account, container, name)
741
        path, node = self._lookup_object(account, container, name)
742
        src_version_id, dest_version_id = self._put_metadata(
743
            user, node, domain, meta, replace)
744
        self._apply_versioning(account, container, src_version_id)
745
        return dest_version_id
746

    
747
    @backend_method
748
    def get_object_permissions(self, user, account, container, name):
749
        """Return the action allowed on the object, the path
750
        from which the object gets its permissions from,
751
        along with a dictionary containing the permissions."""
752

    
753
        logger.debug("get_object_permissions: %s %s %s %s", user,
754
                     account, container, name)
755
        allowed = 'write'
756
        permissions_path = self._get_permissions_path(account, container, name)
757
        if user != account:
758
            if self.permissions.access_check(permissions_path, self.WRITE, user):
759
                allowed = 'write'
760
            elif self.permissions.access_check(permissions_path, self.READ, user):
761
                allowed = 'read'
762
            else:
763
                raise NotAllowedError
764
        self._lookup_object(account, container, name)
765
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
766

    
767
    @backend_method
768
    def update_object_permissions(self, user, account, container, name, permissions):
769
        """Update the permissions associated with the object."""
770

    
771
        logger.debug("update_object_permissions: %s %s %s %s %s",
772
                     user, account, container, name, permissions)
773
        if user != account:
774
            raise NotAllowedError
775
        path = self._lookup_object(account, container, name)[0]
776
        self._check_permissions(path, permissions)
777
        self.permissions.access_set(path, permissions)
778
        self._report_sharing_change(user, account, path, {'members':
779
                                    self.permissions.access_members(path)})
780

    
781
    @backend_method
782
    def get_object_public(self, user, account, container, name):
783
        """Return the public id of the object if applicable."""
784

    
785
        logger.debug(
786
            "get_object_public: %s %s %s %s", user, account, container, name)
787
        self._can_read(user, account, container, name)
788
        path = self._lookup_object(account, container, name)[0]
789
        p = self.permissions.public_get(path)
790
        if p is not None:
791
            p += ULTIMATE_ANSWER
792
        return p
793

    
794
    @backend_method
795
    def update_object_public(self, user, account, container, name, public):
796
        """Update the public status of the object."""
797

    
798
        logger.debug("update_object_public: %s %s %s %s %s", user,
799
                     account, container, name, public)
800
        self._can_write(user, account, container, name)
801
        path = self._lookup_object(account, container, name)[0]
802
        if not public:
803
            self.permissions.public_unset(path)
804
        else:
805
            self.permissions.public_set(path)
806

    
807
    @backend_method
808
    def get_object_hashmap(self, user, account, container, name, version=None):
809
        """Return the object's size and a list with partial hashes."""
810

    
811
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
812
                     account, container, name, version)
813
        self._can_read(user, account, container, name)
814
        path, node = self._lookup_object(account, container, name)
815
        props = self._get_version(node, version)
816
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
817
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
818

    
819
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
820
        if permissions is not None and user != account:
821
            raise NotAllowedError
822
        self._can_write(user, account, container, name)
823
        if permissions is not None:
824
            path = '/'.join((account, container, name))
825
            self._check_permissions(path, permissions)
826

    
827
        account_path, account_node = self._lookup_account(account, True)
828
        container_path, container_node = self._lookup_container(
829
            account, container)
830
        path, node = self._put_object_node(
831
            container_path, container_node, name)
832
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
833

    
834
        # Handle meta.
835
        if src_version_id is None:
836
            src_version_id = pre_version_id
837
        self._put_metadata_duplicate(
838
            src_version_id, dest_version_id, domain, meta, replace_meta)
839

    
840
        del_size = self._apply_versioning(account, container, pre_version_id)
841
        size_delta = size - del_size
842
        if not self.using_external_quotaholder: # Check quota.
843
            if size_delta > 0:
844
                account_quota = long(self._get_policy(account_node)['quota'])
845
                account_usage = self._get_statistics(account_node)[1] + size_delta
846
                container_quota = long(self._get_policy(container_node)['quota'])
847
                container_usage = self._get_statistics(container_node)[1] + size_delta
848
                if (account_quota > 0 and account_usage > account_quota):
849
                    logger.error('account_quota: %s, account_usage: %s' % (
850
                        account_quota, account_usage
851
                    ))
852
                    raise QuotaError
853
                if (container_quota > 0 and container_usage > container_quota):
854
                    # This must be executed in a transaction, so the version is
855
                    # never created if it fails.
856
                    logger.error('container_quota: %s, container_usage: %s' % (
857
                        container_quota, container_usage
858
                    ))
859
                    raise QuotaError
860
        self._report_size_change(user, account, size_delta,
861
                                 {'action': 'object update', 'path': path,
862
                                  'versions': ','.join([str(dest_version_id)])})
863
        if permissions is not None:
864
            self.permissions.access_set(path, permissions)
865
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
866

    
867
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
868
        return dest_version_id
869

    
870
    @backend_method
871
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
872
        """Create/update an object with the specified size and partial hashes."""
873

    
874
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
875
                     account, container, name, size, type, hashmap, checksum)
876
        if size == 0:  # No such thing as an empty hashmap.
877
            hashmap = [self.put_block('')]
878
        map = HashMap(self.block_size, self.hash_algorithm)
879
        map.extend([binascii.unhexlify(x) for x in hashmap])
880
        missing = self.store.block_search(map)
881
        if missing:
882
            ie = IndexError()
883
            ie.data = [binascii.hexlify(x) for x in missing]
884
            raise ie
885

    
886
        hash = map.hash()
887
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
888
        self.store.map_put(hash, map)
889
        return dest_version_id
890

    
891
    @backend_method
892
    def update_object_checksum(self, user, account, container, name, version, checksum):
893
        """Update an object's checksum."""
894

    
895
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
896
                     user, account, container, name, version, checksum)
897
        # Update objects with greater version and same hashmap and size (fix metadata updates).
898
        self._can_write(user, account, container, name)
899
        path, node = self._lookup_object(account, container, name)
900
        props = self._get_version(node, version)
901
        versions = self.node.node_get_versions(node)
902
        for x in versions:
903
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
904
                self.node.version_put_property(
905
                    x[self.SERIAL], 'checksum', checksum)
906

    
907
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
908
        dest_version_ids = []
909
        self._can_read(user, src_account, src_container, src_name)
910
        path, node = self._lookup_object(src_account, src_container, src_name)
911
        # TODO: Will do another fetch of the properties in duplicate version...
912
        props = self._get_version(
913
            node, src_version)  # Check to see if source exists.
914
        src_version_id = props[self.SERIAL]
915
        hash = props[self.HASH]
916
        size = props[self.SIZE]
917
        is_copy = not is_move and (src_account, src_container, src_name) != (
918
            dest_account, dest_container, dest_name)  # New uuid.
919
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
920
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
921
            self._delete_object(user, src_account, src_container, src_name)
922

    
923
        if delimiter:
924
            prefix = src_name + \
925
                delimiter if not src_name.endswith(delimiter) else src_name
926
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
927
            src_names.sort(key=lambda x: x[2])  # order by nodes
928
            paths = [elem[0] for elem in src_names]
929
            nodes = [elem[2] for elem in src_names]
930
            # TODO: Will do another fetch of the properties in duplicate version...
931
            props = self._get_versions(nodes)  # Check to see if source exists.
932

    
933
            for prop, path, node in zip(props, paths, nodes):
934
                src_version_id = prop[self.SERIAL]
935
                hash = prop[self.HASH]
936
                vtype = prop[self.TYPE]
937
                size = prop[self.SIZE]
938
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
939
                    delimiter) else dest_name
940
                vdest_name = path.replace(prefix, dest_prefix, 1)
941
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
942
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
943
                    self._delete_object(user, src_account, src_container, path)
944
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
945

    
946
    @backend_method
947
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
948
        """Copy an object's data and metadata."""
949

    
950
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
951
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
952
        return dest_version_id
953

    
954
    @backend_method
955
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
956
        """Move an object's data and metadata."""
957

    
958
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
959
        if user != src_account:
960
            raise NotAllowedError
961
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
962
        return dest_version_id
963

    
964
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
965
        if user != account:
966
            raise NotAllowedError
967

    
968
        if until is not None:
969
            path = '/'.join((account, container, name))
970
            node = self.node.node_lookup(path)
971
            if node is None:
972
                return
973
            hashes = []
974
            size = 0
975
            serials = []
976
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
977
            hashes += h
978
            size += s
979
            serials += v
980
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
981
            hashes += h
982
            size += s
983
            serials += v
984
            for h in hashes:
985
                self.store.map_delete(h)
986
            self.node.node_purge(node, until, CLUSTER_DELETED)
987
            try:
988
                props = self._get_version(node)
989
            except NameError:
990
                self.permissions.access_clear(path)
991
            self._report_size_change(user, account, -size,
992
                                    {'action': 'object purge', 'path': path,
993
                                     'versions': ','.join(str(i) for i in serials)})
994
            return
995

    
996
        path, node = self._lookup_object(account, container, name)
997
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
998
        del_size = self._apply_versioning(account, container, src_version_id)
999
        self._report_size_change(user, account, -del_size,
1000
                                 {'action': 'object delete', 'path': path,
1001
                                  'versions': ','.join([str(dest_version_id)])})
1002
        self._report_object_change(
1003
            user, account, path, details={'action': 'object delete'})
1004
        self.permissions.access_clear(path)
1005

    
1006
        if delimiter:
1007
            prefix = name + delimiter if not name.endswith(delimiter) else name
1008
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1009
            paths = []
1010
            for t in src_names:
1011
                path = '/'.join((account, container, t[0]))
1012
                node = t[2]
1013
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1014
                del_size = self._apply_versioning(
1015
                    account, container, src_version_id)
1016
                self._report_size_change(user, account, -del_size,
1017
                                         {'action': 'object delete',
1018
                                          'path': path,
1019
                                          'versions': ','.join([str(dest_version_id)])})
1020
                self._report_object_change(
1021
                    user, account, path, details={'action': 'object delete'})
1022
                paths.append(path)
1023
            self.permissions.access_clear_bulk(paths)
1024

    
1025
    @backend_method
1026
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1027
        """Delete/purge an object."""
1028

    
1029
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1030
                     account, container, name, until, prefix, delimiter)
1031
        self._delete_object(user, account, container, name, until, delimiter)
1032

    
1033
    @backend_method
1034
    def list_versions(self, user, account, container, name):
1035
        """Return a list of all (version, version_timestamp) tuples for an object."""
1036

    
1037
        logger.debug(
1038
            "list_versions: %s %s %s %s", user, account, container, name)
1039
        self._can_read(user, account, container, name)
1040
        path, node = self._lookup_object(account, container, name)
1041
        versions = self.node.node_get_versions(node)
1042
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1043

    
1044
    @backend_method
1045
    def get_uuid(self, user, uuid):
1046
        """Return the (account, container, name) for the UUID given."""
1047

    
1048
        logger.debug("get_uuid: %s %s", user, uuid)
1049
        info = self.node.latest_uuid(uuid)
1050
        if info is None:
1051
            raise NameError
1052
        path, serial = info
1053
        account, container, name = path.split('/', 2)
1054
        self._can_read(user, account, container, name)
1055
        return (account, container, name)
1056

    
1057
    @backend_method
1058
    def get_public(self, user, public):
1059
        """Return the (account, container, name) for the public id given."""
1060

    
1061
        logger.debug("get_public: %s %s", user, public)
1062
        if public is None or public < ULTIMATE_ANSWER:
1063
            raise NameError
1064
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1065
        if path is None:
1066
            raise NameError
1067
        account, container, name = path.split('/', 2)
1068
        self._can_read(user, account, container, name)
1069
        return (account, container, name)
1070

    
1071
    @backend_method(autocommit=0)
1072
    def get_block(self, hash):
1073
        """Return a block's data."""
1074

    
1075
        logger.debug("get_block: %s", hash)
1076
        block = self.store.block_get(binascii.unhexlify(hash))
1077
        if not block:
1078
            raise ItemNotExists('Block does not exist')
1079
        return block
1080

    
1081
    @backend_method(autocommit=0)
1082
    def put_block(self, data):
1083
        """Store a block and return the hash."""
1084

    
1085
        logger.debug("put_block: %s", len(data))
1086
        return binascii.hexlify(self.store.block_put(data))
1087

    
1088
    @backend_method(autocommit=0)
1089
    def update_block(self, hash, data, offset=0):
1090
        """Update a known block and return the hash."""
1091

    
1092
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1093
        if offset == 0 and len(data) == self.block_size:
1094
            return self.put_block(data)
1095
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1096
        return binascii.hexlify(h)
1097

    
1098
    # Path functions.
1099

    
1100
    def _generate_uuid(self):
1101
        return str(uuidlib.uuid4())
1102

    
1103
    def _put_object_node(self, path, parent, name):
1104
        path = '/'.join((path, name))
1105
        node = self.node.node_lookup(path)
1106
        if node is None:
1107
            node = self.node.node_create(parent, path)
1108
        return path, node
1109

    
1110
    def _put_path(self, user, parent, path):
1111
        node = self.node.node_create(parent, path)
1112
        self.node.version_create(node, None, 0, '', None, user,
1113
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1114
        return node
1115

    
1116
    def _lookup_account(self, account, create=True):
1117
        node = self.node.node_lookup(account)
1118
        if node is None and create:
1119
            node = self._put_path(
1120
                account, self.ROOTNODE, account)  # User is account.
1121
        return account, node
1122

    
1123
    def _lookup_container(self, account, container):
1124
        path = '/'.join((account, container))
1125
        node = self.node.node_lookup(path)
1126
        if node is None:
1127
            raise ItemNotExists('Container does not exist')
1128
        return path, node
1129

    
1130
    def _lookup_object(self, account, container, name):
1131
        path = '/'.join((account, container, name))
1132
        node = self.node.node_lookup(path)
1133
        if node is None:
1134
            raise ItemNotExists('Object does not exist')
1135
        return path, node
1136

    
1137
    def _lookup_objects(self, paths):
1138
        nodes = self.node.node_lookup_bulk(paths)
1139
        return paths, nodes
1140

    
1141
    def _get_properties(self, node, until=None):
1142
        """Return properties until the timestamp given."""
1143

    
1144
        before = until if until is not None else inf
1145
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1146
        if props is None and until is not None:
1147
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1148
        if props is None:
1149
            raise ItemNotExists('Path does not exist')
1150
        return props
1151

    
1152
    def _get_statistics(self, node, until=None):
1153
        """Return count, sum of size and latest timestamp of everything under node."""
1154

    
1155
        if until is None:
1156
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1157
        else:
1158
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1159
        if stats is None:
1160
            stats = (0, 0, 0)
1161
        return stats
1162

    
1163
    def _get_version(self, node, version=None):
1164
        if version is None:
1165
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1166
            if props is None:
1167
                raise ItemNotExists('Object does not exist')
1168
        else:
1169
            try:
1170
                version = int(version)
1171
            except ValueError:
1172
                raise VersionNotExists('Version does not exist')
1173
            props = self.node.version_get_properties(version)
1174
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1175
                raise VersionNotExists('Version does not exist')
1176
        return props
1177

    
1178
    def _get_versions(self, nodes):
1179
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1180

    
1181
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1182
        """Create a new version of the node."""
1183

    
1184
        props = self.node.version_lookup(
1185
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1186
        if props is not None:
1187
            src_version_id = props[self.SERIAL]
1188
            src_hash = props[self.HASH]
1189
            src_size = props[self.SIZE]
1190
            src_type = props[self.TYPE]
1191
            src_checksum = props[self.CHECKSUM]
1192
        else:
1193
            src_version_id = None
1194
            src_hash = None
1195
            src_size = 0
1196
            src_type = ''
1197
            src_checksum = ''
1198
        if size is None:  # Set metadata.
1199
            hash = src_hash  # This way hash can be set to None (account or container).
1200
            size = src_size
1201
        if type is None:
1202
            type = src_type
1203
        if checksum is None:
1204
            checksum = src_checksum
1205
        uuid = self._generate_uuid(
1206
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1207

    
1208
        if src_node is None:
1209
            pre_version_id = src_version_id
1210
        else:
1211
            pre_version_id = None
1212
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1213
            if props is not None:
1214
                pre_version_id = props[self.SERIAL]
1215
        if pre_version_id is not None:
1216
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1217

    
1218
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1219
        return pre_version_id, dest_version_id
1220

    
1221
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1222
        if src_version_id is not None:
1223
            self.node.attribute_copy(src_version_id, dest_version_id)
1224
        if not replace:
1225
            self.node.attribute_del(dest_version_id, domain, (
1226
                k for k, v in meta.iteritems() if v == ''))
1227
            self.node.attribute_set(dest_version_id, domain, (
1228
                (k, v) for k, v in meta.iteritems() if v != ''))
1229
        else:
1230
            self.node.attribute_del(dest_version_id, domain)
1231
            self.node.attribute_set(dest_version_id, domain, ((
1232
                k, v) for k, v in meta.iteritems()))
1233

    
1234
    def _put_metadata(self, user, node, domain, meta, replace=False):
1235
        """Create a new version and store metadata."""
1236

    
1237
        src_version_id, dest_version_id = self._put_version_duplicate(
1238
            user, node)
1239
        self._put_metadata_duplicate(
1240
            src_version_id, dest_version_id, domain, meta, replace)
1241
        return src_version_id, dest_version_id
1242

    
1243
    def _list_limits(self, listing, marker, limit):
1244
        start = 0
1245
        if marker:
1246
            try:
1247
                start = listing.index(marker) + 1
1248
            except ValueError:
1249
                pass
1250
        if not limit or limit > 10000:
1251
            limit = 10000
1252
        return start, limit
1253

    
1254
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1255
        cont_prefix = path + '/'
1256
        prefix = cont_prefix + prefix
1257
        start = cont_prefix + marker if marker else None
1258
        before = until if until is not None else inf
1259
        filterq = keys if domain else []
1260
        sizeq = size_range
1261

    
1262
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1263
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1264
        objects.sort(key=lambda x: x[0])
1265
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1266
        return objects
1267

    
1268
    # Reporting functions.
1269

    
1270
    def _report_size_change(self, user, account, size, details={}):
1271
        account_node = self._lookup_account(account, True)[1]
1272
        total = self._get_statistics(account_node)[1]
1273
        details.update({'user': user, 'total': total})
1274
        logger.debug(
1275
            "_report_size_change: %s %s %s %s", user, account, size, details)
1276
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1277
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1278
                              float(size), details))
1279

    
1280
        if not self.using_external_quotaholder:
1281
            return
1282
        
1283
        serial = self.quotaholder.issue_commission(
1284
                context     =   {},
1285
                target      =   user.uuid,
1286
                key         =   '1',
1287
                clientkey   =   'pithos',
1288
                ownerkey    =   '',
1289
                        name        =   details['path'] if 'path' in details else '',
1290
                provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1291
        )
1292
        self.serials.append(serial)
1293

    
1294
    def _report_object_change(self, user, account, path, details={}):
1295
        details.update({'user': user})
1296
        logger.debug("_report_object_change: %s %s %s %s", user,
1297
                     account, path, details)
1298
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1299
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1300

    
1301
    def _report_sharing_change(self, user, account, path, details={}):
1302
        logger.debug("_report_permissions_change: %s %s %s %s",
1303
                     user, account, path, details)
1304
        details.update({'user': user})
1305
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1306
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1307

    
1308
    # Policy functions.
1309

    
1310
    def _check_policy(self, policy):
1311
        for k in policy.keys():
1312
            if policy[k] == '':
1313
                policy[k] = self.default_policy.get(k)
1314
        for k, v in policy.iteritems():
1315
            if k == 'quota':
1316
                q = int(v)  # May raise ValueError.
1317
                if q < 0:
1318
                    raise ValueError
1319
            elif k == 'versioning':
1320
                if v not in ['auto', 'none']:
1321
                    raise ValueError
1322
            else:
1323
                raise ValueError
1324

    
1325
    def _put_policy(self, node, policy, replace):
1326
        if replace:
1327
            for k, v in self.default_policy.iteritems():
1328
                if k not in policy:
1329
                    policy[k] = v
1330
        self.node.policy_set(node, policy)
1331

    
1332
    def _get_policy(self, node):
1333
        policy = self.default_policy.copy()
1334
        policy.update(self.node.policy_get(node))
1335
        return policy
1336

    
1337
    def _apply_versioning(self, account, container, version_id):
1338
        """Delete the provided version if such is the policy.
1339
           Return size of object removed.
1340
        """
1341

    
1342
        if version_id is None:
1343
            return 0
1344
        path, node = self._lookup_container(account, container)
1345
        versioning = self._get_policy(node)['versioning']
1346
        if versioning != 'auto':
1347
            hash, size = self.node.version_remove(version_id)
1348
            self.store.map_delete(hash)
1349
            return size
1350
        elif self.free_versioning:
1351
            return self.node.version_get_properties(
1352
                version_id, keys=('size',))[0]
1353
        return 0
1354

    
1355
    # Access control functions.
1356

    
1357
    def _check_groups(self, groups):
1358
        # raise ValueError('Bad characters in groups')
1359
        pass
1360

    
1361
    def _check_permissions(self, path, permissions):
1362
        # raise ValueError('Bad characters in permissions')
1363
        pass
1364

    
1365
    def _get_formatted_paths(self, paths):
1366
        formatted = []
1367
        for p in paths:
1368
            node = self.node.node_lookup(p)
1369
            props = None
1370
            if node is not None:
1371
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1372
            if props is not None:
1373
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1374
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1375
                formatted.append((p, self.MATCH_EXACT))
1376
        return formatted
1377

    
1378
    def _get_permissions_path(self, account, container, name):
1379
        path = '/'.join((account, container, name))
1380
        permission_paths = self.permissions.access_inherit(path)
1381
        permission_paths.sort()
1382
        permission_paths.reverse()
1383
        for p in permission_paths:
1384
            if p == path:
1385
                return p
1386
            else:
1387
                if p.count('/') < 2:
1388
                    continue
1389
                node = self.node.node_lookup(p)
1390
                props = None
1391
                if node is not None:
1392
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1393
                if props is not None:
1394
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1395
                        return p
1396
        return None
1397

    
1398
    def _can_read(self, user, account, container, name):
1399
        if user == account:
1400
            return True
1401
        path = '/'.join((account, container, name))
1402
        if self.permissions.public_get(path) is not None:
1403
            return True
1404
        path = self._get_permissions_path(account, container, name)
1405
        if not path:
1406
            raise NotAllowedError
1407
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1408
            raise NotAllowedError
1409

    
1410
    def _can_write(self, user, account, container, name):
1411
        if user == account:
1412
            return True
1413
        path = '/'.join((account, container, name))
1414
        path = self._get_permissions_path(account, container, name)
1415
        if not path:
1416
            raise NotAllowedError
1417
        if not self.permissions.access_check(path, self.WRITE, user):
1418
            raise NotAllowedError
1419

    
1420
    def _allowed_accounts(self, user):
1421
        allow = set()
1422
        for path in self.permissions.access_list_paths(user):
1423
            allow.add(path.split('/', 1)[0])
1424
        return sorted(allow)
1425

    
1426
    def _allowed_containers(self, user, account):
1427
        allow = set()
1428
        for path in self.permissions.access_list_paths(user, account):
1429
            allow.add(path.split('/', 2)[1])
1430
        return sorted(allow)