Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 56f3c759

History | View | Annotate | Download (64 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from synnefo.lib.quotaholder import QuotaholderClient
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
85
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
86
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
87
DEFAULT_ALPHABET = ('0123456789'
88
                    'abcdefghijklmnopqrstuvwxyz'
89
                    'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
90
DEFAULT_MIN_LENGTH = 8
91

    
92
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
93
QUEUE_CLIENT_ID = 'pithos'
94
QUEUE_INSTANCE_ID = '1'
95

    
96
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
97

    
98
inf = float('inf')
99

    
100
ULTIMATE_ANSWER = 42
101

    
102

    
103
logger = logging.getLogger(__name__)
104

    
105

    
106
def backend_method(func=None, autocommit=1):
107
    if func is None:
108
        def fn(func):
109
            return backend_method(func, autocommit)
110
        return fn
111

    
112
    if not autocommit:
113
        return func
114

    
115
    def fn(self, *args, **kw):
116
        self.wrapper.execute()
117
        serials = []
118
        self.serials = serials
119
        self.messages = []
120

    
121
        try:
122
            ret = func(self, *args, **kw)
123
            for m in self.messages:
124
                self.queue.send(*m)
125
            if serials:
126
                self.quotaholder.accept_commission(
127
                            context     =   {},
128
                            clientkey   =   'pithos',
129
                            serials     =   serials)
130
            self.wrapper.commit()
131
            return ret
132
        except:
133
            if serials:
134
                self.quotaholder.reject_commission(
135
                            context     =   {},
136
                            clientkey   =   'pithos',
137
                            serials     =   serials)
138
            self.wrapper.rollback()
139
            raise
140
    return fn
141

    
142

    
143
class ModularBackend(BaseBackend):
144
    """A modular backend.
145

146
    Uses modules for SQL functions and storage.
147
    """
148

    
149
    def __init__(self, db_module=None, db_connection=None,
150
                 block_module=None, block_path=None, block_umask=None,
151
                 queue_module=None, queue_hosts=None, queue_exchange=None,
152
                 quotaholder_enabled=False,
153
                 quotaholder_url=None, quotaholder_token=None,
154
                 quotaholder_client_poolsize=None,
155
                 free_versioning=True, block_params=None,
156
                 public_url_min_length=None,
157
                 public_url_alphabet=None):
158
        db_module = db_module or DEFAULT_DB_MODULE
159
        db_connection = db_connection or DEFAULT_DB_CONNECTION
160
        block_module = block_module or DEFAULT_BLOCK_MODULE
161
        block_path = block_path or DEFAULT_BLOCK_PATH
162
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
163
        block_params = block_params or DEFAULT_BLOCK_PARAMS
164
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
165

    
166
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
167
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
168
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
169

    
170
        self.public_url_min_length = public_url_min_length or DEFAULT_MIN_LENGTH
171
        self.public_url_alphabet = public_url_alphabet or DEFAULT_ALPHABET
172

    
173
        self.hash_algorithm = 'sha256'
174
        self.block_size = 4 * 1024 * 1024  # 4MB
175
        self.free_versioning = free_versioning
176

    
177
        self.default_policy = {'quota': DEFAULT_QUOTA,
178
                               'versioning': DEFAULT_VERSIONING}
179

    
180
        def load_module(m):
181
            __import__(m)
182
            return sys.modules[m]
183

    
184
        self.db_module = load_module(db_module)
185
        self.wrapper = self.db_module.DBWrapper(db_connection)
186
        params = {'wrapper': self.wrapper}
187
        self.permissions = self.db_module.Permissions(**params)
188
        self.config = self.db_module.Config(**params)
189
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
190
        for x in ['READ', 'WRITE']:
191
            setattr(self, x, getattr(self.db_module, x))
192
        self.node = self.db_module.Node(**params)
193
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
194
            setattr(self, x, getattr(self.db_module, x))
195

    
196
        self.block_module = load_module(block_module)
197
        self.block_params = block_params
198
        params = {'path': block_path,
199
                  'block_size': self.block_size,
200
                  'hash_algorithm': self.hash_algorithm,
201
                  'umask': block_umask}
202
        params.update(self.block_params)
203
        self.store = self.block_module.Store(**params)
204

    
205
        if queue_module and queue_hosts:
206
            self.queue_module = load_module(queue_module)
207
            params = {'hosts': queue_hosts,
208
                      'exchange': queue_exchange,
209
                      'client_id': QUEUE_CLIENT_ID}
210
            self.queue = self.queue_module.Queue(**params)
211
        else:
212
            class NoQueue:
213
                def send(self, *args):
214
                    pass
215

    
216
                def close(self):
217
                    pass
218

    
219
            self.queue = NoQueue()
220

    
221
        self.quotaholder_enabled = quotaholder_enabled
222
        if quotaholder_enabled:
223
            self.quotaholder_url = quotaholder_url
224
            self.quotaholder_token = quotaholder_token
225
            self.quotaholder = QuotaholderClient(
226
                                    quotaholder_url,
227
                                    token=quotaholder_token,
228
                                    poolsize=quotaholder_client_poolsize)
229

    
230
        self.serials = []
231
        self.messages = []
232

    
233
    def close(self):
234
        self.wrapper.close()
235
        self.queue.close()
236

    
237
    @property
238
    def using_external_quotaholder(self):
239
        return self.quotaholder_enabled
240

    
241
    @backend_method
242
    def list_accounts(self, user, marker=None, limit=10000):
243
        """Return a list of accounts the user can access."""
244

    
245
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
246
        allowed = self._allowed_accounts(user)
247
        start, limit = self._list_limits(allowed, marker, limit)
248
        return allowed[start:start + limit]
249

    
250
    @backend_method
251
    def get_account_meta(
252
            self, user, account, domain, until=None, include_user_defined=True,
253
            external_quota=None):
254
        """Return a dictionary with the account metadata for the domain."""
255

    
256
        logger.debug(
257
            "get_account_meta: %s %s %s %s", user, account, domain, until)
258
        path, node = self._lookup_account(account, user == account)
259
        if user != account:
260
            if until or node is None or account not in self._allowed_accounts(user):
261
                raise NotAllowedError
262
        try:
263
            props = self._get_properties(node, until)
264
            mtime = props[self.MTIME]
265
        except NameError:
266
            props = None
267
            mtime = until
268
        count, bytes, tstamp = self._get_statistics(node, until)
269
        tstamp = max(tstamp, mtime)
270
        if until is None:
271
            modified = tstamp
272
        else:
273
            modified = self._get_statistics(
274
                node)[2]  # Overall last modification.
275
            modified = max(modified, mtime)
276

    
277
        if user != account:
278
            meta = {'name': account}
279
        else:
280
            meta = {}
281
            if props is not None and include_user_defined:
282
                meta.update(
283
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
284
            if until is not None:
285
                meta.update({'until_timestamp': tstamp})
286
            meta.update({'name': account, 'count': count, 'bytes': bytes})
287
            if self.using_external_quotaholder:
288
                external_quota = external_quota or {}
289
                meta['bytes'] = external_quota.get('currValue', 0)
290
        meta.update({'modified': modified})
291
        return meta
292

    
293
    @backend_method
294
    def update_account_meta(self, user, account, domain, meta, replace=False):
295
        """Update the metadata associated with the account for the domain."""
296

    
297
        logger.debug("update_account_meta: %s %s %s %s %s", user,
298
                     account, domain, meta, replace)
299
        if user != account:
300
            raise NotAllowedError
301
        path, node = self._lookup_account(account, True)
302
        self._put_metadata(user, node, domain, meta, replace)
303

    
304
    @backend_method
305
    def get_account_groups(self, user, account):
306
        """Return a dictionary with the user groups defined for this account."""
307

    
308
        logger.debug("get_account_groups: %s %s", user, account)
309
        if user != account:
310
            if account not in self._allowed_accounts(user):
311
                raise NotAllowedError
312
            return {}
313
        self._lookup_account(account, True)
314
        return self.permissions.group_dict(account)
315

    
316
    @backend_method
317
    def update_account_groups(self, user, account, groups, replace=False):
318
        """Update the groups associated with the account."""
319

    
320
        logger.debug("update_account_groups: %s %s %s %s", user,
321
                     account, groups, replace)
322
        if user != account:
323
            raise NotAllowedError
324
        self._lookup_account(account, True)
325
        self._check_groups(groups)
326
        if replace:
327
            self.permissions.group_destroy(account)
328
        for k, v in groups.iteritems():
329
            if not replace:  # If not already deleted.
330
                self.permissions.group_delete(account, k)
331
            if v:
332
                self.permissions.group_addmany(account, k, v)
333

    
334
    @backend_method
335
    def get_account_policy(self, user, account, external_quota=None):
336
        """Return a dictionary with the account policy."""
337

    
338
        logger.debug("get_account_policy: %s %s", user, account)
339
        if user != account:
340
            if account not in self._allowed_accounts(user):
341
                raise NotAllowedError
342
            return {}
343
        path, node = self._lookup_account(account, True)
344
        policy = self._get_policy(node)
345
        if self.using_external_quotaholder:
346
            external_quota = external_quota or {}
347
            policy['quota'] = external_quota.get('maxValue', 0)
348
        return policy
349

    
350
    @backend_method
351
    def update_account_policy(self, user, account, policy, replace=False):
352
        """Update the policy associated with the account."""
353

    
354
        logger.debug("update_account_policy: %s %s %s %s", user,
355
                     account, policy, replace)
356
        if user != account:
357
            raise NotAllowedError
358
        path, node = self._lookup_account(account, True)
359
        self._check_policy(policy)
360
        self._put_policy(node, policy, replace)
361

    
362
    @backend_method
363
    def put_account(self, user, account, policy=None):
364
        """Create a new account with the given name."""
365

    
366
        logger.debug("put_account: %s %s %s", user, account, policy)
367
        policy = policy or {}
368
        if user != account:
369
            raise NotAllowedError
370
        node = self.node.node_lookup(account)
371
        if node is not None:
372
            raise AccountExists('Account already exists')
373
        if policy:
374
            self._check_policy(policy)
375
        node = self._put_path(user, self.ROOTNODE, account)
376
        self._put_policy(node, policy, True)
377

    
378
    @backend_method
379
    def delete_account(self, user, account):
380
        """Delete the account with the given name."""
381

    
382
        logger.debug("delete_account: %s %s", user, account)
383
        if user != account:
384
            raise NotAllowedError
385
        node = self.node.node_lookup(account)
386
        if node is None:
387
            return
388
        if not self.node.node_remove(node):
389
            raise AccountNotEmpty('Account is not empty')
390
        self.permissions.group_destroy(account)
391

    
392
    @backend_method
393
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
394
        """Return a list of containers existing under an account."""
395

    
396
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
397
                     account, marker, limit, shared, until, public)
398
        if user != account:
399
            if until or account not in self._allowed_accounts(user):
400
                raise NotAllowedError
401
            allowed = self._allowed_containers(user, account)
402
            start, limit = self._list_limits(allowed, marker, limit)
403
            return allowed[start:start + limit]
404
        if shared or public:
405
            allowed = set()
406
            if shared:
407
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
408
            if public:
409
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
410
            allowed = sorted(allowed)
411
            start, limit = self._list_limits(allowed, marker, limit)
412
            return allowed[start:start + limit]
413
        node = self.node.node_lookup(account)
414
        containers = [x[0] for x in self._list_object_properties(
415
            node, account, '', '/', marker, limit, False, None, [], until)]
416
        start, limit = self._list_limits(
417
            [x[0] for x in containers], marker, limit)
418
        return containers[start:start + limit]
419

    
420
    @backend_method
421
    def list_container_meta(self, user, account, container, domain, until=None):
422
        """Return a list with all the container's object meta keys for the domain."""
423

    
424
        logger.debug("list_container_meta: %s %s %s %s %s", user,
425
                     account, container, domain, until)
426
        allowed = []
427
        if user != account:
428
            if until:
429
                raise NotAllowedError
430
            allowed = self.permissions.access_list_paths(
431
                user, '/'.join((account, container)))
432
            if not allowed:
433
                raise NotAllowedError
434
        path, node = self._lookup_container(account, container)
435
        before = until if until is not None else inf
436
        allowed = self._get_formatted_paths(allowed)
437
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
438

    
439
    @backend_method
440
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
441
        """Return a dictionary with the container metadata for the domain."""
442

    
443
        logger.debug("get_container_meta: %s %s %s %s %s", user,
444
                     account, container, domain, until)
445
        if user != account:
446
            if until or container not in self._allowed_containers(user, account):
447
                raise NotAllowedError
448
        path, node = self._lookup_container(account, container)
449
        props = self._get_properties(node, until)
450
        mtime = props[self.MTIME]
451
        count, bytes, tstamp = self._get_statistics(node, until)
452
        tstamp = max(tstamp, mtime)
453
        if until is None:
454
            modified = tstamp
455
        else:
456
            modified = self._get_statistics(
457
                node)[2]  # Overall last modification.
458
            modified = max(modified, mtime)
459

    
460
        if user != account:
461
            meta = {'name': container}
462
        else:
463
            meta = {}
464
            if include_user_defined:
465
                meta.update(
466
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
467
            if until is not None:
468
                meta.update({'until_timestamp': tstamp})
469
            meta.update({'name': container, 'count': count, 'bytes': bytes})
470
        meta.update({'modified': modified})
471
        return meta
472

    
473
    @backend_method
474
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
475
        """Update the metadata associated with the container for the domain."""
476

    
477
        logger.debug("update_container_meta: %s %s %s %s %s %s",
478
                     user, account, container, domain, meta, replace)
479
        if user != account:
480
            raise NotAllowedError
481
        path, node = self._lookup_container(account, container)
482
        src_version_id, dest_version_id = self._put_metadata(
483
            user, node, domain, meta, replace)
484
        if src_version_id is not None:
485
            versioning = self._get_policy(node)['versioning']
486
            if versioning != 'auto':
487
                self.node.version_remove(src_version_id)
488

    
489
    @backend_method
490
    def get_container_policy(self, user, account, container):
491
        """Return a dictionary with the container policy."""
492

    
493
        logger.debug(
494
            "get_container_policy: %s %s %s", user, account, container)
495
        if user != account:
496
            if container not in self._allowed_containers(user, account):
497
                raise NotAllowedError
498
            return {}
499
        path, node = self._lookup_container(account, container)
500
        return self._get_policy(node)
501

    
502
    @backend_method
503
    def update_container_policy(self, user, account, container, policy, replace=False):
504
        """Update the policy associated with the container."""
505

    
506
        logger.debug("update_container_policy: %s %s %s %s %s",
507
                     user, account, container, policy, replace)
508
        if user != account:
509
            raise NotAllowedError
510
        path, node = self._lookup_container(account, container)
511
        self._check_policy(policy)
512
        self._put_policy(node, policy, replace)
513

    
514
    @backend_method
515
    def put_container(self, user, account, container, policy=None):
516
        """Create a new container with the given name."""
517

    
518
        logger.debug(
519
            "put_container: %s %s %s %s", user, account, container, policy)
520
        policy = policy or {}
521
        if user != account:
522
            raise NotAllowedError
523
        try:
524
            path, node = self._lookup_container(account, container)
525
        except NameError:
526
            pass
527
        else:
528
            raise ContainerExists('Container already exists')
529
        if policy:
530
            self._check_policy(policy)
531
        path = '/'.join((account, container))
532
        node = self._put_path(
533
            user, self._lookup_account(account, True)[1], path)
534
        self._put_policy(node, policy, True)
535

    
536
    @backend_method
537
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
538
        """Delete/purge the container with the given name."""
539

    
540
        logger.debug("delete_container: %s %s %s %s %s %s", user,
541
                     account, container, until, prefix, delimiter)
542
        if user != account:
543
            raise NotAllowedError
544
        path, node = self._lookup_container(account, container)
545

    
546
        if until is not None:
547
            hashes, size, serials = self.node.node_purge_children(
548
                node, until, CLUSTER_HISTORY)
549
            for h in hashes:
550
                self.store.map_delete(h)
551
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
552
            if not self.free_versioning:
553
                self._report_size_change(
554
                    user, account, -size, {
555
                        'action':'container purge',
556
                        'path': path,
557
                        'versions': ','.join(str(i) for i in serials)
558
                    }
559
                )
560
            return
561

    
562
        if not delimiter:
563
            if self._get_statistics(node)[0] > 0:
564
                raise ContainerNotEmpty('Container is not empty')
565
            hashes, size, serials = self.node.node_purge_children(
566
                node, inf, CLUSTER_HISTORY)
567
            for h in hashes:
568
                self.store.map_delete(h)
569
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
570
            self.node.node_remove(node)
571
            if not self.free_versioning:
572
                self._report_size_change(
573
                    user, account, -size, {
574
                        'action':'container purge',
575
                        'path': path,
576
                        'versions': ','.join(str(i) for i in serials)
577
                    }
578
                )
579
        else:
580
            # remove only contents
581
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
582
            paths = []
583
            for t in src_names:
584
                path = '/'.join((account, container, t[0]))
585
                node = t[2]
586
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
587
                del_size = self._apply_versioning(
588
                    account, container, src_version_id)
589
                self._report_size_change(
590
                        user, account, -del_size, {
591
                                'action': 'object delete',
592
                                'path': path,
593
                        'versions': ','.join([str(dest_version_id)])
594
                     }
595
                )
596
                self._report_object_change(
597
                    user, account, path, details={'action': 'object delete'})
598
                paths.append(path)
599
            self.permissions.access_clear_bulk(paths)
600

    
601
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
602
        if user != account and until:
603
            raise NotAllowedError
604
        if shared and public:
605
            # get shared first
606
            shared = self._list_object_permissions(
607
                user, account, container, prefix, shared=True, public=False)
608
            objects = set()
609
            if shared:
610
                path, node = self._lookup_container(account, container)
611
                shared = self._get_formatted_paths(shared)
612
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
613

    
614
            # get public
615
            objects |= set(self._list_public_object_properties(
616
                user, account, container, prefix, all_props))
617
            objects = list(objects)
618

    
619
            objects.sort(key=lambda x: x[0])
620
            start, limit = self._list_limits(
621
                [x[0] for x in objects], marker, limit)
622
            return objects[start:start + limit]
623
        elif public:
624
            objects = self._list_public_object_properties(
625
                user, account, container, prefix, all_props)
626
            start, limit = self._list_limits(
627
                [x[0] for x in objects], marker, limit)
628
            return objects[start:start + limit]
629

    
630
        allowed = self._list_object_permissions(
631
            user, account, container, prefix, shared, public)
632
        if shared and not allowed:
633
            return []
634
        path, node = self._lookup_container(account, container)
635
        allowed = self._get_formatted_paths(allowed)
636
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
637
        start, limit = self._list_limits(
638
            [x[0] for x in objects], marker, limit)
639
        return objects[start:start + limit]
640

    
641
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
642
        public = self._list_object_permissions(
643
            user, account, container, prefix, shared=False, public=True)
644
        paths, nodes = self._lookup_objects(public)
645
        path = '/'.join((account, container))
646
        cont_prefix = path + '/'
647
        paths = [x[len(cont_prefix):] for x in paths]
648
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
649
        objects = [(path,) + props for path, props in zip(paths, props)]
650
        return objects
651

    
652
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
653
        objects = []
654
        while True:
655
            marker = objects[-1] if objects else None
656
            limit = 10000
657
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
658
            objects.extend(l)
659
            if not l or len(l) < limit:
660
                break
661
        return objects
662

    
663
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
664
        allowed = []
665
        path = '/'.join((account, container, prefix)).rstrip('/')
666
        if user != account:
667
            allowed = self.permissions.access_list_paths(user, path)
668
            if not allowed:
669
                raise NotAllowedError
670
        else:
671
            allowed = set()
672
            if shared:
673
                allowed.update(self.permissions.access_list_shared(path))
674
            if public:
675
                allowed.update(
676
                    [x[0] for x in self.permissions.public_list(path)])
677
            allowed = sorted(allowed)
678
            if not allowed:
679
                return []
680
        return allowed
681

    
682
    @backend_method
683
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
684
        """Return a list of object (name, version_id) tuples existing under a container."""
685

    
686
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
687
        keys = keys or []
688
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
689

    
690
    @backend_method
691
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
692
        """Return a list of object metadata dicts existing under a container."""
693

    
694
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
695
        keys = keys or []
696
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
697
        objects = []
698
        for p in props:
699
            if len(p) == 2:
700
                objects.append({'subdir': p[0]})
701
            else:
702
                objects.append({'name': p[0],
703
                                'bytes': p[self.SIZE + 1],
704
                                'type': p[self.TYPE + 1],
705
                                'hash': p[self.HASH + 1],
706
                                'version': p[self.SERIAL + 1],
707
                                'version_timestamp': p[self.MTIME + 1],
708
                                'modified': p[self.MTIME + 1] if until is None else None,
709
                                'modified_by': p[self.MUSER + 1],
710
                                'uuid': p[self.UUID + 1],
711
                                'checksum': p[self.CHECKSUM + 1]})
712
        return objects
713

    
714
    @backend_method
715
    def list_object_permissions(self, user, account, container, prefix=''):
716
        """Return a list of paths that enforce permissions under a container."""
717

    
718
        logger.debug("list_object_permissions: %s %s %s %s", user,
719
                     account, container, prefix)
720
        return self._list_object_permissions(user, account, container, prefix, True, False)
721

    
722
    @backend_method
723
    def list_object_public(self, user, account, container, prefix=''):
724
        """Return a dict mapping paths to public ids for objects that are public under a container."""
725

    
726
        logger.debug("list_object_public: %s %s %s %s", user,
727
                     account, container, prefix)
728
        public = {}
729
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
730
            public[path] = p
731
        return public
732

    
733
    @backend_method
734
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
735
        """Return a dictionary with the object metadata for the domain."""
736

    
737
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
738
                     account, container, name, domain, version)
739
        self._can_read(user, account, container, name)
740
        path, node = self._lookup_object(account, container, name)
741
        props = self._get_version(node, version)
742
        if version is None:
743
            modified = props[self.MTIME]
744
        else:
745
            try:
746
                modified = self._get_version(
747
                    node)[self.MTIME]  # Overall last modification.
748
            except NameError:  # Object may be deleted.
749
                del_props = self.node.version_lookup(
750
                    node, inf, CLUSTER_DELETED)
751
                if del_props is None:
752
                    raise ItemNotExists('Object does not exist')
753
                modified = del_props[self.MTIME]
754

    
755
        meta = {}
756
        if include_user_defined:
757
            meta.update(
758
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
759
        meta.update({'name': name,
760
                     'bytes': props[self.SIZE],
761
                     'type': props[self.TYPE],
762
                     'hash': props[self.HASH],
763
                     'version': props[self.SERIAL],
764
                     'version_timestamp': props[self.MTIME],
765
                     'modified': modified,
766
                     'modified_by': props[self.MUSER],
767
                     'uuid': props[self.UUID],
768
                     'checksum': props[self.CHECKSUM]})
769
        return meta
770

    
771
    @backend_method
772
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
773
        """Update the metadata associated with the object for the domain and return the new version."""
774

    
775
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
776
                     user, account, container, name, domain, meta, replace)
777
        self._can_write(user, account, container, name)
778
        path, node = self._lookup_object(account, container, name)
779
        src_version_id, dest_version_id = self._put_metadata(
780
            user, node, domain, meta, replace)
781
        self._apply_versioning(account, container, src_version_id)
782
        return dest_version_id
783

    
784
    @backend_method
785
    def get_object_permissions(self, user, account, container, name):
786
        """Return the action allowed on the object, the path
787
        from which the object gets its permissions from,
788
        along with a dictionary containing the permissions."""
789

    
790
        logger.debug("get_object_permissions: %s %s %s %s", user,
791
                     account, container, name)
792
        allowed = 'write'
793
        permissions_path = self._get_permissions_path(account, container, name)
794
        if user != account:
795
            if self.permissions.access_check(permissions_path, self.WRITE, user):
796
                allowed = 'write'
797
            elif self.permissions.access_check(permissions_path, self.READ, user):
798
                allowed = 'read'
799
            else:
800
                raise NotAllowedError
801
        self._lookup_object(account, container, name)
802
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
803

    
804
    @backend_method
805
    def update_object_permissions(self, user, account, container, name, permissions):
806
        """Update the permissions associated with the object."""
807

    
808
        logger.debug("update_object_permissions: %s %s %s %s %s",
809
                     user, account, container, name, permissions)
810
        if user != account:
811
            raise NotAllowedError
812
        path = self._lookup_object(account, container, name)[0]
813
        self._check_permissions(path, permissions)
814
        self.permissions.access_set(path, permissions)
815
        self._report_sharing_change(user, account, path, {'members':
816
                                    self.permissions.access_members(path)})
817

    
818
    @backend_method
819
    def get_object_public(self, user, account, container, name):
820
        """Return the public id of the object if applicable."""
821

    
822
        logger.debug(
823
            "get_object_public: %s %s %s %s", user, account, container, name)
824
        self._can_read(user, account, container, name)
825
        path = self._lookup_object(account, container, name)[0]
826
        p = self.permissions.public_get(path)
827
        return p
828

    
829
    @backend_method
830
    def update_object_public(self, user, account, container, name, public):
831
        """Update the public status of the object."""
832

    
833
        logger.debug("update_object_public: %s %s %s %s %s", user,
834
                     account, container, name, public)
835
        self._can_write(user, account, container, name)
836
        path = self._lookup_object(account, container, name)[0]
837
        if not public:
838
            self.permissions.public_unset(path)
839
        else:
840
            self.permissions.public_set(
841
                path, self.public_url_min_length, self.public_url_alphabet
842
            )
843

    
844
    @backend_method
845
    def get_object_hashmap(self, user, account, container, name, version=None):
846
        """Return the object's size and a list with partial hashes."""
847

    
848
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
849
                     account, container, name, version)
850
        self._can_read(user, account, container, name)
851
        path, node = self._lookup_object(account, container, name)
852
        props = self._get_version(node, version)
853
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
854
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
855

    
856
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
857
        if permissions is not None and user != account:
858
            raise NotAllowedError
859
        self._can_write(user, account, container, name)
860
        if permissions is not None:
861
            path = '/'.join((account, container, name))
862
            self._check_permissions(path, permissions)
863

    
864
        account_path, account_node = self._lookup_account(account, True)
865
        container_path, container_node = self._lookup_container(
866
            account, container)
867
        path, node = self._put_object_node(
868
            container_path, container_node, name)
869
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
870

    
871
        # Handle meta.
872
        if src_version_id is None:
873
            src_version_id = pre_version_id
874
        self._put_metadata_duplicate(
875
            src_version_id, dest_version_id, domain, meta, replace_meta)
876

    
877
        del_size = self._apply_versioning(account, container, pre_version_id)
878
        size_delta = size - del_size
879
        if not self.using_external_quotaholder: # Check account quota.
880
            if size_delta > 0:
881
                account_quota = long(self._get_policy(account_node)['quota'])
882
                account_usage = self._get_statistics(account_node)[1] + size_delta
883
                if (account_quota > 0 and account_usage > account_quota):
884
                    raise QuotaError('account quota exceeded: limit: %s, usage: %s' % (
885
                        account_quota, account_usage
886
                    ))
887

    
888
        # Check container quota.
889
        container_quota = long(self._get_policy(container_node)['quota'])
890
        container_usage = self._get_statistics(container_node)[1] + size_delta
891
        if (container_quota > 0 and container_usage > container_quota):
892
            # This must be executed in a transaction, so the version is
893
            # never created if it fails.
894
            raise QuotaError('container quota exceeded: limit: %s, usage: %s' % (
895
                container_quota, container_usage
896
            ))
897

    
898
        self._report_size_change(user, account, size_delta,
899
                                 {'action': 'object update', 'path': path,
900
                                  'versions': ','.join([str(dest_version_id)])})
901
        if permissions is not None:
902
            self.permissions.access_set(path, permissions)
903
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
904

    
905
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
906
        return dest_version_id
907

    
908
    @backend_method
909
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta=None, replace_meta=False, permissions=None):
910
        """Create/update an object with the specified size and partial hashes."""
911

    
912
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
913
                     account, container, name, size, type, hashmap, checksum)
914
        meta = meta or {}
915
        if size == 0:  # No such thing as an empty hashmap.
916
            hashmap = [self.put_block('')]
917
        map = HashMap(self.block_size, self.hash_algorithm)
918
        map.extend([binascii.unhexlify(x) for x in hashmap])
919
        missing = self.store.block_search(map)
920
        if missing:
921
            ie = IndexError()
922
            ie.data = [binascii.hexlify(x) for x in missing]
923
            raise ie
924

    
925
        hash = map.hash()
926
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
927
        self.store.map_put(hash, map)
928
        return dest_version_id
929

    
930
    @backend_method
931
    def update_object_checksum(self, user, account, container, name, version, checksum):
932
        """Update an object's checksum."""
933

    
934
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
935
                     user, account, container, name, version, checksum)
936
        # Update objects with greater version and same hashmap and size (fix metadata updates).
937
        self._can_write(user, account, container, name)
938
        path, node = self._lookup_object(account, container, name)
939
        props = self._get_version(node, version)
940
        versions = self.node.node_get_versions(node)
941
        for x in versions:
942
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
943
                self.node.version_put_property(
944
                    x[self.SERIAL], 'checksum', checksum)
945

    
946
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta=None, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
947
        dest_meta = dest_meta or {}
948
        dest_version_ids = []
949
        self._can_read(user, src_account, src_container, src_name)
950
        path, node = self._lookup_object(src_account, src_container, src_name)
951
        # TODO: Will do another fetch of the properties in duplicate version...
952
        props = self._get_version(
953
            node, src_version)  # Check to see if source exists.
954
        src_version_id = props[self.SERIAL]
955
        hash = props[self.HASH]
956
        size = props[self.SIZE]
957
        is_copy = not is_move and (src_account, src_container, src_name) != (
958
            dest_account, dest_container, dest_name)  # New uuid.
959
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
960
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
961
            self._delete_object(user, src_account, src_container, src_name)
962

    
963
        if delimiter:
964
            prefix = src_name + \
965
                delimiter if not src_name.endswith(delimiter) else src_name
966
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
967
            src_names.sort(key=lambda x: x[2])  # order by nodes
968
            paths = [elem[0] for elem in src_names]
969
            nodes = [elem[2] for elem in src_names]
970
            # TODO: Will do another fetch of the properties in duplicate version...
971
            props = self._get_versions(nodes)  # Check to see if source exists.
972

    
973
            for prop, path, node in zip(props, paths, nodes):
974
                src_version_id = prop[self.SERIAL]
975
                hash = prop[self.HASH]
976
                vtype = prop[self.TYPE]
977
                size = prop[self.SIZE]
978
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
979
                    delimiter) else dest_name
980
                vdest_name = path.replace(prefix, dest_prefix, 1)
981
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
982
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
983
                    self._delete_object(user, src_account, src_container, path)
984
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
985

    
986
    @backend_method
987
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, src_version=None, delimiter=None):
988
        """Copy an object's data and metadata."""
989

    
990
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
991
        meta = meta or {}
992
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
993
        return dest_version_id
994

    
995
    @backend_method
996
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, delimiter=None):
997
        """Move an object's data and metadata."""
998

    
999
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
1000
        meta = meta or {}
1001
        if user != src_account:
1002
            raise NotAllowedError
1003
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
1004
        return dest_version_id
1005

    
1006
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
1007
        if user != account:
1008
            raise NotAllowedError
1009

    
1010
        if until is not None:
1011
            path = '/'.join((account, container, name))
1012
            node = self.node.node_lookup(path)
1013
            if node is None:
1014
                return
1015
            hashes = []
1016
            size = 0
1017
            serials = []
1018
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
1019
            hashes += h
1020
            size += s
1021
            serials += v
1022
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
1023
            hashes += h
1024
            if not self.free_versioning:
1025
                size += s
1026
            serials += v
1027
            for h in hashes:
1028
                self.store.map_delete(h)
1029
            self.node.node_purge(node, until, CLUSTER_DELETED)
1030
            try:
1031
                props = self._get_version(node)
1032
            except NameError:
1033
                self.permissions.access_clear(path)
1034
            self._report_size_change(
1035
                user, account, -size, {
1036
                    'action': 'object purge',
1037
                    'path': path,
1038
                    'versions': ','.join(str(i) for i in serials)
1039
                }
1040
            )
1041
            return
1042

    
1043
        path, node = self._lookup_object(account, container, name)
1044
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1045
        del_size = self._apply_versioning(account, container, src_version_id)
1046
        self._report_size_change(user, account, -del_size,
1047
                                 {'action': 'object delete', 'path': path,
1048
                                  'versions': ','.join([str(dest_version_id)])})
1049
        self._report_object_change(
1050
            user, account, path, details={'action': 'object delete'})
1051
        self.permissions.access_clear(path)
1052

    
1053
        if delimiter:
1054
            prefix = name + delimiter if not name.endswith(delimiter) else name
1055
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1056
            paths = []
1057
            for t in src_names:
1058
                path = '/'.join((account, container, t[0]))
1059
                node = t[2]
1060
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1061
                del_size = self._apply_versioning(
1062
                    account, container, src_version_id)
1063
                self._report_size_change(user, account, -del_size,
1064
                                         {'action': 'object delete',
1065
                                          'path': path,
1066
                                          'versions': ','.join([str(dest_version_id)])})
1067
                self._report_object_change(
1068
                    user, account, path, details={'action': 'object delete'})
1069
                paths.append(path)
1070
            self.permissions.access_clear_bulk(paths)
1071

    
1072
    @backend_method
1073
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1074
        """Delete/purge an object."""
1075

    
1076
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1077
                     account, container, name, until, prefix, delimiter)
1078
        self._delete_object(user, account, container, name, until, delimiter)
1079

    
1080
    @backend_method
1081
    def list_versions(self, user, account, container, name):
1082
        """Return a list of all (version, version_timestamp) tuples for an object."""
1083

    
1084
        logger.debug(
1085
            "list_versions: %s %s %s %s", user, account, container, name)
1086
        self._can_read(user, account, container, name)
1087
        path, node = self._lookup_object(account, container, name)
1088
        versions = self.node.node_get_versions(node)
1089
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1090

    
1091
    @backend_method
1092
    def get_uuid(self, user, uuid):
1093
        """Return the (account, container, name) for the UUID given."""
1094

    
1095
        logger.debug("get_uuid: %s %s", user, uuid)
1096
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1097
        if info is None:
1098
            raise NameError
1099
        path, serial = info
1100
        account, container, name = path.split('/', 2)
1101
        self._can_read(user, account, container, name)
1102
        return (account, container, name)
1103

    
1104
    @backend_method
1105
    def get_public(self, user, public):
1106
        """Return the (account, container, name) for the public id given."""
1107

    
1108
        logger.debug("get_public: %s %s", user, public)
1109
        path = self.permissions.public_path(public)
1110
        if path is None:
1111
            raise NameError
1112
        account, container, name = path.split('/', 2)
1113
        self._can_read(user, account, container, name)
1114
        return (account, container, name)
1115

    
1116
    @backend_method(autocommit=0)
1117
    def get_block(self, hash):
1118
        """Return a block's data."""
1119

    
1120
        logger.debug("get_block: %s", hash)
1121
        block = self.store.block_get(binascii.unhexlify(hash))
1122
        if not block:
1123
            raise ItemNotExists('Block does not exist')
1124
        return block
1125

    
1126
    @backend_method(autocommit=0)
1127
    def put_block(self, data):
1128
        """Store a block and return the hash."""
1129

    
1130
        logger.debug("put_block: %s", len(data))
1131
        return binascii.hexlify(self.store.block_put(data))
1132

    
1133
    @backend_method(autocommit=0)
1134
    def update_block(self, hash, data, offset=0):
1135
        """Update a known block and return the hash."""
1136

    
1137
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1138
        if offset == 0 and len(data) == self.block_size:
1139
            return self.put_block(data)
1140
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1141
        return binascii.hexlify(h)
1142

    
1143
    # Path functions.
1144

    
1145
    def _generate_uuid(self):
1146
        return str(uuidlib.uuid4())
1147

    
1148
    def _put_object_node(self, path, parent, name):
1149
        path = '/'.join((path, name))
1150
        node = self.node.node_lookup(path)
1151
        if node is None:
1152
            node = self.node.node_create(parent, path)
1153
        return path, node
1154

    
1155
    def _put_path(self, user, parent, path):
1156
        node = self.node.node_create(parent, path)
1157
        self.node.version_create(node, None, 0, '', None, user,
1158
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1159
        return node
1160

    
1161
    def _lookup_account(self, account, create=True):
1162
        node = self.node.node_lookup(account)
1163
        if node is None and create:
1164
            node = self._put_path(
1165
                account, self.ROOTNODE, account)  # User is account.
1166
        return account, node
1167

    
1168
    def _lookup_container(self, account, container):
1169
        path = '/'.join((account, container))
1170
        node = self.node.node_lookup(path)
1171
        if node is None:
1172
            raise ItemNotExists('Container does not exist')
1173
        return path, node
1174

    
1175
    def _lookup_object(self, account, container, name):
1176
        path = '/'.join((account, container, name))
1177
        node = self.node.node_lookup(path)
1178
        if node is None:
1179
            raise ItemNotExists('Object does not exist')
1180
        return path, node
1181

    
1182
    def _lookup_objects(self, paths):
1183
        nodes = self.node.node_lookup_bulk(paths)
1184
        return paths, nodes
1185

    
1186
    def _get_properties(self, node, until=None):
1187
        """Return properties until the timestamp given."""
1188

    
1189
        before = until if until is not None else inf
1190
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1191
        if props is None and until is not None:
1192
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1193
        if props is None:
1194
            raise ItemNotExists('Path does not exist')
1195
        return props
1196

    
1197
    def _get_statistics(self, node, until=None):
1198
        """Return count, sum of size and latest timestamp of everything under node."""
1199

    
1200
        if until is None:
1201
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1202
        else:
1203
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1204
        if stats is None:
1205
            stats = (0, 0, 0)
1206
        return stats
1207

    
1208
    def _get_version(self, node, version=None):
1209
        if version is None:
1210
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1211
            if props is None:
1212
                raise ItemNotExists('Object does not exist')
1213
        else:
1214
            try:
1215
                version = int(version)
1216
            except ValueError:
1217
                raise VersionNotExists('Version does not exist')
1218
            props = self.node.version_get_properties(version)
1219
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1220
                raise VersionNotExists('Version does not exist')
1221
        return props
1222

    
1223
    def _get_versions(self, nodes):
1224
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1225

    
1226
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1227
        """Create a new version of the node."""
1228

    
1229
        props = self.node.version_lookup(
1230
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1231
        if props is not None:
1232
            src_version_id = props[self.SERIAL]
1233
            src_hash = props[self.HASH]
1234
            src_size = props[self.SIZE]
1235
            src_type = props[self.TYPE]
1236
            src_checksum = props[self.CHECKSUM]
1237
        else:
1238
            src_version_id = None
1239
            src_hash = None
1240
            src_size = 0
1241
            src_type = ''
1242
            src_checksum = ''
1243
        if size is None:  # Set metadata.
1244
            hash = src_hash  # This way hash can be set to None (account or container).
1245
            size = src_size
1246
        if type is None:
1247
            type = src_type
1248
        if checksum is None:
1249
            checksum = src_checksum
1250
        uuid = self._generate_uuid(
1251
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1252

    
1253
        if src_node is None:
1254
            pre_version_id = src_version_id
1255
        else:
1256
            pre_version_id = None
1257
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1258
            if props is not None:
1259
                pre_version_id = props[self.SERIAL]
1260
        if pre_version_id is not None:
1261
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1262

    
1263
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1264
        return pre_version_id, dest_version_id
1265

    
1266
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1267
        if src_version_id is not None:
1268
            self.node.attribute_copy(src_version_id, dest_version_id)
1269
        if not replace:
1270
            self.node.attribute_del(dest_version_id, domain, (
1271
                k for k, v in meta.iteritems() if v == ''))
1272
            self.node.attribute_set(dest_version_id, domain, (
1273
                (k, v) for k, v in meta.iteritems() if v != ''))
1274
        else:
1275
            self.node.attribute_del(dest_version_id, domain)
1276
            self.node.attribute_set(dest_version_id, domain, ((
1277
                k, v) for k, v in meta.iteritems()))
1278

    
1279
    def _put_metadata(self, user, node, domain, meta, replace=False):
1280
        """Create a new version and store metadata."""
1281

    
1282
        src_version_id, dest_version_id = self._put_version_duplicate(
1283
            user, node)
1284
        self._put_metadata_duplicate(
1285
            src_version_id, dest_version_id, domain, meta, replace)
1286
        return src_version_id, dest_version_id
1287

    
1288
    def _list_limits(self, listing, marker, limit):
1289
        start = 0
1290
        if marker:
1291
            try:
1292
                start = listing.index(marker) + 1
1293
            except ValueError:
1294
                pass
1295
        if not limit or limit > 10000:
1296
            limit = 10000
1297
        return start, limit
1298

    
1299
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, until=None, size_range=None, allowed=None, all_props=False):
1300
        keys = keys or []
1301
        allowed = allowed or []
1302
        cont_prefix = path + '/'
1303
        prefix = cont_prefix + prefix
1304
        start = cont_prefix + marker if marker else None
1305
        before = until if until is not None else inf
1306
        filterq = keys if domain else []
1307
        sizeq = size_range
1308

    
1309
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1310
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1311
        objects.sort(key=lambda x: x[0])
1312
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1313
        return objects
1314

    
1315
    # Reporting functions.
1316

    
1317
    def _report_size_change(self, user, account, size, details=None):
1318
        details = details or {}
1319

    
1320
        if size == 0:
1321
            return
1322

    
1323
        account_node = self._lookup_account(account, True)[1]
1324
        total = self._get_statistics(account_node)[1]
1325
        details.update({'user': user, 'total': total})
1326
        logger.debug(
1327
            "_report_size_change: %s %s %s %s", user, account, size, details)
1328
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1329
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1330
                              float(size), details))
1331

    
1332
        if not self.using_external_quotaholder:
1333
            return
1334

    
1335
        try:
1336
            serial = self.quotaholder.issue_commission(
1337
                    context     =   {},
1338
                    target      =   account,
1339
                    key         =   '1',
1340
                    clientkey   =   'pithos',
1341
                    ownerkey    =   '',
1342
                    name        =   details['path'] if 'path' in details else '',
1343
                    provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1344
            )
1345
        except BaseException, e:
1346
            raise QuotaError(e)
1347
        else:
1348
            self.serials.append(serial)
1349

    
1350
    def _report_object_change(self, user, account, path, details=None):
1351
        details = details or {}
1352
        details.update({'user': user})
1353
        logger.debug("_report_object_change: %s %s %s %s", user,
1354
                     account, path, details)
1355
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1356
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1357

    
1358
    def _report_sharing_change(self, user, account, path, details=None):
1359
        logger.debug("_report_permissions_change: %s %s %s %s",
1360
                     user, account, path, details)
1361
        details = details or {}
1362
        details.update({'user': user})
1363
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1364
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1365

    
1366
    # Policy functions.
1367

    
1368
    def _check_policy(self, policy):
1369
        for k in policy.keys():
1370
            if policy[k] == '':
1371
                policy[k] = self.default_policy.get(k)
1372
        for k, v in policy.iteritems():
1373
            if k == 'quota':
1374
                q = int(v)  # May raise ValueError.
1375
                if q < 0:
1376
                    raise ValueError
1377
            elif k == 'versioning':
1378
                if v not in ['auto', 'none']:
1379
                    raise ValueError
1380
            else:
1381
                raise ValueError
1382

    
1383
    def _put_policy(self, node, policy, replace):
1384
        if replace:
1385
            for k, v in self.default_policy.iteritems():
1386
                if k not in policy:
1387
                    policy[k] = v
1388
        self.node.policy_set(node, policy)
1389

    
1390
    def _get_policy(self, node):
1391
        policy = self.default_policy.copy()
1392
        policy.update(self.node.policy_get(node))
1393
        return policy
1394

    
1395
    def _apply_versioning(self, account, container, version_id):
1396
        """Delete the provided version if such is the policy.
1397
           Return size of object removed.
1398
        """
1399

    
1400
        if version_id is None:
1401
            return 0
1402
        path, node = self._lookup_container(account, container)
1403
        versioning = self._get_policy(node)['versioning']
1404
        if versioning != 'auto':
1405
            hash, size = self.node.version_remove(version_id)
1406
            self.store.map_delete(hash)
1407
            return size
1408
        elif self.free_versioning:
1409
            return self.node.version_get_properties(
1410
                version_id, keys=('size',))[0]
1411
        return 0
1412

    
1413
    # Access control functions.
1414

    
1415
    def _check_groups(self, groups):
1416
        # raise ValueError('Bad characters in groups')
1417
        pass
1418

    
1419
    def _check_permissions(self, path, permissions):
1420
        # raise ValueError('Bad characters in permissions')
1421
        pass
1422

    
1423
    def _get_formatted_paths(self, paths):
1424
        formatted = []
1425
        for p in paths:
1426
            node = self.node.node_lookup(p)
1427
            props = None
1428
            if node is not None:
1429
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1430
            if props is not None:
1431
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1432
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1433
                formatted.append((p, self.MATCH_EXACT))
1434
        return formatted
1435

    
1436
    def _get_permissions_path(self, account, container, name):
1437
        path = '/'.join((account, container, name))
1438
        permission_paths = self.permissions.access_inherit(path)
1439
        permission_paths.sort()
1440
        permission_paths.reverse()
1441
        for p in permission_paths:
1442
            if p == path:
1443
                return p
1444
            else:
1445
                if p.count('/') < 2:
1446
                    continue
1447
                node = self.node.node_lookup(p)
1448
                props = None
1449
                if node is not None:
1450
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1451
                if props is not None:
1452
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1453
                        return p
1454
        return None
1455

    
1456
    def _can_read(self, user, account, container, name):
1457
        if user == account:
1458
            return True
1459
        path = '/'.join((account, container, name))
1460
        if self.permissions.public_get(path) is not None:
1461
            return True
1462
        path = self._get_permissions_path(account, container, name)
1463
        if not path:
1464
            raise NotAllowedError
1465
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1466
            raise NotAllowedError
1467

    
1468
    def _can_write(self, user, account, container, name):
1469
        if user == account:
1470
            return True
1471
        path = '/'.join((account, container, name))
1472
        path = self._get_permissions_path(account, container, name)
1473
        if not path:
1474
            raise NotAllowedError
1475
        if not self.permissions.access_check(path, self.WRITE, user):
1476
            raise NotAllowedError
1477

    
1478
    def _allowed_accounts(self, user):
1479
        allow = set()
1480
        for path in self.permissions.access_list_paths(user):
1481
            allow.add(path.split('/', 1)[0])
1482
        return sorted(allow)
1483

    
1484
    def _allowed_containers(self, user, account):
1485
        allow = set()
1486
        for path in self.permissions.access_list_paths(user, account):
1487
            allow.add(path.split('/', 2)[1])
1488
        return sorted(allow)