Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 6eb0de11

History | View | Annotate | Download (62.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from kamaki.clients.quotaholder import QuotaholderClient
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
85
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
86
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
87

    
88
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
89
QUEUE_CLIENT_ID = 'pithos'
90
QUEUE_INSTANCE_ID = '1'
91

    
92
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
93

    
94
inf = float('inf')
95

    
96
ULTIMATE_ANSWER = 42
97

    
98

    
99
logger = logging.getLogger(__name__)
100

    
101

    
102
def backend_method(func=None, autocommit=1):
103
    if func is None:
104
        def fn(func):
105
            return backend_method(func, autocommit)
106
        return fn
107

    
108
    if not autocommit:
109
        return func
110

    
111
    def fn(self, *args, **kw):
112
        self.wrapper.execute()
113
        serials = []
114
        self.serials = serials
115
        self.messages = []
116

    
117
        try:
118
            ret = func(self, *args, **kw)
119
            for m in self.messages:
120
                self.queue.send(*m)
121
            if serials:
122
                self.quotaholder.accept_commission(
123
                            context     =   {},
124
                            clientkey   =   'pithos',
125
                            serials     =   serials)
126
            self.wrapper.commit()
127
            return ret
128
        except:
129
            if serials:
130
                self.quotaholder.reject_commission(
131
                            context     =   {},
132
                            clientkey   =   'pithos',
133
                            serials     =   serials)
134
            self.wrapper.rollback()
135
            raise
136
    return fn
137

    
138

    
139
class ModularBackend(BaseBackend):
140
    """A modular backend.
141

142
    Uses modules for SQL functions and storage.
143
    """
144

    
145
    def __init__(self, db_module=None, db_connection=None,
146
                 block_module=None, block_path=None, block_umask=None,
147
                 queue_module=None, queue_hosts=None, queue_exchange=None,
148
                 quotaholder_url=None, quotaholder_token=None,
149
                 free_versioning=True, block_params=None):
150
        db_module = db_module or DEFAULT_DB_MODULE
151
        db_connection = db_connection or DEFAULT_DB_CONNECTION
152
        block_module = block_module or DEFAULT_BLOCK_MODULE
153
        block_path = block_path or DEFAULT_BLOCK_PATH
154
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
155
        block_params = block_params or DEFAULT_BLOCK_PARAMS
156
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
157

    
158
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
159
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
160
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
161

    
162
        self.hash_algorithm = 'sha256'
163
        self.block_size = 4 * 1024 * 1024  # 4MB
164
        self.free_versioning = free_versioning
165

    
166
        self.default_policy = {'quota': DEFAULT_QUOTA,
167
                               'versioning': DEFAULT_VERSIONING}
168

    
169
        def load_module(m):
170
            __import__(m)
171
            return sys.modules[m]
172

    
173
        self.db_module = load_module(db_module)
174
        self.wrapper = self.db_module.DBWrapper(db_connection)
175
        params = {'wrapper': self.wrapper}
176
        self.permissions = self.db_module.Permissions(**params)
177
        self.config = self.db_module.Config(**params)
178
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
179
        for x in ['READ', 'WRITE']:
180
            setattr(self, x, getattr(self.db_module, x))
181
        self.node = self.db_module.Node(**params)
182
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
183
            setattr(self, x, getattr(self.db_module, x))
184

    
185
        self.block_module = load_module(block_module)
186
        self.block_params = block_params
187
        params = {'path': block_path,
188
                  'block_size': self.block_size,
189
                  'hash_algorithm': self.hash_algorithm,
190
                  'umask': block_umask}
191
        params.update(self.block_params)
192
        self.store = self.block_module.Store(**params)
193

    
194
        if queue_module and queue_hosts:
195
            self.queue_module = load_module(queue_module)
196
            params = {'hosts': queue_hosts,
197
                      'exchange': queue_exchange,
198
                      'client_id': QUEUE_CLIENT_ID}
199
            self.queue = self.queue_module.Queue(**params)
200
        else:
201
            class NoQueue:
202
                def send(self, *args):
203
                    pass
204

    
205
                def close(self):
206
                    pass
207

    
208
            self.queue = NoQueue()
209

    
210
        self.quotaholder_url = quotaholder_url
211
        self.quotaholder_token = quotaholder_token
212
        self.quotaholder = QuotaholderClient(quotaholder_url, quotaholder_token)
213
        self.serials = []
214
        self.messages = []
215

    
216
    def close(self):
217
        self.wrapper.close()
218
        self.queue.close()
219

    
220
    @property
221
    def using_external_quotaholder(self):
222
        if self.quotaholder_url:
223
            return True
224
        return False
225

    
226
    @backend_method
227
    def list_accounts(self, user, marker=None, limit=10000):
228
        """Return a list of accounts the user can access."""
229

    
230
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
231
        allowed = self._allowed_accounts(user)
232
        start, limit = self._list_limits(allowed, marker, limit)
233
        return allowed[start:start + limit]
234

    
235
    @backend_method
236
    def get_account_meta(
237
            self, user, account, domain, until=None, include_user_defined=True,
238
            external_quota=None):
239
        """Return a dictionary with the account metadata for the domain."""
240

    
241
        logger.debug(
242
            "get_account_meta: %s %s %s %s", user, account, domain, until)
243
        path, node = self._lookup_account(account, user == account)
244
        if user != account:
245
            if until or node is None or account not in self._allowed_accounts(user):
246
                raise NotAllowedError
247
        try:
248
            props = self._get_properties(node, until)
249
            mtime = props[self.MTIME]
250
        except NameError:
251
            props = None
252
            mtime = until
253
        count, bytes, tstamp = self._get_statistics(node, until)
254
        tstamp = max(tstamp, mtime)
255
        if until is None:
256
            modified = tstamp
257
        else:
258
            modified = self._get_statistics(
259
                node)[2]  # Overall last modification.
260
            modified = max(modified, mtime)
261

    
262
        if user != account:
263
            meta = {'name': account}
264
        else:
265
            meta = {}
266
            if props is not None and include_user_defined:
267
                meta.update(
268
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
269
            if until is not None:
270
                meta.update({'until_timestamp': tstamp})
271
            meta.update({'name': account, 'count': count, 'bytes': bytes})
272
            if self.using_external_quotaholder:
273
                external_quota = external_quota or {}
274
                meta['bytes'] = external_quota.get('currValue', 0)
275
        meta.update({'modified': modified})
276
        return meta
277

    
278
    @backend_method
279
    def update_account_meta(self, user, account, domain, meta, replace=False):
280
        """Update the metadata associated with the account for the domain."""
281

    
282
        logger.debug("update_account_meta: %s %s %s %s %s", user,
283
                     account, domain, meta, replace)
284
        if user != account:
285
            raise NotAllowedError
286
        path, node = self._lookup_account(account, True)
287
        self._put_metadata(user, node, domain, meta, replace)
288

    
289
    @backend_method
290
    def get_account_groups(self, user, account):
291
        """Return a dictionary with the user groups defined for this account."""
292

    
293
        logger.debug("get_account_groups: %s %s", user, account)
294
        if user != account:
295
            if account not in self._allowed_accounts(user):
296
                raise NotAllowedError
297
            return {}
298
        self._lookup_account(account, True)
299
        return self.permissions.group_dict(account)
300

    
301
    @backend_method
302
    def update_account_groups(self, user, account, groups, replace=False):
303
        """Update the groups associated with the account."""
304

    
305
        logger.debug("update_account_groups: %s %s %s %s", user,
306
                     account, groups, replace)
307
        if user != account:
308
            raise NotAllowedError
309
        self._lookup_account(account, True)
310
        self._check_groups(groups)
311
        if replace:
312
            self.permissions.group_destroy(account)
313
        for k, v in groups.iteritems():
314
            if not replace:  # If not already deleted.
315
                self.permissions.group_delete(account, k)
316
            if v:
317
                self.permissions.group_addmany(account, k, v)
318

    
319
    @backend_method
320
    def get_account_policy(self, user, account, external_quota=None):
321
        """Return a dictionary with the account policy."""
322

    
323
        logger.debug("get_account_policy: %s %s", user, account)
324
        if user != account:
325
            if account not in self._allowed_accounts(user):
326
                raise NotAllowedError
327
            return {}
328
        path, node = self._lookup_account(account, True)
329
        policy = self._get_policy(node)
330
        if self.using_external_quotaholder:
331
            external_quota = external_quota or {}
332
            policy['quota'] = external_quota.get('maxValue', 0)
333
        return policy
334

    
335
    @backend_method
336
    def update_account_policy(self, user, account, policy, replace=False):
337
        """Update the policy associated with the account."""
338

    
339
        logger.debug("update_account_policy: %s %s %s %s", user,
340
                     account, policy, replace)
341
        if user != account:
342
            raise NotAllowedError
343
        path, node = self._lookup_account(account, True)
344
        self._check_policy(policy)
345
        self._put_policy(node, policy, replace)
346

    
347
    @backend_method
348
    def put_account(self, user, account, policy={}):
349
        """Create a new account with the given name."""
350

    
351
        logger.debug("put_account: %s %s %s", user, account, policy)
352
        if user != account:
353
            raise NotAllowedError
354
        node = self.node.node_lookup(account)
355
        if node is not None:
356
            raise AccountExists('Account already exists')
357
        if policy:
358
            self._check_policy(policy)
359
        node = self._put_path(user, self.ROOTNODE, account)
360
        self._put_policy(node, policy, True)
361

    
362
    @backend_method
363
    def delete_account(self, user, account):
364
        """Delete the account with the given name."""
365

    
366
        logger.debug("delete_account: %s %s", user, account)
367
        if user != account:
368
            raise NotAllowedError
369
        node = self.node.node_lookup(account)
370
        if node is None:
371
            return
372
        if not self.node.node_remove(node):
373
            raise AccountNotEmpty('Account is not empty')
374
        self.permissions.group_destroy(account)
375

    
376
    @backend_method
377
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
378
        """Return a list of containers existing under an account."""
379

    
380
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
381
                     account, marker, limit, shared, until, public)
382
        if user != account:
383
            if until or account not in self._allowed_accounts(user):
384
                raise NotAllowedError
385
            allowed = self._allowed_containers(user, account)
386
            start, limit = self._list_limits(allowed, marker, limit)
387
            return allowed[start:start + limit]
388
        if shared or public:
389
            allowed = set()
390
            if shared:
391
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
392
            if public:
393
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
394
            allowed = sorted(allowed)
395
            start, limit = self._list_limits(allowed, marker, limit)
396
            return allowed[start:start + limit]
397
        node = self.node.node_lookup(account)
398
        containers = [x[0] for x in self._list_object_properties(
399
            node, account, '', '/', marker, limit, False, None, [], until)]
400
        start, limit = self._list_limits(
401
            [x[0] for x in containers], marker, limit)
402
        return containers[start:start + limit]
403

    
404
    @backend_method
405
    def list_container_meta(self, user, account, container, domain, until=None):
406
        """Return a list with all the container's object meta keys for the domain."""
407

    
408
        logger.debug("list_container_meta: %s %s %s %s %s", user,
409
                     account, container, domain, until)
410
        allowed = []
411
        if user != account:
412
            if until:
413
                raise NotAllowedError
414
            allowed = self.permissions.access_list_paths(
415
                user, '/'.join((account, container)))
416
            if not allowed:
417
                raise NotAllowedError
418
        path, node = self._lookup_container(account, container)
419
        before = until if until is not None else inf
420
        allowed = self._get_formatted_paths(allowed)
421
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
422

    
423
    @backend_method
424
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
425
        """Return a dictionary with the container metadata for the domain."""
426

    
427
        logger.debug("get_container_meta: %s %s %s %s %s", user,
428
                     account, container, domain, until)
429
        if user != account:
430
            if until or container not in self._allowed_containers(user, account):
431
                raise NotAllowedError
432
        path, node = self._lookup_container(account, container)
433
        props = self._get_properties(node, until)
434
        mtime = props[self.MTIME]
435
        count, bytes, tstamp = self._get_statistics(node, until)
436
        tstamp = max(tstamp, mtime)
437
        if until is None:
438
            modified = tstamp
439
        else:
440
            modified = self._get_statistics(
441
                node)[2]  # Overall last modification.
442
            modified = max(modified, mtime)
443

    
444
        if user != account:
445
            meta = {'name': container}
446
        else:
447
            meta = {}
448
            if include_user_defined:
449
                meta.update(
450
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
451
            if until is not None:
452
                meta.update({'until_timestamp': tstamp})
453
            meta.update({'name': container, 'count': count, 'bytes': bytes})
454
        meta.update({'modified': modified})
455
        return meta
456

    
457
    @backend_method
458
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
459
        """Update the metadata associated with the container for the domain."""
460

    
461
        logger.debug("update_container_meta: %s %s %s %s %s %s",
462
                     user, account, container, domain, meta, replace)
463
        if user != account:
464
            raise NotAllowedError
465
        path, node = self._lookup_container(account, container)
466
        src_version_id, dest_version_id = self._put_metadata(
467
            user, node, domain, meta, replace)
468
        if src_version_id is not None:
469
            versioning = self._get_policy(node)['versioning']
470
            if versioning != 'auto':
471
                self.node.version_remove(src_version_id)
472

    
473
    @backend_method
474
    def get_container_policy(self, user, account, container):
475
        """Return a dictionary with the container policy."""
476

    
477
        logger.debug(
478
            "get_container_policy: %s %s %s", user, account, container)
479
        if user != account:
480
            if container not in self._allowed_containers(user, account):
481
                raise NotAllowedError
482
            return {}
483
        path, node = self._lookup_container(account, container)
484
        return self._get_policy(node)
485

    
486
    @backend_method
487
    def update_container_policy(self, user, account, container, policy, replace=False):
488
        """Update the policy associated with the container."""
489

    
490
        logger.debug("update_container_policy: %s %s %s %s %s",
491
                     user, account, container, policy, replace)
492
        if user != account:
493
            raise NotAllowedError
494
        path, node = self._lookup_container(account, container)
495
        self._check_policy(policy)
496
        self._put_policy(node, policy, replace)
497

    
498
    @backend_method
499
    def put_container(self, user, account, container, policy={}):
500
        """Create a new container with the given name."""
501

    
502
        logger.debug(
503
            "put_container: %s %s %s %s", user, account, container, policy)
504
        if user != account:
505
            raise NotAllowedError
506
        try:
507
            path, node = self._lookup_container(account, container)
508
        except NameError:
509
            pass
510
        else:
511
            raise ContainerExists('Container already exists')
512
        if policy:
513
            self._check_policy(policy)
514
        path = '/'.join((account, container))
515
        node = self._put_path(
516
            user, self._lookup_account(account, True)[1], path)
517
        self._put_policy(node, policy, True)
518

    
519
    @backend_method
520
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
521
        """Delete/purge the container with the given name."""
522

    
523
        logger.debug("delete_container: %s %s %s %s %s %s", user,
524
                     account, container, until, prefix, delimiter)
525
        if user != account:
526
            raise NotAllowedError
527
        path, node = self._lookup_container(account, container)
528

    
529
        if until is not None:
530
            hashes, size, serials = self.node.node_purge_children(
531
                node, until, CLUSTER_HISTORY)
532
            for h in hashes:
533
                self.store.map_delete(h)
534
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
535
            self._report_size_change(user, account, -size,
536
                                     {'action':'container purge', 'path': path,
537
                                      'versions': ','.join(str(i) for i in serials)})
538
            return
539

    
540
        if not delimiter:
541
            if self._get_statistics(node)[0] > 0:
542
                raise ContainerNotEmpty('Container is not empty')
543
            hashes, size, serials = self.node.node_purge_children(
544
                node, inf, CLUSTER_HISTORY)
545
            for h in hashes:
546
                self.store.map_delete(h)
547
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
548
            self.node.node_remove(node)
549
            self._report_size_change(user, account, -size,
550
                                     {'action': 'container delete',
551
                                      'path': path,
552
                                      'versions': ','.join(str(i) for i in serials)})
553
        else:
554
            # remove only contents
555
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
556
            paths = []
557
            for t in src_names:
558
                path = '/'.join((account, container, t[0]))
559
                node = t[2]
560
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
561
                del_size = self._apply_versioning(
562
                    account, container, src_version_id)
563
                self._report_size_change(
564
                        user, account, -del_size, {
565
                                'action': 'object delete',
566
                                'path': path,
567
                        'versions': ','.join([str(dest_version_id)])
568
                     }
569
                )
570
                self._report_object_change(
571
                    user, account, path, details={'action': 'object delete'})
572
                paths.append(path)
573
            self.permissions.access_clear_bulk(paths)
574

    
575
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
576
        if user != account and until:
577
            raise NotAllowedError
578
        if shared and public:
579
            # get shared first
580
            shared = self._list_object_permissions(
581
                user, account, container, prefix, shared=True, public=False)
582
            objects = set()
583
            if shared:
584
                path, node = self._lookup_container(account, container)
585
                shared = self._get_formatted_paths(shared)
586
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
587

    
588
            # get public
589
            objects |= set(self._list_public_object_properties(
590
                user, account, container, prefix, all_props))
591
            objects = list(objects)
592

    
593
            objects.sort(key=lambda x: x[0])
594
            start, limit = self._list_limits(
595
                [x[0] for x in objects], marker, limit)
596
            return objects[start:start + limit]
597
        elif public:
598
            objects = self._list_public_object_properties(
599
                user, account, container, prefix, all_props)
600
            start, limit = self._list_limits(
601
                [x[0] for x in objects], marker, limit)
602
            return objects[start:start + limit]
603

    
604
        allowed = self._list_object_permissions(
605
            user, account, container, prefix, shared, public)
606
        if shared and not allowed:
607
            return []
608
        path, node = self._lookup_container(account, container)
609
        allowed = self._get_formatted_paths(allowed)
610
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
611
        start, limit = self._list_limits(
612
            [x[0] for x in objects], marker, limit)
613
        return objects[start:start + limit]
614

    
615
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
616
        public = self._list_object_permissions(
617
            user, account, container, prefix, shared=False, public=True)
618
        paths, nodes = self._lookup_objects(public)
619
        path = '/'.join((account, container))
620
        cont_prefix = path + '/'
621
        paths = [x[len(cont_prefix):] for x in paths]
622
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
623
        objects = [(path,) + props for path, props in zip(paths, props)]
624
        return objects
625

    
626
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
627
        objects = []
628
        while True:
629
            marker = objects[-1] if objects else None
630
            limit = 10000
631
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
632
            objects.extend(l)
633
            if not l or len(l) < limit:
634
                break
635
        return objects
636

    
637
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
638
        allowed = []
639
        path = '/'.join((account, container, prefix)).rstrip('/')
640
        if user != account:
641
            allowed = self.permissions.access_list_paths(user, path)
642
            if not allowed:
643
                raise NotAllowedError
644
        else:
645
            allowed = set()
646
            if shared:
647
                allowed.update(self.permissions.access_list_shared(path))
648
            if public:
649
                allowed.update(
650
                    [x[0] for x in self.permissions.public_list(path)])
651
            allowed = sorted(allowed)
652
            if not allowed:
653
                return []
654
        return allowed
655

    
656
    @backend_method
657
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
658
        """Return a list of object (name, version_id) tuples existing under a container."""
659

    
660
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
661
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
662

    
663
    @backend_method
664
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
665
        """Return a list of object metadata dicts existing under a container."""
666

    
667
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
668
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
669
        objects = []
670
        for p in props:
671
            if len(p) == 2:
672
                objects.append({'subdir': p[0]})
673
            else:
674
                objects.append({'name': p[0],
675
                                'bytes': p[self.SIZE + 1],
676
                                'type': p[self.TYPE + 1],
677
                                'hash': p[self.HASH + 1],
678
                                'version': p[self.SERIAL + 1],
679
                                'version_timestamp': p[self.MTIME + 1],
680
                                'modified': p[self.MTIME + 1] if until is None else None,
681
                                'modified_by': p[self.MUSER + 1],
682
                                'uuid': p[self.UUID + 1],
683
                                'checksum': p[self.CHECKSUM + 1]})
684
        return objects
685

    
686
    @backend_method
687
    def list_object_permissions(self, user, account, container, prefix=''):
688
        """Return a list of paths that enforce permissions under a container."""
689

    
690
        logger.debug("list_object_permissions: %s %s %s %s", user,
691
                     account, container, prefix)
692
        return self._list_object_permissions(user, account, container, prefix, True, False)
693

    
694
    @backend_method
695
    def list_object_public(self, user, account, container, prefix=''):
696
        """Return a dict mapping paths to public ids for objects that are public under a container."""
697

    
698
        logger.debug("list_object_public: %s %s %s %s", user,
699
                     account, container, prefix)
700
        public = {}
701
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
702
            public[path] = p + ULTIMATE_ANSWER
703
        return public
704

    
705
    @backend_method
706
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
707
        """Return a dictionary with the object metadata for the domain."""
708

    
709
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
710
                     account, container, name, domain, version)
711
        self._can_read(user, account, container, name)
712
        path, node = self._lookup_object(account, container, name)
713
        props = self._get_version(node, version)
714
        if version is None:
715
            modified = props[self.MTIME]
716
        else:
717
            try:
718
                modified = self._get_version(
719
                    node)[self.MTIME]  # Overall last modification.
720
            except NameError:  # Object may be deleted.
721
                del_props = self.node.version_lookup(
722
                    node, inf, CLUSTER_DELETED)
723
                if del_props is None:
724
                    raise ItemNotExists('Object does not exist')
725
                modified = del_props[self.MTIME]
726

    
727
        meta = {}
728
        if include_user_defined:
729
            meta.update(
730
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
731
        meta.update({'name': name,
732
                     'bytes': props[self.SIZE],
733
                     'type': props[self.TYPE],
734
                     'hash': props[self.HASH],
735
                     'version': props[self.SERIAL],
736
                     'version_timestamp': props[self.MTIME],
737
                     'modified': modified,
738
                     'modified_by': props[self.MUSER],
739
                     'uuid': props[self.UUID],
740
                     'checksum': props[self.CHECKSUM]})
741
        return meta
742

    
743
    @backend_method
744
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
745
        """Update the metadata associated with the object for the domain and return the new version."""
746

    
747
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
748
                     user, account, container, name, domain, meta, replace)
749
        self._can_write(user, account, container, name)
750
        path, node = self._lookup_object(account, container, name)
751
        src_version_id, dest_version_id = self._put_metadata(
752
            user, node, domain, meta, replace)
753
        self._apply_versioning(account, container, src_version_id)
754
        return dest_version_id
755

    
756
    @backend_method
757
    def get_object_permissions(self, user, account, container, name):
758
        """Return the action allowed on the object, the path
759
        from which the object gets its permissions from,
760
        along with a dictionary containing the permissions."""
761

    
762
        logger.debug("get_object_permissions: %s %s %s %s", user,
763
                     account, container, name)
764
        allowed = 'write'
765
        permissions_path = self._get_permissions_path(account, container, name)
766
        if user != account:
767
            if self.permissions.access_check(permissions_path, self.WRITE, user):
768
                allowed = 'write'
769
            elif self.permissions.access_check(permissions_path, self.READ, user):
770
                allowed = 'read'
771
            else:
772
                raise NotAllowedError
773
        self._lookup_object(account, container, name)
774
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
775

    
776
    @backend_method
777
    def update_object_permissions(self, user, account, container, name, permissions):
778
        """Update the permissions associated with the object."""
779

    
780
        logger.debug("update_object_permissions: %s %s %s %s %s",
781
                     user, account, container, name, permissions)
782
        if user != account:
783
            raise NotAllowedError
784
        path = self._lookup_object(account, container, name)[0]
785
        self._check_permissions(path, permissions)
786
        self.permissions.access_set(path, permissions)
787
        self._report_sharing_change(user, account, path, {'members':
788
                                    self.permissions.access_members(path)})
789

    
790
    @backend_method
791
    def get_object_public(self, user, account, container, name):
792
        """Return the public id of the object if applicable."""
793

    
794
        logger.debug(
795
            "get_object_public: %s %s %s %s", user, account, container, name)
796
        self._can_read(user, account, container, name)
797
        path = self._lookup_object(account, container, name)[0]
798
        p = self.permissions.public_get(path)
799
        if p is not None:
800
            p += ULTIMATE_ANSWER
801
        return p
802

    
803
    @backend_method
804
    def update_object_public(self, user, account, container, name, public):
805
        """Update the public status of the object."""
806

    
807
        logger.debug("update_object_public: %s %s %s %s %s", user,
808
                     account, container, name, public)
809
        self._can_write(user, account, container, name)
810
        path = self._lookup_object(account, container, name)[0]
811
        if not public:
812
            self.permissions.public_unset(path)
813
        else:
814
            self.permissions.public_set(path)
815

    
816
    @backend_method
817
    def get_object_hashmap(self, user, account, container, name, version=None):
818
        """Return the object's size and a list with partial hashes."""
819

    
820
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
821
                     account, container, name, version)
822
        self._can_read(user, account, container, name)
823
        path, node = self._lookup_object(account, container, name)
824
        props = self._get_version(node, version)
825
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
826
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
827

    
828
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
829
        if permissions is not None and user != account:
830
            raise NotAllowedError
831
        self._can_write(user, account, container, name)
832
        if permissions is not None:
833
            path = '/'.join((account, container, name))
834
            self._check_permissions(path, permissions)
835

    
836
        account_path, account_node = self._lookup_account(account, True)
837
        container_path, container_node = self._lookup_container(
838
            account, container)
839
        path, node = self._put_object_node(
840
            container_path, container_node, name)
841
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
842

    
843
        # Handle meta.
844
        if src_version_id is None:
845
            src_version_id = pre_version_id
846
        self._put_metadata_duplicate(
847
            src_version_id, dest_version_id, domain, meta, replace_meta)
848

    
849
        del_size = self._apply_versioning(account, container, pre_version_id)
850
        size_delta = size - del_size
851
        if not self.using_external_quotaholder: # Check account quota.
852
            if size_delta > 0:
853
                account_quota = long(self._get_policy(account_node)['quota'])
854
                account_usage = self._get_statistics(account_node)[1] + size_delta
855
                if (account_quota > 0 and account_usage > account_quota):
856
                    logger.error('account_quota: %s, account_usage: %s' % (
857
                        account_quota, account_usage
858
                    ))
859
                    raise QuotaError
860

    
861
        # Check container quota.
862
        container_quota = long(self._get_policy(container_node)['quota'])
863
        container_usage = self._get_statistics(container_node)[1] + size_delta
864
        if (container_quota > 0 and container_usage > container_quota):
865
            # This must be executed in a transaction, so the version is
866
            # never created if it fails.
867
            logger.error('container_quota: %s, container_usage: %s' % (
868
                container_quota, container_usage
869
            ))
870
            raise QuotaError
871

    
872
        self._report_size_change(user, account, size_delta,
873
                                 {'action': 'object update', 'path': path,
874
                                  'versions': ','.join([str(dest_version_id)])})
875
        if permissions is not None:
876
            self.permissions.access_set(path, permissions)
877
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
878

    
879
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
880
        return dest_version_id
881

    
882
    @backend_method
883
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
884
        """Create/update an object with the specified size and partial hashes."""
885

    
886
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
887
                     account, container, name, size, type, hashmap, checksum)
888
        if size == 0:  # No such thing as an empty hashmap.
889
            hashmap = [self.put_block('')]
890
        map = HashMap(self.block_size, self.hash_algorithm)
891
        map.extend([binascii.unhexlify(x) for x in hashmap])
892
        missing = self.store.block_search(map)
893
        if missing:
894
            ie = IndexError()
895
            ie.data = [binascii.hexlify(x) for x in missing]
896
            raise ie
897

    
898
        hash = map.hash()
899
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
900
        self.store.map_put(hash, map)
901
        return dest_version_id
902

    
903
    @backend_method
904
    def update_object_checksum(self, user, account, container, name, version, checksum):
905
        """Update an object's checksum."""
906

    
907
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
908
                     user, account, container, name, version, checksum)
909
        # Update objects with greater version and same hashmap and size (fix metadata updates).
910
        self._can_write(user, account, container, name)
911
        path, node = self._lookup_object(account, container, name)
912
        props = self._get_version(node, version)
913
        versions = self.node.node_get_versions(node)
914
        for x in versions:
915
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
916
                self.node.version_put_property(
917
                    x[self.SERIAL], 'checksum', checksum)
918

    
919
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
920
        dest_version_ids = []
921
        self._can_read(user, src_account, src_container, src_name)
922
        path, node = self._lookup_object(src_account, src_container, src_name)
923
        # TODO: Will do another fetch of the properties in duplicate version...
924
        props = self._get_version(
925
            node, src_version)  # Check to see if source exists.
926
        src_version_id = props[self.SERIAL]
927
        hash = props[self.HASH]
928
        size = props[self.SIZE]
929
        is_copy = not is_move and (src_account, src_container, src_name) != (
930
            dest_account, dest_container, dest_name)  # New uuid.
931
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
932
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
933
            self._delete_object(user, src_account, src_container, src_name)
934

    
935
        if delimiter:
936
            prefix = src_name + \
937
                delimiter if not src_name.endswith(delimiter) else src_name
938
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
939
            src_names.sort(key=lambda x: x[2])  # order by nodes
940
            paths = [elem[0] for elem in src_names]
941
            nodes = [elem[2] for elem in src_names]
942
            # TODO: Will do another fetch of the properties in duplicate version...
943
            props = self._get_versions(nodes)  # Check to see if source exists.
944

    
945
            for prop, path, node in zip(props, paths, nodes):
946
                src_version_id = prop[self.SERIAL]
947
                hash = prop[self.HASH]
948
                vtype = prop[self.TYPE]
949
                size = prop[self.SIZE]
950
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
951
                    delimiter) else dest_name
952
                vdest_name = path.replace(prefix, dest_prefix, 1)
953
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
954
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
955
                    self._delete_object(user, src_account, src_container, path)
956
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
957

    
958
    @backend_method
959
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
960
        """Copy an object's data and metadata."""
961

    
962
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
963
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
964
        return dest_version_id
965

    
966
    @backend_method
967
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
968
        """Move an object's data and metadata."""
969

    
970
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
971
        if user != src_account:
972
            raise NotAllowedError
973
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
974
        return dest_version_id
975

    
976
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
977
        if user != account:
978
            raise NotAllowedError
979

    
980
        if until is not None:
981
            path = '/'.join((account, container, name))
982
            node = self.node.node_lookup(path)
983
            if node is None:
984
                return
985
            hashes = []
986
            size = 0
987
            serials = []
988
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
989
            hashes += h
990
            size += s
991
            serials += v
992
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
993
            hashes += h
994
            size += s
995
            serials += v
996
            for h in hashes:
997
                self.store.map_delete(h)
998
            self.node.node_purge(node, until, CLUSTER_DELETED)
999
            try:
1000
                props = self._get_version(node)
1001
            except NameError:
1002
                self.permissions.access_clear(path)
1003
            self._report_size_change(user, account, -size,
1004
                                    {'action': 'object purge', 'path': path,
1005
                                     'versions': ','.join(str(i) for i in serials)})
1006
            return
1007

    
1008
        path, node = self._lookup_object(account, container, name)
1009
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1010
        del_size = self._apply_versioning(account, container, src_version_id)
1011
        self._report_size_change(user, account, -del_size,
1012
                                 {'action': 'object delete', 'path': path,
1013
                                  'versions': ','.join([str(dest_version_id)])})
1014
        self._report_object_change(
1015
            user, account, path, details={'action': 'object delete'})
1016
        self.permissions.access_clear(path)
1017

    
1018
        if delimiter:
1019
            prefix = name + delimiter if not name.endswith(delimiter) else name
1020
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1021
            paths = []
1022
            for t in src_names:
1023
                path = '/'.join((account, container, t[0]))
1024
                node = t[2]
1025
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1026
                del_size = self._apply_versioning(
1027
                    account, container, src_version_id)
1028
                self._report_size_change(user, account, -del_size,
1029
                                         {'action': 'object delete',
1030
                                          'path': path,
1031
                                          'versions': ','.join([str(dest_version_id)])})
1032
                self._report_object_change(
1033
                    user, account, path, details={'action': 'object delete'})
1034
                paths.append(path)
1035
            self.permissions.access_clear_bulk(paths)
1036

    
1037
    @backend_method
1038
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1039
        """Delete/purge an object."""
1040

    
1041
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1042
                     account, container, name, until, prefix, delimiter)
1043
        self._delete_object(user, account, container, name, until, delimiter)
1044

    
1045
    @backend_method
1046
    def list_versions(self, user, account, container, name):
1047
        """Return a list of all (version, version_timestamp) tuples for an object."""
1048

    
1049
        logger.debug(
1050
            "list_versions: %s %s %s %s", user, account, container, name)
1051
        self._can_read(user, account, container, name)
1052
        path, node = self._lookup_object(account, container, name)
1053
        versions = self.node.node_get_versions(node)
1054
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1055

    
1056
    @backend_method
1057
    def get_uuid(self, user, uuid):
1058
        """Return the (account, container, name) for the UUID given."""
1059

    
1060
        logger.debug("get_uuid: %s %s", user, uuid)
1061
        info = self.node.latest_uuid(uuid)
1062
        if info is None:
1063
            raise NameError
1064
        path, serial = info
1065
        account, container, name = path.split('/', 2)
1066
        self._can_read(user, account, container, name)
1067
        return (account, container, name)
1068

    
1069
    @backend_method
1070
    def get_public(self, user, public):
1071
        """Return the (account, container, name) for the public id given."""
1072

    
1073
        logger.debug("get_public: %s %s", user, public)
1074
        if public is None or public < ULTIMATE_ANSWER:
1075
            raise NameError
1076
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1077
        if path is None:
1078
            raise NameError
1079
        account, container, name = path.split('/', 2)
1080
        self._can_read(user, account, container, name)
1081
        return (account, container, name)
1082

    
1083
    @backend_method(autocommit=0)
1084
    def get_block(self, hash):
1085
        """Return a block's data."""
1086

    
1087
        logger.debug("get_block: %s", hash)
1088
        block = self.store.block_get(binascii.unhexlify(hash))
1089
        if not block:
1090
            raise ItemNotExists('Block does not exist')
1091
        return block
1092

    
1093
    @backend_method(autocommit=0)
1094
    def put_block(self, data):
1095
        """Store a block and return the hash."""
1096

    
1097
        logger.debug("put_block: %s", len(data))
1098
        return binascii.hexlify(self.store.block_put(data))
1099

    
1100
    @backend_method(autocommit=0)
1101
    def update_block(self, hash, data, offset=0):
1102
        """Update a known block and return the hash."""
1103

    
1104
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1105
        if offset == 0 and len(data) == self.block_size:
1106
            return self.put_block(data)
1107
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1108
        return binascii.hexlify(h)
1109

    
1110
    # Path functions.
1111

    
1112
    def _generate_uuid(self):
1113
        return str(uuidlib.uuid4())
1114

    
1115
    def _put_object_node(self, path, parent, name):
1116
        path = '/'.join((path, name))
1117
        node = self.node.node_lookup(path)
1118
        if node is None:
1119
            node = self.node.node_create(parent, path)
1120
        return path, node
1121

    
1122
    def _put_path(self, user, parent, path):
1123
        node = self.node.node_create(parent, path)
1124
        self.node.version_create(node, None, 0, '', None, user,
1125
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1126
        return node
1127

    
1128
    def _lookup_account(self, account, create=True):
1129
        node = self.node.node_lookup(account)
1130
        if node is None and create:
1131
            node = self._put_path(
1132
                account, self.ROOTNODE, account)  # User is account.
1133
        return account, node
1134

    
1135
    def _lookup_container(self, account, container):
1136
        path = '/'.join((account, container))
1137
        node = self.node.node_lookup(path)
1138
        if node is None:
1139
            raise ItemNotExists('Container does not exist')
1140
        return path, node
1141

    
1142
    def _lookup_object(self, account, container, name):
1143
        path = '/'.join((account, container, name))
1144
        node = self.node.node_lookup(path)
1145
        if node is None:
1146
            raise ItemNotExists('Object does not exist')
1147
        return path, node
1148

    
1149
    def _lookup_objects(self, paths):
1150
        nodes = self.node.node_lookup_bulk(paths)
1151
        return paths, nodes
1152

    
1153
    def _get_properties(self, node, until=None):
1154
        """Return properties until the timestamp given."""
1155

    
1156
        before = until if until is not None else inf
1157
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1158
        if props is None and until is not None:
1159
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1160
        if props is None:
1161
            raise ItemNotExists('Path does not exist')
1162
        return props
1163

    
1164
    def _get_statistics(self, node, until=None):
1165
        """Return count, sum of size and latest timestamp of everything under node."""
1166

    
1167
        if until is None:
1168
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1169
        else:
1170
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1171
        if stats is None:
1172
            stats = (0, 0, 0)
1173
        return stats
1174

    
1175
    def _get_version(self, node, version=None):
1176
        if version is None:
1177
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1178
            if props is None:
1179
                raise ItemNotExists('Object does not exist')
1180
        else:
1181
            try:
1182
                version = int(version)
1183
            except ValueError:
1184
                raise VersionNotExists('Version does not exist')
1185
            props = self.node.version_get_properties(version)
1186
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1187
                raise VersionNotExists('Version does not exist')
1188
        return props
1189

    
1190
    def _get_versions(self, nodes):
1191
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1192

    
1193
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1194
        """Create a new version of the node."""
1195

    
1196
        props = self.node.version_lookup(
1197
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1198
        if props is not None:
1199
            src_version_id = props[self.SERIAL]
1200
            src_hash = props[self.HASH]
1201
            src_size = props[self.SIZE]
1202
            src_type = props[self.TYPE]
1203
            src_checksum = props[self.CHECKSUM]
1204
        else:
1205
            src_version_id = None
1206
            src_hash = None
1207
            src_size = 0
1208
            src_type = ''
1209
            src_checksum = ''
1210
        if size is None:  # Set metadata.
1211
            hash = src_hash  # This way hash can be set to None (account or container).
1212
            size = src_size
1213
        if type is None:
1214
            type = src_type
1215
        if checksum is None:
1216
            checksum = src_checksum
1217
        uuid = self._generate_uuid(
1218
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1219

    
1220
        if src_node is None:
1221
            pre_version_id = src_version_id
1222
        else:
1223
            pre_version_id = None
1224
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1225
            if props is not None:
1226
                pre_version_id = props[self.SERIAL]
1227
        if pre_version_id is not None:
1228
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1229

    
1230
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1231
        return pre_version_id, dest_version_id
1232

    
1233
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1234
        if src_version_id is not None:
1235
            self.node.attribute_copy(src_version_id, dest_version_id)
1236
        if not replace:
1237
            self.node.attribute_del(dest_version_id, domain, (
1238
                k for k, v in meta.iteritems() if v == ''))
1239
            self.node.attribute_set(dest_version_id, domain, (
1240
                (k, v) for k, v in meta.iteritems() if v != ''))
1241
        else:
1242
            self.node.attribute_del(dest_version_id, domain)
1243
            self.node.attribute_set(dest_version_id, domain, ((
1244
                k, v) for k, v in meta.iteritems()))
1245

    
1246
    def _put_metadata(self, user, node, domain, meta, replace=False):
1247
        """Create a new version and store metadata."""
1248

    
1249
        src_version_id, dest_version_id = self._put_version_duplicate(
1250
            user, node)
1251
        self._put_metadata_duplicate(
1252
            src_version_id, dest_version_id, domain, meta, replace)
1253
        return src_version_id, dest_version_id
1254

    
1255
    def _list_limits(self, listing, marker, limit):
1256
        start = 0
1257
        if marker:
1258
            try:
1259
                start = listing.index(marker) + 1
1260
            except ValueError:
1261
                pass
1262
        if not limit or limit > 10000:
1263
            limit = 10000
1264
        return start, limit
1265

    
1266
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1267
        cont_prefix = path + '/'
1268
        prefix = cont_prefix + prefix
1269
        start = cont_prefix + marker if marker else None
1270
        before = until if until is not None else inf
1271
        filterq = keys if domain else []
1272
        sizeq = size_range
1273

    
1274
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1275
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1276
        objects.sort(key=lambda x: x[0])
1277
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1278
        return objects
1279

    
1280
    # Reporting functions.
1281

    
1282
    def _report_size_change(self, user, account, size, details={}):
1283
        account_node = self._lookup_account(account, True)[1]
1284
        total = self._get_statistics(account_node)[1]
1285
        details.update({'user': user, 'total': total})
1286
        logger.debug(
1287
            "_report_size_change: %s %s %s %s", user, account, size, details)
1288
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1289
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1290
                              float(size), details))
1291

    
1292
        if not self.using_external_quotaholder:
1293
            return
1294

    
1295
        serial = self.quotaholder.issue_commission(
1296
                context     =   {},
1297
                target      =   account,
1298
                key         =   '1',
1299
                clientkey   =   'pithos',
1300
                ownerkey    =   '',
1301
                name        =   details['path'] if 'path' in details else '',
1302
                provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1303
        )
1304
        self.serials.append(serial)
1305

    
1306
    def _report_object_change(self, user, account, path, details={}):
1307
        details.update({'user': user})
1308
        logger.debug("_report_object_change: %s %s %s %s", user,
1309
                     account, path, details)
1310
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1311
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1312

    
1313
    def _report_sharing_change(self, user, account, path, details={}):
1314
        logger.debug("_report_permissions_change: %s %s %s %s",
1315
                     user, account, path, details)
1316
        details.update({'user': user})
1317
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1318
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1319

    
1320
    # Policy functions.
1321

    
1322
    def _check_policy(self, policy):
1323
        for k in policy.keys():
1324
            if policy[k] == '':
1325
                policy[k] = self.default_policy.get(k)
1326
        for k, v in policy.iteritems():
1327
            if k == 'quota':
1328
                q = int(v)  # May raise ValueError.
1329
                if q < 0:
1330
                    raise ValueError
1331
            elif k == 'versioning':
1332
                if v not in ['auto', 'none']:
1333
                    raise ValueError
1334
            else:
1335
                raise ValueError
1336

    
1337
    def _put_policy(self, node, policy, replace):
1338
        if replace:
1339
            for k, v in self.default_policy.iteritems():
1340
                if k not in policy:
1341
                    policy[k] = v
1342
        self.node.policy_set(node, policy)
1343

    
1344
    def _get_policy(self, node):
1345
        policy = self.default_policy.copy()
1346
        policy.update(self.node.policy_get(node))
1347
        return policy
1348

    
1349
    def _apply_versioning(self, account, container, version_id):
1350
        """Delete the provided version if such is the policy.
1351
           Return size of object removed.
1352
        """
1353

    
1354
        if version_id is None:
1355
            return 0
1356
        path, node = self._lookup_container(account, container)
1357
        versioning = self._get_policy(node)['versioning']
1358
        if versioning != 'auto':
1359
            hash, size = self.node.version_remove(version_id)
1360
            self.store.map_delete(hash)
1361
            return size
1362
        elif self.free_versioning:
1363
            return self.node.version_get_properties(
1364
                version_id, keys=('size',))[0]
1365
        return 0
1366

    
1367
    # Access control functions.
1368

    
1369
    def _check_groups(self, groups):
1370
        # raise ValueError('Bad characters in groups')
1371
        pass
1372

    
1373
    def _check_permissions(self, path, permissions):
1374
        # raise ValueError('Bad characters in permissions')
1375
        pass
1376

    
1377
    def _get_formatted_paths(self, paths):
1378
        formatted = []
1379
        for p in paths:
1380
            node = self.node.node_lookup(p)
1381
            props = None
1382
            if node is not None:
1383
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1384
            if props is not None:
1385
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1386
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1387
                formatted.append((p, self.MATCH_EXACT))
1388
        return formatted
1389

    
1390
    def _get_permissions_path(self, account, container, name):
1391
        path = '/'.join((account, container, name))
1392
        permission_paths = self.permissions.access_inherit(path)
1393
        permission_paths.sort()
1394
        permission_paths.reverse()
1395
        for p in permission_paths:
1396
            if p == path:
1397
                return p
1398
            else:
1399
                if p.count('/') < 2:
1400
                    continue
1401
                node = self.node.node_lookup(p)
1402
                props = None
1403
                if node is not None:
1404
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1405
                if props is not None:
1406
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1407
                        return p
1408
        return None
1409

    
1410
    def _can_read(self, user, account, container, name):
1411
        if user == account:
1412
            return True
1413
        path = '/'.join((account, container, name))
1414
        if self.permissions.public_get(path) is not None:
1415
            return True
1416
        path = self._get_permissions_path(account, container, name)
1417
        if not path:
1418
            raise NotAllowedError
1419
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1420
            raise NotAllowedError
1421

    
1422
    def _can_write(self, user, account, container, name):
1423
        if user == account:
1424
            return True
1425
        path = '/'.join((account, container, name))
1426
        path = self._get_permissions_path(account, container, name)
1427
        if not path:
1428
            raise NotAllowedError
1429
        if not self.permissions.access_check(path, self.WRITE, user):
1430
            raise NotAllowedError
1431

    
1432
    def _allowed_accounts(self, user):
1433
        allow = set()
1434
        for path in self.permissions.access_list_paths(user):
1435
            allow.add(path.split('/', 1)[0])
1436
        return sorted(allow)
1437

    
1438
    def _allowed_containers(self, user, account):
1439
        allow = set()
1440
        for path in self.permissions.access_list_paths(user, account):
1441
            allow.add(path.split('/', 2)[1])
1442
        return sorted(allow)