Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 07fcdb96

History | View | Annotate | Download (62.2 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from kamaki.clients.quotaholder import QuotaholderClient
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
85
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
86

    
87
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
88
QUEUE_CLIENT_ID = 'pithos'
89
QUEUE_INSTANCE_ID = '1'
90

    
91
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
92

    
93
inf = float('inf')
94

    
95
ULTIMATE_ANSWER = 42
96

    
97

    
98
logger = logging.getLogger(__name__)
99

    
100

    
101
def backend_method(func=None, autocommit=1):
102
    if func is None:
103
        def fn(func):
104
            return backend_method(func, autocommit)
105
        return fn
106

    
107
    if not autocommit:
108
        return func
109

    
110
    def fn(self, *args, **kw):
111
        self.wrapper.execute()
112
        serials = []
113
        self.serials = serials
114
        self.messages = []
115

    
116
        try:
117
            ret = func(self, *args, **kw)
118
            for m in self.messages:
119
                self.queue.send(*m)
120
            if serials:
121
                self.quotaholder.accept_commission(
122
                            context     =   {},
123
                            clientkey   =   'pithos',
124
                            serials     =   serials)
125
            self.wrapper.commit()
126
            return ret
127
        except:
128
            if serials:
129
                self.quotaholder.reject_commission(
130
                            context     =   {},
131
                            clientkey   =   'pithos',
132
                            serials     =   serials)
133
            self.wrapper.rollback()
134
            raise
135
    return fn
136

    
137

    
138
class ModularBackend(BaseBackend):
139
    """A modular backend.
140

141
    Uses modules for SQL functions and storage.
142
    """
143

    
144
    def __init__(self, db_module=None, db_connection=None,
145
                 block_module=None, block_path=None, block_umask=None,
146
                 queue_module=None, queue_hosts=None, queue_exchange=None,
147
                 quotaholder_url=None, quotaholder_token=None,
148
                 free_versioning=True):
149
        db_module = db_module or DEFAULT_DB_MODULE
150
        db_connection = db_connection or DEFAULT_DB_CONNECTION
151
        block_module = block_module or DEFAULT_BLOCK_MODULE
152
        block_path = block_path or DEFAULT_BLOCK_PATH
153
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
154
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
155
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
156
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
157
        
158
        self.hash_algorithm = 'sha256'
159
        self.block_size = 4 * 1024 * 1024  # 4MB
160
        self.free_versioning = free_versioning
161

    
162
        self.default_policy = {'quota': DEFAULT_QUOTA,
163
                               'versioning': DEFAULT_VERSIONING}
164

    
165
        def load_module(m):
166
            __import__(m)
167
            return sys.modules[m]
168

    
169
        self.db_module = load_module(db_module)
170
        self.wrapper = self.db_module.DBWrapper(db_connection)
171
        params = {'wrapper': self.wrapper}
172
        self.permissions = self.db_module.Permissions(**params)
173
        self.config = self.db_module.Config(**params)
174
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
175
        for x in ['READ', 'WRITE']:
176
            setattr(self, x, getattr(self.db_module, x))
177
        self.node = self.db_module.Node(**params)
178
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
179
            setattr(self, x, getattr(self.db_module, x))
180

    
181
        self.block_module = load_module(block_module)
182
        params = {'path': block_path,
183
                  'block_size': self.block_size,
184
                  'hash_algorithm': self.hash_algorithm,
185
                  'umask': block_umask}
186
        self.store = self.block_module.Store(**params)
187

    
188
        if queue_module and queue_hosts:
189
            self.queue_module = load_module(queue_module)
190
            params = {'hosts': queue_hosts,
191
                              'exchange': queue_exchange,
192
                      'client_id': QUEUE_CLIENT_ID}
193
            self.queue = self.queue_module.Queue(**params)
194
        else:
195
            class NoQueue:
196
                def send(self, *args):
197
                    pass
198

    
199
                def close(self):
200
                    pass
201

    
202
            self.queue = NoQueue()
203

    
204
        self.quotaholder_url = quotaholder_url
205
        self.quotaholder_token = quotaholder_token
206
        self.quotaholder = QuotaholderClient(quotaholder_url, quotaholder_token)
207
        self.serials = []
208
        self.messages = []
209

    
210
    def close(self):
211
        self.wrapper.close()
212
        self.queue.close()
213

    
214
    @property
215
    def using_external_quotaholder(self):
216
        if self.quotaholder_url:
217
            return True
218
        return False
219

    
220
    @backend_method
221
    def list_accounts(self, user, marker=None, limit=10000):
222
        """Return a list of accounts the user can access."""
223

    
224
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
225
        allowed = self._allowed_accounts(user)
226
        start, limit = self._list_limits(allowed, marker, limit)
227
        return allowed[start:start + limit]
228

    
229
    @backend_method
230
    def get_account_meta(
231
            self, user, account, domain, until=None, include_user_defined=True,
232
            external_quota=None):
233
        """Return a dictionary with the account metadata for the domain."""
234

    
235
        external_quota = external_quota or {}
236
        logger.debug(
237
            "get_account_meta: %s %s %s %s", user, account, domain, until)
238
        path, node = self._lookup_account(account, user == account)
239
        if user != account:
240
            if until or node is None or account not in self._allowed_accounts(user):
241
                raise NotAllowedError
242
        try:
243
            props = self._get_properties(node, until)
244
            mtime = props[self.MTIME]
245
        except NameError:
246
            props = None
247
            mtime = until
248
        count, bytes, tstamp = self._get_statistics(node, until)
249
        tstamp = max(tstamp, mtime)
250
        if until is None:
251
            modified = tstamp
252
        else:
253
            modified = self._get_statistics(
254
                node)[2]  # Overall last modification.
255
            modified = max(modified, mtime)
256

    
257
        if user != account:
258
            meta = {'name': account}
259
        else:
260
            meta = {}
261
            if props is not None and include_user_defined:
262
                meta.update(
263
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
264
            if until is not None:
265
                meta.update({'until_timestamp': tstamp})
266
            meta.update({'name': account, 'count': count, 'bytes': bytes})
267
            if self.using_external_quotaholder:
268
                meta['bytes'] = external_quota.get('currValue', 0)
269
        meta.update({'modified': modified})
270
        return meta
271

    
272
    @backend_method
273
    def update_account_meta(self, user, account, domain, meta, replace=False):
274
        """Update the metadata associated with the account for the domain."""
275

    
276
        logger.debug("update_account_meta: %s %s %s %s %s", user,
277
                     account, domain, meta, replace)
278
        if user != account:
279
            raise NotAllowedError
280
        path, node = self._lookup_account(account, True)
281
        self._put_metadata(user, node, domain, meta, replace)
282

    
283
    @backend_method
284
    def get_account_groups(self, user, account):
285
        """Return a dictionary with the user groups defined for this account."""
286

    
287
        logger.debug("get_account_groups: %s %s", user, account)
288
        if user != account:
289
            if account not in self._allowed_accounts(user):
290
                raise NotAllowedError
291
            return {}
292
        self._lookup_account(account, True)
293
        return self.permissions.group_dict(account)
294

    
295
    @backend_method
296
    def update_account_groups(self, user, account, groups, replace=False):
297
        """Update the groups associated with the account."""
298

    
299
        logger.debug("update_account_groups: %s %s %s %s", user,
300
                     account, groups, replace)
301
        if user != account:
302
            raise NotAllowedError
303
        self._lookup_account(account, True)
304
        self._check_groups(groups)
305
        if replace:
306
            self.permissions.group_destroy(account)
307
        for k, v in groups.iteritems():
308
            if not replace:  # If not already deleted.
309
                self.permissions.group_delete(account, k)
310
            if v:
311
                self.permissions.group_addmany(account, k, v)
312

    
313
    @backend_method
314
    def get_account_policy(self, user, account, external_quota=None):
315
        """Return a dictionary with the account policy."""
316

    
317
        logger.debug("get_account_policy: %s %s", user, account)
318
        if user != account:
319
            if account not in self._allowed_accounts(user):
320
                raise NotAllowedError
321
            return {}
322
        path, node = self._lookup_account(account, True)
323
        policy = self._get_policy(node)
324
        if self.using_external_quotaholder:
325
            policy['quota'] = external_quota.get('maxValue', 0)
326
        return policy
327

    
328
    @backend_method
329
    def update_account_policy(self, user, account, policy, replace=False):
330
        """Update the policy associated with the account."""
331

    
332
        logger.debug("update_account_policy: %s %s %s %s", user,
333
                     account, policy, replace)
334
        if user != account:
335
            raise NotAllowedError
336
        path, node = self._lookup_account(account, True)
337
        self._check_policy(policy)
338
        self._put_policy(node, policy, replace)
339

    
340
    @backend_method
341
    def put_account(self, user, account, policy={}):
342
        """Create a new account with the given name."""
343

    
344
        logger.debug("put_account: %s %s %s", user, account, policy)
345
        if user != account:
346
            raise NotAllowedError
347
        node = self.node.node_lookup(account)
348
        if node is not None:
349
            raise AccountExists('Account already exists')
350
        if policy:
351
            self._check_policy(policy)
352
        node = self._put_path(user, self.ROOTNODE, account)
353
        self._put_policy(node, policy, True)
354

    
355
    @backend_method
356
    def delete_account(self, user, account):
357
        """Delete the account with the given name."""
358

    
359
        logger.debug("delete_account: %s %s", user, account)
360
        if user != account:
361
            raise NotAllowedError
362
        node = self.node.node_lookup(account)
363
        if node is None:
364
            return
365
        if not self.node.node_remove(node):
366
            raise AccountNotEmpty('Account is not empty')
367
        self.permissions.group_destroy(account)
368

    
369
    @backend_method
370
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
371
        """Return a list of containers existing under an account."""
372

    
373
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
374
                     account, marker, limit, shared, until, public)
375
        if user != account:
376
            if until or account not in self._allowed_accounts(user):
377
                raise NotAllowedError
378
            allowed = self._allowed_containers(user, account)
379
            start, limit = self._list_limits(allowed, marker, limit)
380
            return allowed[start:start + limit]
381
        if shared or public:
382
            allowed = set()
383
            if shared:
384
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
385
            if public:
386
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
387
            allowed = sorted(allowed)
388
            start, limit = self._list_limits(allowed, marker, limit)
389
            return allowed[start:start + limit]
390
        node = self.node.node_lookup(account)
391
        containers = [x[0] for x in self._list_object_properties(
392
            node, account, '', '/', marker, limit, False, None, [], until)]
393
        start, limit = self._list_limits(
394
            [x[0] for x in containers], marker, limit)
395
        return containers[start:start + limit]
396

    
397
    @backend_method
398
    def list_container_meta(self, user, account, container, domain, until=None):
399
        """Return a list with all the container's object meta keys for the domain."""
400

    
401
        logger.debug("list_container_meta: %s %s %s %s %s", user,
402
                     account, container, domain, until)
403
        allowed = []
404
        if user != account:
405
            if until:
406
                raise NotAllowedError
407
            allowed = self.permissions.access_list_paths(
408
                user, '/'.join((account, container)))
409
            if not allowed:
410
                raise NotAllowedError
411
        path, node = self._lookup_container(account, container)
412
        before = until if until is not None else inf
413
        allowed = self._get_formatted_paths(allowed)
414
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
415

    
416
    @backend_method
417
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
418
        """Return a dictionary with the container metadata for the domain."""
419

    
420
        logger.debug("get_container_meta: %s %s %s %s %s", user,
421
                     account, container, domain, until)
422
        if user != account:
423
            if until or container not in self._allowed_containers(user, account):
424
                raise NotAllowedError
425
        path, node = self._lookup_container(account, container)
426
        props = self._get_properties(node, until)
427
        mtime = props[self.MTIME]
428
        count, bytes, tstamp = self._get_statistics(node, until)
429
        tstamp = max(tstamp, mtime)
430
        if until is None:
431
            modified = tstamp
432
        else:
433
            modified = self._get_statistics(
434
                node)[2]  # Overall last modification.
435
            modified = max(modified, mtime)
436

    
437
        if user != account:
438
            meta = {'name': container}
439
        else:
440
            meta = {}
441
            if include_user_defined:
442
                meta.update(
443
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
444
            if until is not None:
445
                meta.update({'until_timestamp': tstamp})
446
            meta.update({'name': container, 'count': count, 'bytes': bytes})
447
        meta.update({'modified': modified})
448
        return meta
449

    
450
    @backend_method
451
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
452
        """Update the metadata associated with the container for the domain."""
453

    
454
        logger.debug("update_container_meta: %s %s %s %s %s %s",
455
                     user, account, container, domain, meta, replace)
456
        if user != account:
457
            raise NotAllowedError
458
        path, node = self._lookup_container(account, container)
459
        src_version_id, dest_version_id = self._put_metadata(
460
            user, node, domain, meta, replace)
461
        if src_version_id is not None:
462
            versioning = self._get_policy(node)['versioning']
463
            if versioning != 'auto':
464
                self.node.version_remove(src_version_id)
465

    
466
    @backend_method
467
    def get_container_policy(self, user, account, container):
468
        """Return a dictionary with the container policy."""
469

    
470
        logger.debug(
471
            "get_container_policy: %s %s %s", user, account, container)
472
        if user != account:
473
            if container not in self._allowed_containers(user, account):
474
                raise NotAllowedError
475
            return {}
476
        path, node = self._lookup_container(account, container)
477
        return self._get_policy(node)
478

    
479
    @backend_method
480
    def update_container_policy(self, user, account, container, policy, replace=False):
481
        """Update the policy associated with the container."""
482

    
483
        logger.debug("update_container_policy: %s %s %s %s %s",
484
                     user, account, container, policy, replace)
485
        if user != account:
486
            raise NotAllowedError
487
        path, node = self._lookup_container(account, container)
488
        self._check_policy(policy)
489
        self._put_policy(node, policy, replace)
490

    
491
    @backend_method
492
    def put_container(self, user, account, container, policy={}):
493
        """Create a new container with the given name."""
494

    
495
        logger.debug(
496
            "put_container: %s %s %s %s", user, account, container, policy)
497
        if user != account:
498
            raise NotAllowedError
499
        try:
500
            path, node = self._lookup_container(account, container)
501
        except NameError:
502
            pass
503
        else:
504
            raise ContainerExists('Container already exists')
505
        if policy:
506
            self._check_policy(policy)
507
        path = '/'.join((account, container))
508
        node = self._put_path(
509
            user, self._lookup_account(account, True)[1], path)
510
        self._put_policy(node, policy, True)
511

    
512
    @backend_method
513
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
514
        """Delete/purge the container with the given name."""
515

    
516
        logger.debug("delete_container: %s %s %s %s %s %s", user,
517
                     account, container, until, prefix, delimiter)
518
        if user != account:
519
            raise NotAllowedError
520
        path, node = self._lookup_container(account, container)
521

    
522
        if until is not None:
523
            hashes, size, serials = self.node.node_purge_children(
524
                node, until, CLUSTER_HISTORY)
525
            for h in hashes:
526
                self.store.map_delete(h)
527
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
528
            self._report_size_change(user, account, -size,
529
                                     {'action':'container purge', 'path': path,
530
                                      'versions': ','.join(str(i) for i in serials)})
531
            return
532

    
533
        if not delimiter:
534
            if self._get_statistics(node)[0] > 0:
535
                raise ContainerNotEmpty('Container is not empty')
536
            hashes, size, serials = self.node.node_purge_children(
537
                node, inf, CLUSTER_HISTORY)
538
            for h in hashes:
539
                self.store.map_delete(h)
540
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
541
            self.node.node_remove(node)
542
            self._report_size_change(user, account, -size,
543
                                     {'action': 'container delete',
544
                                      'path': path,
545
                                      'versions': ','.join(str(i) for i in serials)})
546
        else:
547
            # remove only contents
548
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
549
            paths = []
550
            for t in src_names:
551
                path = '/'.join((account, container, t[0]))
552
                node = t[2]
553
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
554
                del_size = self._apply_versioning(
555
                    account, container, src_version_id)
556
                self._report_size_change(
557
                        user, account, -del_size, {
558
                                'action': 'object delete',
559
                                'path': path,
560
                             'versions': ','.join([str(dest_version_id)])
561
                     }
562
                )
563
                self._report_object_change(
564
                    user, account, path, details={'action': 'object delete'})
565
                paths.append(path)
566
            self.permissions.access_clear_bulk(paths)
567

    
568
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
569
        if user != account and until:
570
            raise NotAllowedError
571
        if shared and public:
572
            # get shared first
573
            shared = self._list_object_permissions(
574
                user, account, container, prefix, shared=True, public=False)
575
            objects = set()
576
            if shared:
577
                path, node = self._lookup_container(account, container)
578
                shared = self._get_formatted_paths(shared)
579
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
580

    
581
            # get public
582
            objects |= set(self._list_public_object_properties(
583
                user, account, container, prefix, all_props))
584
            objects = list(objects)
585

    
586
            objects.sort(key=lambda x: x[0])
587
            start, limit = self._list_limits(
588
                [x[0] for x in objects], marker, limit)
589
            return objects[start:start + limit]
590
        elif public:
591
            objects = self._list_public_object_properties(
592
                user, account, container, prefix, all_props)
593
            start, limit = self._list_limits(
594
                [x[0] for x in objects], marker, limit)
595
            return objects[start:start + limit]
596

    
597
        allowed = self._list_object_permissions(
598
            user, account, container, prefix, shared, public)
599
        if shared and not allowed:
600
            return []
601
        path, node = self._lookup_container(account, container)
602
        allowed = self._get_formatted_paths(allowed)
603
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
604
        start, limit = self._list_limits(
605
            [x[0] for x in objects], marker, limit)
606
        return objects[start:start + limit]
607

    
608
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
609
        public = self._list_object_permissions(
610
            user, account, container, prefix, shared=False, public=True)
611
        paths, nodes = self._lookup_objects(public)
612
        path = '/'.join((account, container))
613
        cont_prefix = path + '/'
614
        paths = [x[len(cont_prefix):] for x in paths]
615
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
616
        objects = [(path,) + props for path, props in zip(paths, props)]
617
        return objects
618

    
619
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
620
        objects = []
621
        while True:
622
            marker = objects[-1] if objects else None
623
            limit = 10000
624
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
625
            objects.extend(l)
626
            if not l or len(l) < limit:
627
                break
628
        return objects
629

    
630
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
631
        allowed = []
632
        path = '/'.join((account, container, prefix)).rstrip('/')
633
        if user != account:
634
            allowed = self.permissions.access_list_paths(user, path)
635
            if not allowed:
636
                raise NotAllowedError
637
        else:
638
            allowed = set()
639
            if shared:
640
                allowed.update(self.permissions.access_list_shared(path))
641
            if public:
642
                allowed.update(
643
                    [x[0] for x in self.permissions.public_list(path)])
644
            allowed = sorted(allowed)
645
            if not allowed:
646
                return []
647
        return allowed
648

    
649
    @backend_method
650
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
651
        """Return a list of object (name, version_id) tuples existing under a container."""
652

    
653
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
654
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
655

    
656
    @backend_method
657
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
658
        """Return a list of object metadata dicts existing under a container."""
659

    
660
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
661
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
662
        objects = []
663
        for p in props:
664
            if len(p) == 2:
665
                objects.append({'subdir': p[0]})
666
            else:
667
                objects.append({'name': p[0],
668
                                'bytes': p[self.SIZE + 1],
669
                                'type': p[self.TYPE + 1],
670
                                'hash': p[self.HASH + 1],
671
                                'version': p[self.SERIAL + 1],
672
                                'version_timestamp': p[self.MTIME + 1],
673
                                'modified': p[self.MTIME + 1] if until is None else None,
674
                                'modified_by': p[self.MUSER + 1],
675
                                'uuid': p[self.UUID + 1],
676
                                'checksum': p[self.CHECKSUM + 1]})
677
        return objects
678

    
679
    @backend_method
680
    def list_object_permissions(self, user, account, container, prefix=''):
681
        """Return a list of paths that enforce permissions under a container."""
682

    
683
        logger.debug("list_object_permissions: %s %s %s %s", user,
684
                     account, container, prefix)
685
        return self._list_object_permissions(user, account, container, prefix, True, False)
686

    
687
    @backend_method
688
    def list_object_public(self, user, account, container, prefix=''):
689
        """Return a dict mapping paths to public ids for objects that are public under a container."""
690

    
691
        logger.debug("list_object_public: %s %s %s %s", user,
692
                     account, container, prefix)
693
        public = {}
694
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
695
            public[path] = p + ULTIMATE_ANSWER
696
        return public
697

    
698
    @backend_method
699
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
700
        """Return a dictionary with the object metadata for the domain."""
701

    
702
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
703
                     account, container, name, domain, version)
704
        self._can_read(user, account, container, name)
705
        path, node = self._lookup_object(account, container, name)
706
        props = self._get_version(node, version)
707
        if version is None:
708
            modified = props[self.MTIME]
709
        else:
710
            try:
711
                modified = self._get_version(
712
                    node)[self.MTIME]  # Overall last modification.
713
            except NameError:  # Object may be deleted.
714
                del_props = self.node.version_lookup(
715
                    node, inf, CLUSTER_DELETED)
716
                if del_props is None:
717
                    raise ItemNotExists('Object does not exist')
718
                modified = del_props[self.MTIME]
719

    
720
        meta = {}
721
        if include_user_defined:
722
            meta.update(
723
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
724
        meta.update({'name': name,
725
                     'bytes': props[self.SIZE],
726
                     'type': props[self.TYPE],
727
                     'hash': props[self.HASH],
728
                     'version': props[self.SERIAL],
729
                     'version_timestamp': props[self.MTIME],
730
                     'modified': modified,
731
                     'modified_by': props[self.MUSER],
732
                     'uuid': props[self.UUID],
733
                     'checksum': props[self.CHECKSUM]})
734
        return meta
735

    
736
    @backend_method
737
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
738
        """Update the metadata associated with the object for the domain and return the new version."""
739

    
740
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
741
                     user, account, container, name, domain, meta, replace)
742
        self._can_write(user, account, container, name)
743
        path, node = self._lookup_object(account, container, name)
744
        src_version_id, dest_version_id = self._put_metadata(
745
            user, node, domain, meta, replace)
746
        self._apply_versioning(account, container, src_version_id)
747
        return dest_version_id
748

    
749
    @backend_method
750
    def get_object_permissions(self, user, account, container, name):
751
        """Return the action allowed on the object, the path
752
        from which the object gets its permissions from,
753
        along with a dictionary containing the permissions."""
754

    
755
        logger.debug("get_object_permissions: %s %s %s %s", user,
756
                     account, container, name)
757
        allowed = 'write'
758
        permissions_path = self._get_permissions_path(account, container, name)
759
        if user != account:
760
            if self.permissions.access_check(permissions_path, self.WRITE, user):
761
                allowed = 'write'
762
            elif self.permissions.access_check(permissions_path, self.READ, user):
763
                allowed = 'read'
764
            else:
765
                raise NotAllowedError
766
        self._lookup_object(account, container, name)
767
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
768

    
769
    @backend_method
770
    def update_object_permissions(self, user, account, container, name, permissions):
771
        """Update the permissions associated with the object."""
772

    
773
        logger.debug("update_object_permissions: %s %s %s %s %s",
774
                     user, account, container, name, permissions)
775
        if user != account:
776
            raise NotAllowedError
777
        path = self._lookup_object(account, container, name)[0]
778
        self._check_permissions(path, permissions)
779
        self.permissions.access_set(path, permissions)
780
        self._report_sharing_change(user, account, path, {'members':
781
                                    self.permissions.access_members(path)})
782

    
783
    @backend_method
784
    def get_object_public(self, user, account, container, name):
785
        """Return the public id of the object if applicable."""
786

    
787
        logger.debug(
788
            "get_object_public: %s %s %s %s", user, account, container, name)
789
        self._can_read(user, account, container, name)
790
        path = self._lookup_object(account, container, name)[0]
791
        p = self.permissions.public_get(path)
792
        if p is not None:
793
            p += ULTIMATE_ANSWER
794
        return p
795

    
796
    @backend_method
797
    def update_object_public(self, user, account, container, name, public):
798
        """Update the public status of the object."""
799

    
800
        logger.debug("update_object_public: %s %s %s %s %s", user,
801
                     account, container, name, public)
802
        self._can_write(user, account, container, name)
803
        path = self._lookup_object(account, container, name)[0]
804
        if not public:
805
            self.permissions.public_unset(path)
806
        else:
807
            self.permissions.public_set(path)
808

    
809
    @backend_method
810
    def get_object_hashmap(self, user, account, container, name, version=None):
811
        """Return the object's size and a list with partial hashes."""
812

    
813
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
814
                     account, container, name, version)
815
        self._can_read(user, account, container, name)
816
        path, node = self._lookup_object(account, container, name)
817
        props = self._get_version(node, version)
818
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
819
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
820

    
821
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
822
        if permissions is not None and user != account:
823
            raise NotAllowedError
824
        self._can_write(user, account, container, name)
825
        if permissions is not None:
826
            path = '/'.join((account, container, name))
827
            self._check_permissions(path, permissions)
828

    
829
        account_path, account_node = self._lookup_account(account, True)
830
        container_path, container_node = self._lookup_container(
831
            account, container)
832
        path, node = self._put_object_node(
833
            container_path, container_node, name)
834
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
835

    
836
        # Handle meta.
837
        if src_version_id is None:
838
            src_version_id = pre_version_id
839
        self._put_metadata_duplicate(
840
            src_version_id, dest_version_id, domain, meta, replace_meta)
841

    
842
        del_size = self._apply_versioning(account, container, pre_version_id)
843
        size_delta = size - del_size
844
        if not self.using_external_quotaholder: # Check quota.
845
            if size_delta > 0:
846
                account_quota = long(self._get_policy(account_node)['quota'])
847
                account_usage = self._get_statistics(account_node)[1] + size_delta
848
                container_quota = long(self._get_policy(container_node)['quota'])
849
                container_usage = self._get_statistics(container_node)[1] + size_delta
850
                if (account_quota > 0 and account_usage > account_quota):
851
                    logger.error('account_quota: %s, account_usage: %s' % (
852
                        account_quota, account_usage
853
                    ))
854
                    raise QuotaError
855
                if (container_quota > 0 and container_usage > container_quota):
856
                    # This must be executed in a transaction, so the version is
857
                    # never created if it fails.
858
                    logger.error('container_quota: %s, container_usage: %s' % (
859
                        container_quota, container_usage
860
                    ))
861
                    raise QuotaError
862
        self._report_size_change(user, account, size_delta,
863
                                 {'action': 'object update', 'path': path,
864
                                  'versions': ','.join([str(dest_version_id)])})
865
        if permissions is not None:
866
            self.permissions.access_set(path, permissions)
867
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
868

    
869
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
870
        return dest_version_id
871

    
872
    @backend_method
873
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
874
        """Create/update an object with the specified size and partial hashes."""
875

    
876
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
877
                     account, container, name, size, type, hashmap, checksum)
878
        if size == 0:  # No such thing as an empty hashmap.
879
            hashmap = [self.put_block('')]
880
        map = HashMap(self.block_size, self.hash_algorithm)
881
        map.extend([binascii.unhexlify(x) for x in hashmap])
882
        missing = self.store.block_search(map)
883
        if missing:
884
            ie = IndexError()
885
            ie.data = [binascii.hexlify(x) for x in missing]
886
            raise ie
887

    
888
        hash = map.hash()
889
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
890
        self.store.map_put(hash, map)
891
        return dest_version_id
892

    
893
    @backend_method
894
    def update_object_checksum(self, user, account, container, name, version, checksum):
895
        """Update an object's checksum."""
896

    
897
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
898
                     user, account, container, name, version, checksum)
899
        # Update objects with greater version and same hashmap and size (fix metadata updates).
900
        self._can_write(user, account, container, name)
901
        path, node = self._lookup_object(account, container, name)
902
        props = self._get_version(node, version)
903
        versions = self.node.node_get_versions(node)
904
        for x in versions:
905
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
906
                self.node.version_put_property(
907
                    x[self.SERIAL], 'checksum', checksum)
908

    
909
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
910
        dest_version_ids = []
911
        self._can_read(user, src_account, src_container, src_name)
912
        path, node = self._lookup_object(src_account, src_container, src_name)
913
        # TODO: Will do another fetch of the properties in duplicate version...
914
        props = self._get_version(
915
            node, src_version)  # Check to see if source exists.
916
        src_version_id = props[self.SERIAL]
917
        hash = props[self.HASH]
918
        size = props[self.SIZE]
919
        is_copy = not is_move and (src_account, src_container, src_name) != (
920
            dest_account, dest_container, dest_name)  # New uuid.
921
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
922
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
923
            self._delete_object(user, src_account, src_container, src_name)
924

    
925
        if delimiter:
926
            prefix = src_name + \
927
                delimiter if not src_name.endswith(delimiter) else src_name
928
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
929
            src_names.sort(key=lambda x: x[2])  # order by nodes
930
            paths = [elem[0] for elem in src_names]
931
            nodes = [elem[2] for elem in src_names]
932
            # TODO: Will do another fetch of the properties in duplicate version...
933
            props = self._get_versions(nodes)  # Check to see if source exists.
934

    
935
            for prop, path, node in zip(props, paths, nodes):
936
                src_version_id = prop[self.SERIAL]
937
                hash = prop[self.HASH]
938
                vtype = prop[self.TYPE]
939
                size = prop[self.SIZE]
940
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
941
                    delimiter) else dest_name
942
                vdest_name = path.replace(prefix, dest_prefix, 1)
943
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
944
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
945
                    self._delete_object(user, src_account, src_container, path)
946
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
947

    
948
    @backend_method
949
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
950
        """Copy an object's data and metadata."""
951

    
952
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
953
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
954
        return dest_version_id
955

    
956
    @backend_method
957
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
958
        """Move an object's data and metadata."""
959

    
960
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
961
        if user != src_account:
962
            raise NotAllowedError
963
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
964
        return dest_version_id
965

    
966
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
967
        if user != account:
968
            raise NotAllowedError
969

    
970
        if until is not None:
971
            path = '/'.join((account, container, name))
972
            node = self.node.node_lookup(path)
973
            if node is None:
974
                return
975
            hashes = []
976
            size = 0
977
            serials = []
978
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
979
            hashes += h
980
            size += s
981
            serials += v
982
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
983
            hashes += h
984
            size += s
985
            serials += v
986
            for h in hashes:
987
                self.store.map_delete(h)
988
            self.node.node_purge(node, until, CLUSTER_DELETED)
989
            try:
990
                props = self._get_version(node)
991
            except NameError:
992
                self.permissions.access_clear(path)
993
            self._report_size_change(user, account, -size,
994
                                    {'action': 'object purge', 'path': path,
995
                                     'versions': ','.join(str(i) for i in serials)})
996
            return
997

    
998
        path, node = self._lookup_object(account, container, name)
999
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1000
        del_size = self._apply_versioning(account, container, src_version_id)
1001
        self._report_size_change(user, account, -del_size,
1002
                                 {'action': 'object delete', 'path': path,
1003
                                  'versions': ','.join([str(dest_version_id)])})
1004
        self._report_object_change(
1005
            user, account, path, details={'action': 'object delete'})
1006
        self.permissions.access_clear(path)
1007

    
1008
        if delimiter:
1009
            prefix = name + delimiter if not name.endswith(delimiter) else name
1010
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1011
            paths = []
1012
            for t in src_names:
1013
                path = '/'.join((account, container, t[0]))
1014
                node = t[2]
1015
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1016
                del_size = self._apply_versioning(
1017
                    account, container, src_version_id)
1018
                self._report_size_change(user, account, -del_size,
1019
                                         {'action': 'object delete',
1020
                                          'path': path,
1021
                                          'versions': ','.join([str(dest_version_id)])})
1022
                self._report_object_change(
1023
                    user, account, path, details={'action': 'object delete'})
1024
                paths.append(path)
1025
            self.permissions.access_clear_bulk(paths)
1026

    
1027
    @backend_method
1028
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1029
        """Delete/purge an object."""
1030

    
1031
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1032
                     account, container, name, until, prefix, delimiter)
1033
        self._delete_object(user, account, container, name, until, delimiter)
1034

    
1035
    @backend_method
1036
    def list_versions(self, user, account, container, name):
1037
        """Return a list of all (version, version_timestamp) tuples for an object."""
1038

    
1039
        logger.debug(
1040
            "list_versions: %s %s %s %s", user, account, container, name)
1041
        self._can_read(user, account, container, name)
1042
        path, node = self._lookup_object(account, container, name)
1043
        versions = self.node.node_get_versions(node)
1044
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1045

    
1046
    @backend_method
1047
    def get_uuid(self, user, uuid):
1048
        """Return the (account, container, name) for the UUID given."""
1049

    
1050
        logger.debug("get_uuid: %s %s", user, uuid)
1051
        info = self.node.latest_uuid(uuid)
1052
        if info is None:
1053
            raise NameError
1054
        path, serial = info
1055
        account, container, name = path.split('/', 2)
1056
        self._can_read(user, account, container, name)
1057
        return (account, container, name)
1058

    
1059
    @backend_method
1060
    def get_public(self, user, public):
1061
        """Return the (account, container, name) for the public id given."""
1062

    
1063
        logger.debug("get_public: %s %s", user, public)
1064
        if public is None or public < ULTIMATE_ANSWER:
1065
            raise NameError
1066
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1067
        if path is None:
1068
            raise NameError
1069
        account, container, name = path.split('/', 2)
1070
        self._can_read(user, account, container, name)
1071
        return (account, container, name)
1072

    
1073
    @backend_method(autocommit=0)
1074
    def get_block(self, hash):
1075
        """Return a block's data."""
1076

    
1077
        logger.debug("get_block: %s", hash)
1078
        block = self.store.block_get(binascii.unhexlify(hash))
1079
        if not block:
1080
            raise ItemNotExists('Block does not exist')
1081
        return block
1082

    
1083
    @backend_method(autocommit=0)
1084
    def put_block(self, data):
1085
        """Store a block and return the hash."""
1086

    
1087
        logger.debug("put_block: %s", len(data))
1088
        return binascii.hexlify(self.store.block_put(data))
1089

    
1090
    @backend_method(autocommit=0)
1091
    def update_block(self, hash, data, offset=0):
1092
        """Update a known block and return the hash."""
1093

    
1094
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1095
        if offset == 0 and len(data) == self.block_size:
1096
            return self.put_block(data)
1097
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1098
        return binascii.hexlify(h)
1099

    
1100
    # Path functions.
1101

    
1102
    def _generate_uuid(self):
1103
        return str(uuidlib.uuid4())
1104

    
1105
    def _put_object_node(self, path, parent, name):
1106
        path = '/'.join((path, name))
1107
        node = self.node.node_lookup(path)
1108
        if node is None:
1109
            node = self.node.node_create(parent, path)
1110
        return path, node
1111

    
1112
    def _put_path(self, user, parent, path):
1113
        node = self.node.node_create(parent, path)
1114
        self.node.version_create(node, None, 0, '', None, user,
1115
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1116
        return node
1117

    
1118
    def _lookup_account(self, account, create=True):
1119
        node = self.node.node_lookup(account)
1120
        if node is None and create:
1121
            node = self._put_path(
1122
                account, self.ROOTNODE, account)  # User is account.
1123
        return account, node
1124

    
1125
    def _lookup_container(self, account, container):
1126
        path = '/'.join((account, container))
1127
        node = self.node.node_lookup(path)
1128
        if node is None:
1129
            raise ItemNotExists('Container does not exist')
1130
        return path, node
1131

    
1132
    def _lookup_object(self, account, container, name):
1133
        path = '/'.join((account, container, name))
1134
        node = self.node.node_lookup(path)
1135
        if node is None:
1136
            raise ItemNotExists('Object does not exist')
1137
        return path, node
1138

    
1139
    def _lookup_objects(self, paths):
1140
        nodes = self.node.node_lookup_bulk(paths)
1141
        return paths, nodes
1142

    
1143
    def _get_properties(self, node, until=None):
1144
        """Return properties until the timestamp given."""
1145

    
1146
        before = until if until is not None else inf
1147
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1148
        if props is None and until is not None:
1149
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1150
        if props is None:
1151
            raise ItemNotExists('Path does not exist')
1152
        return props
1153

    
1154
    def _get_statistics(self, node, until=None):
1155
        """Return count, sum of size and latest timestamp of everything under node."""
1156

    
1157
        if until is None:
1158
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1159
        else:
1160
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1161
        if stats is None:
1162
            stats = (0, 0, 0)
1163
        return stats
1164

    
1165
    def _get_version(self, node, version=None):
1166
        if version is None:
1167
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1168
            if props is None:
1169
                raise ItemNotExists('Object does not exist')
1170
        else:
1171
            try:
1172
                version = int(version)
1173
            except ValueError:
1174
                raise VersionNotExists('Version does not exist')
1175
            props = self.node.version_get_properties(version)
1176
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1177
                raise VersionNotExists('Version does not exist')
1178
        return props
1179

    
1180
    def _get_versions(self, nodes):
1181
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1182

    
1183
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1184
        """Create a new version of the node."""
1185

    
1186
        props = self.node.version_lookup(
1187
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1188
        if props is not None:
1189
            src_version_id = props[self.SERIAL]
1190
            src_hash = props[self.HASH]
1191
            src_size = props[self.SIZE]
1192
            src_type = props[self.TYPE]
1193
            src_checksum = props[self.CHECKSUM]
1194
        else:
1195
            src_version_id = None
1196
            src_hash = None
1197
            src_size = 0
1198
            src_type = ''
1199
            src_checksum = ''
1200
        if size is None:  # Set metadata.
1201
            hash = src_hash  # This way hash can be set to None (account or container).
1202
            size = src_size
1203
        if type is None:
1204
            type = src_type
1205
        if checksum is None:
1206
            checksum = src_checksum
1207
        uuid = self._generate_uuid(
1208
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1209

    
1210
        if src_node is None:
1211
            pre_version_id = src_version_id
1212
        else:
1213
            pre_version_id = None
1214
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1215
            if props is not None:
1216
                pre_version_id = props[self.SERIAL]
1217
        if pre_version_id is not None:
1218
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1219

    
1220
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1221
        return pre_version_id, dest_version_id
1222

    
1223
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1224
        if src_version_id is not None:
1225
            self.node.attribute_copy(src_version_id, dest_version_id)
1226
        if not replace:
1227
            self.node.attribute_del(dest_version_id, domain, (
1228
                k for k, v in meta.iteritems() if v == ''))
1229
            self.node.attribute_set(dest_version_id, domain, (
1230
                (k, v) for k, v in meta.iteritems() if v != ''))
1231
        else:
1232
            self.node.attribute_del(dest_version_id, domain)
1233
            self.node.attribute_set(dest_version_id, domain, ((
1234
                k, v) for k, v in meta.iteritems()))
1235

    
1236
    def _put_metadata(self, user, node, domain, meta, replace=False):
1237
        """Create a new version and store metadata."""
1238

    
1239
        src_version_id, dest_version_id = self._put_version_duplicate(
1240
            user, node)
1241
        self._put_metadata_duplicate(
1242
            src_version_id, dest_version_id, domain, meta, replace)
1243
        return src_version_id, dest_version_id
1244

    
1245
    def _list_limits(self, listing, marker, limit):
1246
        start = 0
1247
        if marker:
1248
            try:
1249
                start = listing.index(marker) + 1
1250
            except ValueError:
1251
                pass
1252
        if not limit or limit > 10000:
1253
            limit = 10000
1254
        return start, limit
1255

    
1256
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1257
        cont_prefix = path + '/'
1258
        prefix = cont_prefix + prefix
1259
        start = cont_prefix + marker if marker else None
1260
        before = until if until is not None else inf
1261
        filterq = keys if domain else []
1262
        sizeq = size_range
1263

    
1264
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1265
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1266
        objects.sort(key=lambda x: x[0])
1267
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1268
        return objects
1269

    
1270
    # Reporting functions.
1271

    
1272
    def _report_size_change(self, user, account, size, details={}):
1273
        account_node = self._lookup_account(account, True)[1]
1274
        total = self._get_statistics(account_node)[1]
1275
        details.update({'user': user, 'total': total})
1276
        logger.debug(
1277
            "_report_size_change: %s %s %s %s", user, account, size, details)
1278
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1279
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1280
                              float(size), details))
1281

    
1282
        if not self.using_external_quotaholder:
1283
            return
1284
        
1285
        serial = self.quotaholder.issue_commission(
1286
                context     =   {},
1287
                target      =   user.uuid,
1288
                key         =   '1',
1289
                clientkey   =   'pithos',
1290
                ownerkey    =   '',
1291
                        name        =   details['path'] if 'path' in details else '',
1292
                provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1293
        )
1294
        self.serials.append(serial)
1295

    
1296
    def _report_object_change(self, user, account, path, details={}):
1297
        details.update({'user': user})
1298
        logger.debug("_report_object_change: %s %s %s %s", user,
1299
                     account, path, details)
1300
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1301
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1302

    
1303
    def _report_sharing_change(self, user, account, path, details={}):
1304
        logger.debug("_report_permissions_change: %s %s %s %s",
1305
                     user, account, path, details)
1306
        details.update({'user': user})
1307
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1308
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1309

    
1310
    # Policy functions.
1311

    
1312
    def _check_policy(self, policy):
1313
        for k in policy.keys():
1314
            if policy[k] == '':
1315
                policy[k] = self.default_policy.get(k)
1316
        for k, v in policy.iteritems():
1317
            if k == 'quota':
1318
                q = int(v)  # May raise ValueError.
1319
                if q < 0:
1320
                    raise ValueError
1321
            elif k == 'versioning':
1322
                if v not in ['auto', 'none']:
1323
                    raise ValueError
1324
            else:
1325
                raise ValueError
1326

    
1327
    def _put_policy(self, node, policy, replace):
1328
        if replace:
1329
            for k, v in self.default_policy.iteritems():
1330
                if k not in policy:
1331
                    policy[k] = v
1332
        self.node.policy_set(node, policy)
1333

    
1334
    def _get_policy(self, node):
1335
        policy = self.default_policy.copy()
1336
        policy.update(self.node.policy_get(node))
1337
        return policy
1338

    
1339
    def _apply_versioning(self, account, container, version_id):
1340
        """Delete the provided version if such is the policy.
1341
           Return size of object removed.
1342
        """
1343

    
1344
        if version_id is None:
1345
            return 0
1346
        path, node = self._lookup_container(account, container)
1347
        versioning = self._get_policy(node)['versioning']
1348
        if versioning != 'auto':
1349
            hash, size = self.node.version_remove(version_id)
1350
            self.store.map_delete(hash)
1351
            return size
1352
        elif self.free_versioning:
1353
            return self.node.version_get_properties(
1354
                version_id, keys=('size',))[0]
1355
        return 0
1356

    
1357
    # Access control functions.
1358

    
1359
    def _check_groups(self, groups):
1360
        # raise ValueError('Bad characters in groups')
1361
        pass
1362

    
1363
    def _check_permissions(self, path, permissions):
1364
        # raise ValueError('Bad characters in permissions')
1365
        pass
1366

    
1367
    def _get_formatted_paths(self, paths):
1368
        formatted = []
1369
        for p in paths:
1370
            node = self.node.node_lookup(p)
1371
            props = None
1372
            if node is not None:
1373
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1374
            if props is not None:
1375
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1376
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1377
                formatted.append((p, self.MATCH_EXACT))
1378
        return formatted
1379

    
1380
    def _get_permissions_path(self, account, container, name):
1381
        path = '/'.join((account, container, name))
1382
        permission_paths = self.permissions.access_inherit(path)
1383
        permission_paths.sort()
1384
        permission_paths.reverse()
1385
        for p in permission_paths:
1386
            if p == path:
1387
                return p
1388
            else:
1389
                if p.count('/') < 2:
1390
                    continue
1391
                node = self.node.node_lookup(p)
1392
                props = None
1393
                if node is not None:
1394
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1395
                if props is not None:
1396
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1397
                        return p
1398
        return None
1399

    
1400
    def _can_read(self, user, account, container, name):
1401
        if user == account:
1402
            return True
1403
        path = '/'.join((account, container, name))
1404
        if self.permissions.public_get(path) is not None:
1405
            return True
1406
        path = self._get_permissions_path(account, container, name)
1407
        if not path:
1408
            raise NotAllowedError
1409
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1410
            raise NotAllowedError
1411

    
1412
    def _can_write(self, user, account, container, name):
1413
        if user == account:
1414
            return True
1415
        path = '/'.join((account, container, name))
1416
        path = self._get_permissions_path(account, container, name)
1417
        if not path:
1418
            raise NotAllowedError
1419
        if not self.permissions.access_check(path, self.WRITE, user):
1420
            raise NotAllowedError
1421

    
1422
    def _allowed_accounts(self, user):
1423
        allow = set()
1424
        for path in self.permissions.access_list_paths(user):
1425
            allow.add(path.split('/', 1)[0])
1426
        return sorted(allow)
1427

    
1428
    def _allowed_containers(self, user, account):
1429
        allow = set()
1430
        for path in self.permissions.access_list_paths(user, account):
1431
            allow.add(path.split('/', 2)[1])
1432
        return sorted(allow)