Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ b8098f77

History | View | Annotate | Download (62.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from kamaki.clients.quotaholder import QuotaholderClient
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
85
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
86
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
87

    
88
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
89
QUEUE_CLIENT_ID = 'pithos'
90
QUEUE_INSTANCE_ID = '1'
91

    
92
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
93

    
94
inf = float('inf')
95

    
96
ULTIMATE_ANSWER = 42
97

    
98

    
99
logger = logging.getLogger(__name__)
100

    
101

    
102
def backend_method(func=None, autocommit=1):
103
    if func is None:
104
        def fn(func):
105
            return backend_method(func, autocommit)
106
        return fn
107

    
108
    if not autocommit:
109
        return func
110

    
111
    def fn(self, *args, **kw):
112
        self.wrapper.execute()
113
        serials = []
114
        self.serials = serials
115
        self.messages = []
116

    
117
        try:
118
            ret = func(self, *args, **kw)
119
            for m in self.messages:
120
                self.queue.send(*m)
121
            if serials:
122
                self.quotaholder.accept_commission(
123
                            context     =   {},
124
                            clientkey   =   'pithos',
125
                            serials     =   serials)
126
            self.wrapper.commit()
127
            return ret
128
        except:
129
            if serials:
130
                self.quotaholder.reject_commission(
131
                            context     =   {},
132
                            clientkey   =   'pithos',
133
                            serials     =   serials)
134
            self.wrapper.rollback()
135
            raise
136
    return fn
137

    
138

    
139
class ModularBackend(BaseBackend):
140
    """A modular backend.
141

142
    Uses modules for SQL functions and storage.
143
    """
144

    
145
    def __init__(self, db_module=None, db_connection=None,
146
                 block_module=None, block_path=None, block_umask=None,
147
                 queue_module=None, queue_hosts=None, queue_exchange=None,
148
                 quotaholder_url=None, quotaholder_token=None,
149
                 free_versioning=True, block_params=None):
150
        db_module = db_module or DEFAULT_DB_MODULE
151
        db_connection = db_connection or DEFAULT_DB_CONNECTION
152
        block_module = block_module or DEFAULT_BLOCK_MODULE
153
        block_path = block_path or DEFAULT_BLOCK_PATH
154
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
155
        block_params = block_params or DEFAULT_BLOCK_PARAMS
156
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
157

    
158
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
159
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
160
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
161

    
162
        self.hash_algorithm = 'sha256'
163
        self.block_size = 4 * 1024 * 1024  # 4MB
164
        self.free_versioning = free_versioning
165

    
166
        self.default_policy = {'quota': DEFAULT_QUOTA,
167
                               'versioning': DEFAULT_VERSIONING}
168

    
169
        def load_module(m):
170
            __import__(m)
171
            return sys.modules[m]
172

    
173
        self.db_module = load_module(db_module)
174
        self.wrapper = self.db_module.DBWrapper(db_connection)
175
        params = {'wrapper': self.wrapper}
176
        self.permissions = self.db_module.Permissions(**params)
177
        self.config = self.db_module.Config(**params)
178
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
179
        for x in ['READ', 'WRITE']:
180
            setattr(self, x, getattr(self.db_module, x))
181
        self.node = self.db_module.Node(**params)
182
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
183
            setattr(self, x, getattr(self.db_module, x))
184

    
185
        self.block_module = load_module(block_module)
186
        self.block_params = block_params
187
        params = {'path': block_path,
188
                  'block_size': self.block_size,
189
                  'hash_algorithm': self.hash_algorithm,
190
                  'umask': block_umask}
191
        params.update(self.block_params)
192
        self.store = self.block_module.Store(**params)
193

    
194
        if queue_module and queue_hosts:
195
            self.queue_module = load_module(queue_module)
196
            params = {'hosts': queue_hosts,
197
                      'exchange': queue_exchange,
198
                      'client_id': QUEUE_CLIENT_ID}
199
            self.queue = self.queue_module.Queue(**params)
200
        else:
201
            class NoQueue:
202
                def send(self, *args):
203
                    pass
204

    
205
                def close(self):
206
                    pass
207

    
208
            self.queue = NoQueue()
209

    
210
        self.quotaholder_url = quotaholder_url
211
        self.quotaholder_token = quotaholder_token
212
        self.quotaholder = QuotaholderClient(quotaholder_url, quotaholder_token)
213
        self.serials = []
214
        self.messages = []
215

    
216
    def close(self):
217
        self.wrapper.close()
218
        self.queue.close()
219

    
220
    @property
221
    def using_external_quotaholder(self):
222
        if self.quotaholder_url:
223
            return True
224
        return False
225

    
226
    @backend_method
227
    def list_accounts(self, user, marker=None, limit=10000):
228
        """Return a list of accounts the user can access."""
229

    
230
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
231
        allowed = self._allowed_accounts(user)
232
        start, limit = self._list_limits(allowed, marker, limit)
233
        return allowed[start:start + limit]
234

    
235
    @backend_method
236
    def get_account_meta(
237
            self, user, account, domain, until=None, include_user_defined=True,
238
            external_quota=None):
239
        """Return a dictionary with the account metadata for the domain."""
240

    
241
        logger.debug(
242
            "get_account_meta: %s %s %s %s", user, account, domain, until)
243
        path, node = self._lookup_account(account, user == account)
244
        if user != account:
245
            if until or node is None or account not in self._allowed_accounts(user):
246
                raise NotAllowedError
247
        try:
248
            props = self._get_properties(node, until)
249
            mtime = props[self.MTIME]
250
        except NameError:
251
            props = None
252
            mtime = until
253
        count, bytes, tstamp = self._get_statistics(node, until)
254
        tstamp = max(tstamp, mtime)
255
        if until is None:
256
            modified = tstamp
257
        else:
258
            modified = self._get_statistics(
259
                node)[2]  # Overall last modification.
260
            modified = max(modified, mtime)
261

    
262
        if user != account:
263
            meta = {'name': account}
264
        else:
265
            meta = {}
266
            if props is not None and include_user_defined:
267
                meta.update(
268
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
269
            if until is not None:
270
                meta.update({'until_timestamp': tstamp})
271
            meta.update({'name': account, 'count': count, 'bytes': bytes})
272
            if self.using_external_quotaholder:
273
                external_quota = external_quota or {}
274
                meta['bytes'] = external_quota.get('currValue', 0)
275
        meta.update({'modified': modified})
276
        return meta
277

    
278
    @backend_method
279
    def update_account_meta(self, user, account, domain, meta, replace=False):
280
        """Update the metadata associated with the account for the domain."""
281

    
282
        logger.debug("update_account_meta: %s %s %s %s %s", user,
283
                     account, domain, meta, replace)
284
        if user != account:
285
            raise NotAllowedError
286
        path, node = self._lookup_account(account, True)
287
        self._put_metadata(user, node, domain, meta, replace)
288

    
289
    @backend_method
290
    def get_account_groups(self, user, account):
291
        """Return a dictionary with the user groups defined for this account."""
292

    
293
        logger.debug("get_account_groups: %s %s", user, account)
294
        if user != account:
295
            if account not in self._allowed_accounts(user):
296
                raise NotAllowedError
297
            return {}
298
        self._lookup_account(account, True)
299
        return self.permissions.group_dict(account)
300

    
301
    @backend_method
302
    def update_account_groups(self, user, account, groups, replace=False):
303
        """Update the groups associated with the account."""
304

    
305
        logger.debug("update_account_groups: %s %s %s %s", user,
306
                     account, groups, replace)
307
        if user != account:
308
            raise NotAllowedError
309
        self._lookup_account(account, True)
310
        self._check_groups(groups)
311
        if replace:
312
            self.permissions.group_destroy(account)
313
        for k, v in groups.iteritems():
314
            if not replace:  # If not already deleted.
315
                self.permissions.group_delete(account, k)
316
            if v:
317
                self.permissions.group_addmany(account, k, v)
318

    
319
    @backend_method
320
    def get_account_policy(self, user, account, external_quota=None):
321
        """Return a dictionary with the account policy."""
322

    
323
        logger.debug("get_account_policy: %s %s", user, account)
324
        if user != account:
325
            if account not in self._allowed_accounts(user):
326
                raise NotAllowedError
327
            return {}
328
        path, node = self._lookup_account(account, True)
329
        policy = self._get_policy(node)
330
        if self.using_external_quotaholder:
331
            external_quota = external_quota or {}
332
            policy['quota'] = external_quota.get('maxValue', 0)
333
        return policy
334

    
335
    @backend_method
336
    def update_account_policy(self, user, account, policy, replace=False):
337
        """Update the policy associated with the account."""
338

    
339
        logger.debug("update_account_policy: %s %s %s %s", user,
340
                     account, policy, replace)
341
        if user != account:
342
            raise NotAllowedError
343
        path, node = self._lookup_account(account, True)
344
        self._check_policy(policy)
345
        self._put_policy(node, policy, replace)
346

    
347
    @backend_method
348
    def put_account(self, user, account, policy={}):
349
        """Create a new account with the given name."""
350

    
351
        logger.debug("put_account: %s %s %s", user, account, policy)
352
        if user != account:
353
            raise NotAllowedError
354
        node = self.node.node_lookup(account)
355
        if node is not None:
356
            raise AccountExists('Account already exists')
357
        if policy:
358
            self._check_policy(policy)
359
        node = self._put_path(user, self.ROOTNODE, account)
360
        self._put_policy(node, policy, True)
361

    
362
    @backend_method
363
    def delete_account(self, user, account):
364
        """Delete the account with the given name."""
365

    
366
        logger.debug("delete_account: %s %s", user, account)
367
        if user != account:
368
            raise NotAllowedError
369
        node = self.node.node_lookup(account)
370
        if node is None:
371
            return
372
        if not self.node.node_remove(node):
373
            raise AccountNotEmpty('Account is not empty')
374
        self.permissions.group_destroy(account)
375

    
376
    @backend_method
377
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
378
        """Return a list of containers existing under an account."""
379

    
380
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
381
                     account, marker, limit, shared, until, public)
382
        if user != account:
383
            if until or account not in self._allowed_accounts(user):
384
                raise NotAllowedError
385
            allowed = self._allowed_containers(user, account)
386
            start, limit = self._list_limits(allowed, marker, limit)
387
            return allowed[start:start + limit]
388
        if shared or public:
389
            allowed = set()
390
            if shared:
391
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
392
            if public:
393
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
394
            allowed = sorted(allowed)
395
            start, limit = self._list_limits(allowed, marker, limit)
396
            return allowed[start:start + limit]
397
        node = self.node.node_lookup(account)
398
        containers = [x[0] for x in self._list_object_properties(
399
            node, account, '', '/', marker, limit, False, None, [], until)]
400
        start, limit = self._list_limits(
401
            [x[0] for x in containers], marker, limit)
402
        return containers[start:start + limit]
403

    
404
    @backend_method
405
    def list_container_meta(self, user, account, container, domain, until=None):
406
        """Return a list with all the container's object meta keys for the domain."""
407

    
408
        logger.debug("list_container_meta: %s %s %s %s %s", user,
409
                     account, container, domain, until)
410
        allowed = []
411
        if user != account:
412
            if until:
413
                raise NotAllowedError
414
            allowed = self.permissions.access_list_paths(
415
                user, '/'.join((account, container)))
416
            if not allowed:
417
                raise NotAllowedError
418
        path, node = self._lookup_container(account, container)
419
        before = until if until is not None else inf
420
        allowed = self._get_formatted_paths(allowed)
421
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
422

    
423
    @backend_method
424
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
425
        """Return a dictionary with the container metadata for the domain."""
426

    
427
        logger.debug("get_container_meta: %s %s %s %s %s", user,
428
                     account, container, domain, until)
429
        if user != account:
430
            if until or container not in self._allowed_containers(user, account):
431
                raise NotAllowedError
432
        path, node = self._lookup_container(account, container)
433
        props = self._get_properties(node, until)
434
        mtime = props[self.MTIME]
435
        count, bytes, tstamp = self._get_statistics(node, until)
436
        tstamp = max(tstamp, mtime)
437
        if until is None:
438
            modified = tstamp
439
        else:
440
            modified = self._get_statistics(
441
                node)[2]  # Overall last modification.
442
            modified = max(modified, mtime)
443

    
444
        if user != account:
445
            meta = {'name': container}
446
        else:
447
            meta = {}
448
            if include_user_defined:
449
                meta.update(
450
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
451
            if until is not None:
452
                meta.update({'until_timestamp': tstamp})
453
            meta.update({'name': container, 'count': count, 'bytes': bytes})
454
        meta.update({'modified': modified})
455
        return meta
456

    
457
    @backend_method
458
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
459
        """Update the metadata associated with the container for the domain."""
460

    
461
        logger.debug("update_container_meta: %s %s %s %s %s %s",
462
                     user, account, container, domain, meta, replace)
463
        if user != account:
464
            raise NotAllowedError
465
        path, node = self._lookup_container(account, container)
466
        src_version_id, dest_version_id = self._put_metadata(
467
            user, node, domain, meta, replace)
468
        if src_version_id is not None:
469
            versioning = self._get_policy(node)['versioning']
470
            if versioning != 'auto':
471
                self.node.version_remove(src_version_id)
472

    
473
    @backend_method
474
    def get_container_policy(self, user, account, container):
475
        """Return a dictionary with the container policy."""
476

    
477
        logger.debug(
478
            "get_container_policy: %s %s %s", user, account, container)
479
        if user != account:
480
            if container not in self._allowed_containers(user, account):
481
                raise NotAllowedError
482
            return {}
483
        path, node = self._lookup_container(account, container)
484
        return self._get_policy(node)
485

    
486
    @backend_method
487
    def update_container_policy(self, user, account, container, policy, replace=False):
488
        """Update the policy associated with the container."""
489

    
490
        logger.debug("update_container_policy: %s %s %s %s %s",
491
                     user, account, container, policy, replace)
492
        if user != account:
493
            raise NotAllowedError
494
        path, node = self._lookup_container(account, container)
495
        self._check_policy(policy)
496
        self._put_policy(node, policy, replace)
497

    
498
    @backend_method
499
    def put_container(self, user, account, container, policy={}):
500
        """Create a new container with the given name."""
501

    
502
        logger.debug(
503
            "put_container: %s %s %s %s", user, account, container, policy)
504
        if user != account:
505
            raise NotAllowedError
506
        try:
507
            path, node = self._lookup_container(account, container)
508
        except NameError:
509
            pass
510
        else:
511
            raise ContainerExists('Container already exists')
512
        if policy:
513
            self._check_policy(policy)
514
        path = '/'.join((account, container))
515
        node = self._put_path(
516
            user, self._lookup_account(account, True)[1], path)
517
        self._put_policy(node, policy, True)
518

    
519
    @backend_method
520
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
521
        """Delete/purge the container with the given name."""
522

    
523
        logger.debug("delete_container: %s %s %s %s %s %s", user,
524
                     account, container, until, prefix, delimiter)
525
        if user != account:
526
            raise NotAllowedError
527
        path, node = self._lookup_container(account, container)
528

    
529
        if until is not None:
530
            hashes, size, serials = self.node.node_purge_children(
531
                node, until, CLUSTER_HISTORY)
532
            for h in hashes:
533
                self.store.map_delete(h)
534
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
535
            self._report_size_change(user, account, -size,
536
                                     {'action':'container purge', 'path': path,
537
                                      'versions': ','.join(str(i) for i in serials)})
538
            return
539

    
540
        if not delimiter:
541
            if self._get_statistics(node)[0] > 0:
542
                raise ContainerNotEmpty('Container is not empty')
543
            hashes, size, serials = self.node.node_purge_children(
544
                node, inf, CLUSTER_HISTORY)
545
            for h in hashes:
546
                self.store.map_delete(h)
547
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
548
            self.node.node_remove(node)
549
            self._report_size_change(user, account, -size,
550
                                     {'action': 'container delete',
551
                                      'path': path,
552
                                      'versions': ','.join(str(i) for i in serials)})
553
        else:
554
            # remove only contents
555
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
556
            paths = []
557
            for t in src_names:
558
                path = '/'.join((account, container, t[0]))
559
                node = t[2]
560
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
561
                del_size = self._apply_versioning(
562
                    account, container, src_version_id)
563
                self._report_size_change(
564
                        user, account, -del_size, {
565
                                'action': 'object delete',
566
                                'path': path,
567
                        'versions': ','.join([str(dest_version_id)])
568
                     }
569
                )
570
                self._report_object_change(
571
                    user, account, path, details={'action': 'object delete'})
572
                paths.append(path)
573
            self.permissions.access_clear_bulk(paths)
574

    
575
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
576
        if user != account and until:
577
            raise NotAllowedError
578
        if shared and public:
579
            # get shared first
580
            shared = self._list_object_permissions(
581
                user, account, container, prefix, shared=True, public=False)
582
            objects = set()
583
            if shared:
584
                path, node = self._lookup_container(account, container)
585
                shared = self._get_formatted_paths(shared)
586
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
587

    
588
            # get public
589
            objects |= set(self._list_public_object_properties(
590
                user, account, container, prefix, all_props))
591
            objects = list(objects)
592

    
593
            objects.sort(key=lambda x: x[0])
594
            start, limit = self._list_limits(
595
                [x[0] for x in objects], marker, limit)
596
            return objects[start:start + limit]
597
        elif public:
598
            objects = self._list_public_object_properties(
599
                user, account, container, prefix, all_props)
600
            start, limit = self._list_limits(
601
                [x[0] for x in objects], marker, limit)
602
            return objects[start:start + limit]
603

    
604
        allowed = self._list_object_permissions(
605
            user, account, container, prefix, shared, public)
606
        if shared and not allowed:
607
            return []
608
        path, node = self._lookup_container(account, container)
609
        allowed = self._get_formatted_paths(allowed)
610
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
611
        start, limit = self._list_limits(
612
            [x[0] for x in objects], marker, limit)
613
        return objects[start:start + limit]
614

    
615
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
616
        public = self._list_object_permissions(
617
            user, account, container, prefix, shared=False, public=True)
618
        paths, nodes = self._lookup_objects(public)
619
        path = '/'.join((account, container))
620
        cont_prefix = path + '/'
621
        paths = [x[len(cont_prefix):] for x in paths]
622
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
623
        objects = [(path,) + props for path, props in zip(paths, props)]
624
        return objects
625

    
626
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
627
        objects = []
628
        while True:
629
            marker = objects[-1] if objects else None
630
            limit = 10000
631
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
632
            objects.extend(l)
633
            if not l or len(l) < limit:
634
                break
635
        return objects
636

    
637
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
638
        allowed = []
639
        path = '/'.join((account, container, prefix)).rstrip('/')
640
        if user != account:
641
            allowed = self.permissions.access_list_paths(user, path)
642
            if not allowed:
643
                raise NotAllowedError
644
        else:
645
            allowed = set()
646
            if shared:
647
                allowed.update(self.permissions.access_list_shared(path))
648
            if public:
649
                allowed.update(
650
                    [x[0] for x in self.permissions.public_list(path)])
651
            allowed = sorted(allowed)
652
            if not allowed:
653
                return []
654
        return allowed
655

    
656
    @backend_method
657
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
658
        """Return a list of object (name, version_id) tuples existing under a container."""
659

    
660
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
661
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
662

    
663
    @backend_method
664
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
665
        """Return a list of object metadata dicts existing under a container."""
666

    
667
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
668
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
669
        objects = []
670
        for p in props:
671
            if len(p) == 2:
672
                objects.append({'subdir': p[0]})
673
            else:
674
                objects.append({'name': p[0],
675
                                'bytes': p[self.SIZE + 1],
676
                                'type': p[self.TYPE + 1],
677
                                'hash': p[self.HASH + 1],
678
                                'version': p[self.SERIAL + 1],
679
                                'version_timestamp': p[self.MTIME + 1],
680
                                'modified': p[self.MTIME + 1] if until is None else None,
681
                                'modified_by': p[self.MUSER + 1],
682
                                'uuid': p[self.UUID + 1],
683
                                'checksum': p[self.CHECKSUM + 1]})
684
        return objects
685

    
686
    @backend_method
687
    def list_object_permissions(self, user, account, container, prefix=''):
688
        """Return a list of paths that enforce permissions under a container."""
689

    
690
        logger.debug("list_object_permissions: %s %s %s %s", user,
691
                     account, container, prefix)
692
        return self._list_object_permissions(user, account, container, prefix, True, False)
693

    
694
    @backend_method
695
    def list_object_public(self, user, account, container, prefix=''):
696
        """Return a dict mapping paths to public ids for objects that are public under a container."""
697

    
698
        logger.debug("list_object_public: %s %s %s %s", user,
699
                     account, container, prefix)
700
        public = {}
701
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
702
            public[path] = p + ULTIMATE_ANSWER
703
        return public
704

    
705
    @backend_method
706
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
707
        """Return a dictionary with the object metadata for the domain."""
708

    
709
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
710
                     account, container, name, domain, version)
711
        self._can_read(user, account, container, name)
712
        path, node = self._lookup_object(account, container, name)
713
        props = self._get_version(node, version)
714
        if version is None:
715
            modified = props[self.MTIME]
716
        else:
717
            try:
718
                modified = self._get_version(
719
                    node)[self.MTIME]  # Overall last modification.
720
            except NameError:  # Object may be deleted.
721
                del_props = self.node.version_lookup(
722
                    node, inf, CLUSTER_DELETED)
723
                if del_props is None:
724
                    raise ItemNotExists('Object does not exist')
725
                modified = del_props[self.MTIME]
726

    
727
        meta = {}
728
        if include_user_defined:
729
            meta.update(
730
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
731
        meta.update({'name': name,
732
                     'bytes': props[self.SIZE],
733
                     'type': props[self.TYPE],
734
                     'hash': props[self.HASH],
735
                     'version': props[self.SERIAL],
736
                     'version_timestamp': props[self.MTIME],
737
                     'modified': modified,
738
                     'modified_by': props[self.MUSER],
739
                     'uuid': props[self.UUID],
740
                     'checksum': props[self.CHECKSUM]})
741
        return meta
742

    
743
    @backend_method
744
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
745
        """Update the metadata associated with the object for the domain and return the new version."""
746

    
747
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
748
                     user, account, container, name, domain, meta, replace)
749
        self._can_write(user, account, container, name)
750
        path, node = self._lookup_object(account, container, name)
751
        src_version_id, dest_version_id = self._put_metadata(
752
            user, node, domain, meta, replace)
753
        self._apply_versioning(account, container, src_version_id)
754
        return dest_version_id
755

    
756
    @backend_method
757
    def get_object_permissions(self, user, account, container, name):
758
        """Return the action allowed on the object, the path
759
        from which the object gets its permissions from,
760
        along with a dictionary containing the permissions."""
761

    
762
        logger.debug("get_object_permissions: %s %s %s %s", user,
763
                     account, container, name)
764
        allowed = 'write'
765
        permissions_path = self._get_permissions_path(account, container, name)
766
        if user != account:
767
            if self.permissions.access_check(permissions_path, self.WRITE, user):
768
                allowed = 'write'
769
            elif self.permissions.access_check(permissions_path, self.READ, user):
770
                allowed = 'read'
771
            else:
772
                raise NotAllowedError
773
        self._lookup_object(account, container, name)
774
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
775

    
776
    @backend_method
777
    def update_object_permissions(self, user, account, container, name, permissions):
778
        """Update the permissions associated with the object."""
779

    
780
        logger.debug("update_object_permissions: %s %s %s %s %s",
781
                     user, account, container, name, permissions)
782
        if user != account:
783
            raise NotAllowedError
784
        path = self._lookup_object(account, container, name)[0]
785
        self._check_permissions(path, permissions)
786
        self.permissions.access_set(path, permissions)
787
        self._report_sharing_change(user, account, path, {'members':
788
                                    self.permissions.access_members(path)})
789

    
790
    @backend_method
791
    def get_object_public(self, user, account, container, name):
792
        """Return the public id of the object if applicable."""
793

    
794
        logger.debug(
795
            "get_object_public: %s %s %s %s", user, account, container, name)
796
        self._can_read(user, account, container, name)
797
        path = self._lookup_object(account, container, name)[0]
798
        p = self.permissions.public_get(path)
799
        if p is not None:
800
            p += ULTIMATE_ANSWER
801
        return p
802

    
803
    @backend_method
804
    def update_object_public(self, user, account, container, name, public):
805
        """Update the public status of the object."""
806

    
807
        logger.debug("update_object_public: %s %s %s %s %s", user,
808
                     account, container, name, public)
809
        self._can_write(user, account, container, name)
810
        path = self._lookup_object(account, container, name)[0]
811
        if not public:
812
            self.permissions.public_unset(path)
813
        else:
814
            self.permissions.public_set(path)
815

    
816
    @backend_method
817
    def get_object_hashmap(self, user, account, container, name, version=None):
818
        """Return the object's size and a list with partial hashes."""
819

    
820
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
821
                     account, container, name, version)
822
        self._can_read(user, account, container, name)
823
        path, node = self._lookup_object(account, container, name)
824
        props = self._get_version(node, version)
825
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
826
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
827

    
828
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
829
        if permissions is not None and user != account:
830
            raise NotAllowedError
831
        self._can_write(user, account, container, name)
832
        if permissions is not None:
833
            path = '/'.join((account, container, name))
834
            self._check_permissions(path, permissions)
835

    
836
        account_path, account_node = self._lookup_account(account, True)
837
        container_path, container_node = self._lookup_container(
838
            account, container)
839
        path, node = self._put_object_node(
840
            container_path, container_node, name)
841
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
842

    
843
        # Handle meta.
844
        if src_version_id is None:
845
            src_version_id = pre_version_id
846
        self._put_metadata_duplicate(
847
            src_version_id, dest_version_id, domain, meta, replace_meta)
848

    
849
        del_size = self._apply_versioning(account, container, pre_version_id)
850
        size_delta = size - del_size
851
        if not self.using_external_quotaholder: # Check quota.
852
            if size_delta > 0:
853
                account_quota = long(self._get_policy(account_node)['quota'])
854
                account_usage = self._get_statistics(account_node)[1] + size_delta
855
                container_quota = long(self._get_policy(container_node)['quota'])
856
                container_usage = self._get_statistics(container_node)[1] + size_delta
857
                if (account_quota > 0 and account_usage > account_quota):
858
                    logger.error('account_quota: %s, account_usage: %s' % (
859
                        account_quota, account_usage
860
                    ))
861
                    raise QuotaError
862
                if (container_quota > 0 and container_usage > container_quota):
863
                    # This must be executed in a transaction, so the version is
864
                    # never created if it fails.
865
                    logger.error('container_quota: %s, container_usage: %s' % (
866
                        container_quota, container_usage
867
                    ))
868
                    raise QuotaError
869
        self._report_size_change(user, account, size_delta,
870
                                 {'action': 'object update', 'path': path,
871
                                  'versions': ','.join([str(dest_version_id)])})
872
        if permissions is not None:
873
            self.permissions.access_set(path, permissions)
874
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
875

    
876
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
877
        return dest_version_id
878

    
879
    @backend_method
880
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
881
        """Create/update an object with the specified size and partial hashes."""
882

    
883
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
884
                     account, container, name, size, type, hashmap, checksum)
885
        if size == 0:  # No such thing as an empty hashmap.
886
            hashmap = [self.put_block('')]
887
        map = HashMap(self.block_size, self.hash_algorithm)
888
        map.extend([binascii.unhexlify(x) for x in hashmap])
889
        missing = self.store.block_search(map)
890
        if missing:
891
            ie = IndexError()
892
            ie.data = [binascii.hexlify(x) for x in missing]
893
            raise ie
894

    
895
        hash = map.hash()
896
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
897
        self.store.map_put(hash, map)
898
        return dest_version_id
899

    
900
    @backend_method
901
    def update_object_checksum(self, user, account, container, name, version, checksum):
902
        """Update an object's checksum."""
903

    
904
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
905
                     user, account, container, name, version, checksum)
906
        # Update objects with greater version and same hashmap and size (fix metadata updates).
907
        self._can_write(user, account, container, name)
908
        path, node = self._lookup_object(account, container, name)
909
        props = self._get_version(node, version)
910
        versions = self.node.node_get_versions(node)
911
        for x in versions:
912
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
913
                self.node.version_put_property(
914
                    x[self.SERIAL], 'checksum', checksum)
915

    
916
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
917
        dest_version_ids = []
918
        self._can_read(user, src_account, src_container, src_name)
919
        path, node = self._lookup_object(src_account, src_container, src_name)
920
        # TODO: Will do another fetch of the properties in duplicate version...
921
        props = self._get_version(
922
            node, src_version)  # Check to see if source exists.
923
        src_version_id = props[self.SERIAL]
924
        hash = props[self.HASH]
925
        size = props[self.SIZE]
926
        is_copy = not is_move and (src_account, src_container, src_name) != (
927
            dest_account, dest_container, dest_name)  # New uuid.
928
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
929
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
930
            self._delete_object(user, src_account, src_container, src_name)
931

    
932
        if delimiter:
933
            prefix = src_name + \
934
                delimiter if not src_name.endswith(delimiter) else src_name
935
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
936
            src_names.sort(key=lambda x: x[2])  # order by nodes
937
            paths = [elem[0] for elem in src_names]
938
            nodes = [elem[2] for elem in src_names]
939
            # TODO: Will do another fetch of the properties in duplicate version...
940
            props = self._get_versions(nodes)  # Check to see if source exists.
941

    
942
            for prop, path, node in zip(props, paths, nodes):
943
                src_version_id = prop[self.SERIAL]
944
                hash = prop[self.HASH]
945
                vtype = prop[self.TYPE]
946
                size = prop[self.SIZE]
947
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
948
                    delimiter) else dest_name
949
                vdest_name = path.replace(prefix, dest_prefix, 1)
950
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
951
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
952
                    self._delete_object(user, src_account, src_container, path)
953
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
954

    
955
    @backend_method
956
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
957
        """Copy an object's data and metadata."""
958

    
959
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
960
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
961
        return dest_version_id
962

    
963
    @backend_method
964
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
965
        """Move an object's data and metadata."""
966

    
967
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
968
        if user != src_account:
969
            raise NotAllowedError
970
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
971
        return dest_version_id
972

    
973
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
974
        if user != account:
975
            raise NotAllowedError
976

    
977
        if until is not None:
978
            path = '/'.join((account, container, name))
979
            node = self.node.node_lookup(path)
980
            if node is None:
981
                return
982
            hashes = []
983
            size = 0
984
            serials = []
985
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
986
            hashes += h
987
            size += s
988
            serials += v
989
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
990
            hashes += h
991
            size += s
992
            serials += v
993
            for h in hashes:
994
                self.store.map_delete(h)
995
            self.node.node_purge(node, until, CLUSTER_DELETED)
996
            try:
997
                props = self._get_version(node)
998
            except NameError:
999
                self.permissions.access_clear(path)
1000
            self._report_size_change(user, account, -size,
1001
                                    {'action': 'object purge', 'path': path,
1002
                                     'versions': ','.join(str(i) for i in serials)})
1003
            return
1004

    
1005
        path, node = self._lookup_object(account, container, name)
1006
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1007
        del_size = self._apply_versioning(account, container, src_version_id)
1008
        self._report_size_change(user, account, -del_size,
1009
                                 {'action': 'object delete', 'path': path,
1010
                                  'versions': ','.join([str(dest_version_id)])})
1011
        self._report_object_change(
1012
            user, account, path, details={'action': 'object delete'})
1013
        self.permissions.access_clear(path)
1014

    
1015
        if delimiter:
1016
            prefix = name + delimiter if not name.endswith(delimiter) else name
1017
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1018
            paths = []
1019
            for t in src_names:
1020
                path = '/'.join((account, container, t[0]))
1021
                node = t[2]
1022
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1023
                del_size = self._apply_versioning(
1024
                    account, container, src_version_id)
1025
                self._report_size_change(user, account, -del_size,
1026
                                         {'action': 'object delete',
1027
                                          'path': path,
1028
                                          'versions': ','.join([str(dest_version_id)])})
1029
                self._report_object_change(
1030
                    user, account, path, details={'action': 'object delete'})
1031
                paths.append(path)
1032
            self.permissions.access_clear_bulk(paths)
1033

    
1034
    @backend_method
1035
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1036
        """Delete/purge an object."""
1037

    
1038
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1039
                     account, container, name, until, prefix, delimiter)
1040
        self._delete_object(user, account, container, name, until, delimiter)
1041

    
1042
    @backend_method
1043
    def list_versions(self, user, account, container, name):
1044
        """Return a list of all (version, version_timestamp) tuples for an object."""
1045

    
1046
        logger.debug(
1047
            "list_versions: %s %s %s %s", user, account, container, name)
1048
        self._can_read(user, account, container, name)
1049
        path, node = self._lookup_object(account, container, name)
1050
        versions = self.node.node_get_versions(node)
1051
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1052

    
1053
    @backend_method
1054
    def get_uuid(self, user, uuid):
1055
        """Return the (account, container, name) for the UUID given."""
1056

    
1057
        logger.debug("get_uuid: %s %s", user, uuid)
1058
        info = self.node.latest_uuid(uuid)
1059
        if info is None:
1060
            raise NameError
1061
        path, serial = info
1062
        account, container, name = path.split('/', 2)
1063
        self._can_read(user, account, container, name)
1064
        return (account, container, name)
1065

    
1066
    @backend_method
1067
    def get_public(self, user, public):
1068
        """Return the (account, container, name) for the public id given."""
1069

    
1070
        logger.debug("get_public: %s %s", user, public)
1071
        if public is None or public < ULTIMATE_ANSWER:
1072
            raise NameError
1073
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1074
        if path is None:
1075
            raise NameError
1076
        account, container, name = path.split('/', 2)
1077
        self._can_read(user, account, container, name)
1078
        return (account, container, name)
1079

    
1080
    @backend_method(autocommit=0)
1081
    def get_block(self, hash):
1082
        """Return a block's data."""
1083

    
1084
        logger.debug("get_block: %s", hash)
1085
        block = self.store.block_get(binascii.unhexlify(hash))
1086
        if not block:
1087
            raise ItemNotExists('Block does not exist')
1088
        return block
1089

    
1090
    @backend_method(autocommit=0)
1091
    def put_block(self, data):
1092
        """Store a block and return the hash."""
1093

    
1094
        logger.debug("put_block: %s", len(data))
1095
        return binascii.hexlify(self.store.block_put(data))
1096

    
1097
    @backend_method(autocommit=0)
1098
    def update_block(self, hash, data, offset=0):
1099
        """Update a known block and return the hash."""
1100

    
1101
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1102
        if offset == 0 and len(data) == self.block_size:
1103
            return self.put_block(data)
1104
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1105
        return binascii.hexlify(h)
1106

    
1107
    # Path functions.
1108

    
1109
    def _generate_uuid(self):
1110
        return str(uuidlib.uuid4())
1111

    
1112
    def _put_object_node(self, path, parent, name):
1113
        path = '/'.join((path, name))
1114
        node = self.node.node_lookup(path)
1115
        if node is None:
1116
            node = self.node.node_create(parent, path)
1117
        return path, node
1118

    
1119
    def _put_path(self, user, parent, path):
1120
        node = self.node.node_create(parent, path)
1121
        self.node.version_create(node, None, 0, '', None, user,
1122
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1123
        return node
1124

    
1125
    def _lookup_account(self, account, create=True):
1126
        node = self.node.node_lookup(account)
1127
        if node is None and create:
1128
            node = self._put_path(
1129
                account, self.ROOTNODE, account)  # User is account.
1130
        return account, node
1131

    
1132
    def _lookup_container(self, account, container):
1133
        path = '/'.join((account, container))
1134
        node = self.node.node_lookup(path)
1135
        if node is None:
1136
            raise ItemNotExists('Container does not exist')
1137
        return path, node
1138

    
1139
    def _lookup_object(self, account, container, name):
1140
        path = '/'.join((account, container, name))
1141
        node = self.node.node_lookup(path)
1142
        if node is None:
1143
            raise ItemNotExists('Object does not exist')
1144
        return path, node
1145

    
1146
    def _lookup_objects(self, paths):
1147
        nodes = self.node.node_lookup_bulk(paths)
1148
        return paths, nodes
1149

    
1150
    def _get_properties(self, node, until=None):
1151
        """Return properties until the timestamp given."""
1152

    
1153
        before = until if until is not None else inf
1154
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1155
        if props is None and until is not None:
1156
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1157
        if props is None:
1158
            raise ItemNotExists('Path does not exist')
1159
        return props
1160

    
1161
    def _get_statistics(self, node, until=None):
1162
        """Return count, sum of size and latest timestamp of everything under node."""
1163

    
1164
        if until is None:
1165
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1166
        else:
1167
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1168
        if stats is None:
1169
            stats = (0, 0, 0)
1170
        return stats
1171

    
1172
    def _get_version(self, node, version=None):
1173
        if version is None:
1174
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1175
            if props is None:
1176
                raise ItemNotExists('Object does not exist')
1177
        else:
1178
            try:
1179
                version = int(version)
1180
            except ValueError:
1181
                raise VersionNotExists('Version does not exist')
1182
            props = self.node.version_get_properties(version)
1183
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1184
                raise VersionNotExists('Version does not exist')
1185
        return props
1186

    
1187
    def _get_versions(self, nodes):
1188
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1189

    
1190
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1191
        """Create a new version of the node."""
1192

    
1193
        props = self.node.version_lookup(
1194
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1195
        if props is not None:
1196
            src_version_id = props[self.SERIAL]
1197
            src_hash = props[self.HASH]
1198
            src_size = props[self.SIZE]
1199
            src_type = props[self.TYPE]
1200
            src_checksum = props[self.CHECKSUM]
1201
        else:
1202
            src_version_id = None
1203
            src_hash = None
1204
            src_size = 0
1205
            src_type = ''
1206
            src_checksum = ''
1207
        if size is None:  # Set metadata.
1208
            hash = src_hash  # This way hash can be set to None (account or container).
1209
            size = src_size
1210
        if type is None:
1211
            type = src_type
1212
        if checksum is None:
1213
            checksum = src_checksum
1214
        uuid = self._generate_uuid(
1215
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1216

    
1217
        if src_node is None:
1218
            pre_version_id = src_version_id
1219
        else:
1220
            pre_version_id = None
1221
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1222
            if props is not None:
1223
                pre_version_id = props[self.SERIAL]
1224
        if pre_version_id is not None:
1225
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1226

    
1227
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1228
        return pre_version_id, dest_version_id
1229

    
1230
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1231
        if src_version_id is not None:
1232
            self.node.attribute_copy(src_version_id, dest_version_id)
1233
        if not replace:
1234
            self.node.attribute_del(dest_version_id, domain, (
1235
                k for k, v in meta.iteritems() if v == ''))
1236
            self.node.attribute_set(dest_version_id, domain, (
1237
                (k, v) for k, v in meta.iteritems() if v != ''))
1238
        else:
1239
            self.node.attribute_del(dest_version_id, domain)
1240
            self.node.attribute_set(dest_version_id, domain, ((
1241
                k, v) for k, v in meta.iteritems()))
1242

    
1243
    def _put_metadata(self, user, node, domain, meta, replace=False):
1244
        """Create a new version and store metadata."""
1245

    
1246
        src_version_id, dest_version_id = self._put_version_duplicate(
1247
            user, node)
1248
        self._put_metadata_duplicate(
1249
            src_version_id, dest_version_id, domain, meta, replace)
1250
        return src_version_id, dest_version_id
1251

    
1252
    def _list_limits(self, listing, marker, limit):
1253
        start = 0
1254
        if marker:
1255
            try:
1256
                start = listing.index(marker) + 1
1257
            except ValueError:
1258
                pass
1259
        if not limit or limit > 10000:
1260
            limit = 10000
1261
        return start, limit
1262

    
1263
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1264
        cont_prefix = path + '/'
1265
        prefix = cont_prefix + prefix
1266
        start = cont_prefix + marker if marker else None
1267
        before = until if until is not None else inf
1268
        filterq = keys if domain else []
1269
        sizeq = size_range
1270

    
1271
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1272
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1273
        objects.sort(key=lambda x: x[0])
1274
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1275
        return objects
1276

    
1277
    # Reporting functions.
1278

    
1279
    def _report_size_change(self, user, account, size, details={}):
1280
        account_node = self._lookup_account(account, True)[1]
1281
        total = self._get_statistics(account_node)[1]
1282
        details.update({'user': user, 'total': total})
1283
        logger.debug(
1284
            "_report_size_change: %s %s %s %s", user, account, size, details)
1285
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1286
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1287
                              float(size), details))
1288

    
1289
        if not self.using_external_quotaholder:
1290
            return
1291

    
1292
        serial = self.quotaholder.issue_commission(
1293
                context     =   {},
1294
                target      =   account,
1295
                key         =   '1',
1296
                clientkey   =   'pithos',
1297
                ownerkey    =   '',
1298
                name        =   details['path'] if 'path' in details else '',
1299
                provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1300
        )
1301
        self.serials.append(serial)
1302

    
1303
    def _report_object_change(self, user, account, path, details={}):
1304
        details.update({'user': user})
1305
        logger.debug("_report_object_change: %s %s %s %s", user,
1306
                     account, path, details)
1307
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1308
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1309

    
1310
    def _report_sharing_change(self, user, account, path, details={}):
1311
        logger.debug("_report_permissions_change: %s %s %s %s",
1312
                     user, account, path, details)
1313
        details.update({'user': user})
1314
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1315
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1316

    
1317
    # Policy functions.
1318

    
1319
    def _check_policy(self, policy):
1320
        for k in policy.keys():
1321
            if policy[k] == '':
1322
                policy[k] = self.default_policy.get(k)
1323
        for k, v in policy.iteritems():
1324
            if k == 'quota':
1325
                q = int(v)  # May raise ValueError.
1326
                if q < 0:
1327
                    raise ValueError
1328
            elif k == 'versioning':
1329
                if v not in ['auto', 'none']:
1330
                    raise ValueError
1331
            else:
1332
                raise ValueError
1333

    
1334
    def _put_policy(self, node, policy, replace):
1335
        if replace:
1336
            for k, v in self.default_policy.iteritems():
1337
                if k not in policy:
1338
                    policy[k] = v
1339
        self.node.policy_set(node, policy)
1340

    
1341
    def _get_policy(self, node):
1342
        policy = self.default_policy.copy()
1343
        policy.update(self.node.policy_get(node))
1344
        return policy
1345

    
1346
    def _apply_versioning(self, account, container, version_id):
1347
        """Delete the provided version if such is the policy.
1348
           Return size of object removed.
1349
        """
1350

    
1351
        if version_id is None:
1352
            return 0
1353
        path, node = self._lookup_container(account, container)
1354
        versioning = self._get_policy(node)['versioning']
1355
        if versioning != 'auto':
1356
            hash, size = self.node.version_remove(version_id)
1357
            self.store.map_delete(hash)
1358
            return size
1359
        elif self.free_versioning:
1360
            return self.node.version_get_properties(
1361
                version_id, keys=('size',))[0]
1362
        return 0
1363

    
1364
    # Access control functions.
1365

    
1366
    def _check_groups(self, groups):
1367
        # raise ValueError('Bad characters in groups')
1368
        pass
1369

    
1370
    def _check_permissions(self, path, permissions):
1371
        # raise ValueError('Bad characters in permissions')
1372
        pass
1373

    
1374
    def _get_formatted_paths(self, paths):
1375
        formatted = []
1376
        for p in paths:
1377
            node = self.node.node_lookup(p)
1378
            props = None
1379
            if node is not None:
1380
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1381
            if props is not None:
1382
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1383
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1384
                formatted.append((p, self.MATCH_EXACT))
1385
        return formatted
1386

    
1387
    def _get_permissions_path(self, account, container, name):
1388
        path = '/'.join((account, container, name))
1389
        permission_paths = self.permissions.access_inherit(path)
1390
        permission_paths.sort()
1391
        permission_paths.reverse()
1392
        for p in permission_paths:
1393
            if p == path:
1394
                return p
1395
            else:
1396
                if p.count('/') < 2:
1397
                    continue
1398
                node = self.node.node_lookup(p)
1399
                props = None
1400
                if node is not None:
1401
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1402
                if props is not None:
1403
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1404
                        return p
1405
        return None
1406

    
1407
    def _can_read(self, user, account, container, name):
1408
        if user == account:
1409
            return True
1410
        path = '/'.join((account, container, name))
1411
        if self.permissions.public_get(path) is not None:
1412
            return True
1413
        path = self._get_permissions_path(account, container, name)
1414
        if not path:
1415
            raise NotAllowedError
1416
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1417
            raise NotAllowedError
1418

    
1419
    def _can_write(self, user, account, container, name):
1420
        if user == account:
1421
            return True
1422
        path = '/'.join((account, container, name))
1423
        path = self._get_permissions_path(account, container, name)
1424
        if not path:
1425
            raise NotAllowedError
1426
        if not self.permissions.access_check(path, self.WRITE, user):
1427
            raise NotAllowedError
1428

    
1429
    def _allowed_accounts(self, user):
1430
        allow = set()
1431
        for path in self.permissions.access_list_paths(user):
1432
            allow.add(path.split('/', 1)[0])
1433
        return sorted(allow)
1434

    
1435
    def _allowed_containers(self, user, account):
1436
        allow = set()
1437
        for path in self.permissions.access_list_paths(user, account):
1438
            allow.add(path.split('/', 2)[1])
1439
        return sorted(allow)