Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 73fbe301

History | View | Annotate | Download (61.5 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from commissioning.clients.quotaholder import QuotaholderHTTP
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
85
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
86

    
87
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
88
QUEUE_CLIENT_ID = 'pithos'
89
QUEUE_INSTANCE_ID = '1'
90

    
91
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
92

    
93
inf = float('inf')
94

    
95
ULTIMATE_ANSWER = 42
96

    
97

    
98
logger = logging.getLogger(__name__)
99

    
100

    
101
def backend_method(func=None, autocommit=1):
102
    if func is None:
103
        def fn(func):
104
            return backend_method(func, autocommit)
105
        return fn
106

    
107
    if not autocommit:
108
        return func
109

    
110
    def fn(self, *args, **kw):
111
        self.wrapper.execute()
112
        serials = []
113
        self.serials = serials
114
        self.messages = []
115

    
116
        try:
117
            ret = func(self, *args, **kw)
118
            for m in self.messages:
119
                self.queue.send(*m)
120
            if serials:
121
                self.quotaholder.accept_commission(
122
                            context     =   {},
123
                            clientkey   =   'pithos',
124
                            serials     =   serials)
125
            self.wrapper.commit()
126
            return ret
127
        except:
128
            if serials:
129
                self.quotaholder.reject_commission(
130
                            context     =   {},
131
                            clientkey   =   'pithos',
132
                            serials     =   serials)
133
            self.wrapper.rollback()
134
            raise
135
    return fn
136

    
137

    
138
class ModularBackend(BaseBackend):
139
    """A modular backend.
140

141
    Uses modules for SQL functions and storage.
142
    """
143

    
144
    def __init__(self, db_module=None, db_connection=None,
145
                 block_module=None, block_path=None, block_umask=None,
146
                 queue_module=None, queue_hosts=None,
147
                 queue_exchange=None, quotaholder_url=None,
148
                 free_versioning=True):
149
        db_module = db_module or DEFAULT_DB_MODULE
150
        db_connection = db_connection or DEFAULT_DB_CONNECTION
151
        block_module = block_module or DEFAULT_BLOCK_MODULE
152
        block_path = block_path or DEFAULT_BLOCK_PATH
153
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
154
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
155
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
156
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
157
        
158
        self.hash_algorithm = 'sha256'
159
        self.block_size = 4 * 1024 * 1024  # 4MB
160
        self.free_versioning = free_versioning
161

    
162
        self.default_policy = {'quota': DEFAULT_QUOTA,
163
                               'versioning': DEFAULT_VERSIONING}
164

    
165
        def load_module(m):
166
            __import__(m)
167
            return sys.modules[m]
168

    
169
        self.db_module = load_module(db_module)
170
        self.wrapper = self.db_module.DBWrapper(db_connection)
171
        params = {'wrapper': self.wrapper}
172
        self.permissions = self.db_module.Permissions(**params)
173
        self.config = self.db_module.Config(**params)
174
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
175
        for x in ['READ', 'WRITE']:
176
            setattr(self, x, getattr(self.db_module, x))
177
        self.node = self.db_module.Node(**params)
178
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
179
            setattr(self, x, getattr(self.db_module, x))
180

    
181
        self.block_module = load_module(block_module)
182
        params = {'path': block_path,
183
                  'block_size': self.block_size,
184
                  'hash_algorithm': self.hash_algorithm,
185
                  'umask': block_umask}
186
        self.store = self.block_module.Store(**params)
187

    
188
        if queue_module and queue_hosts:
189
            self.queue_module = load_module(queue_module)
190
            params = {'hosts': queue_hosts,
191
                              'exchange': queue_exchange,
192
                      'client_id': QUEUE_CLIENT_ID}
193
            self.queue = self.queue_module.Queue(**params)
194
        else:
195
            class NoQueue:
196
                def send(self, *args):
197
                    pass
198

    
199
                def close(self):
200
                    pass
201

    
202
            self.queue = NoQueue()
203

    
204
        self.quotaholder_url = quotaholder_url
205
        self.quotaholder = QuotaholderHTTP(quotaholder_url)
206
        self.serials = []
207
        self.messages = []
208

    
209
    def close(self):
210
        self.wrapper.close()
211
        self.queue.close()
212

    
213
    @backend_method
214
    def list_accounts(self, user, marker=None, limit=10000):
215
        """Return a list of accounts the user can access."""
216

    
217
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
218
        allowed = self._allowed_accounts(user)
219
        start, limit = self._list_limits(allowed, marker, limit)
220
        return allowed[start:start + limit]
221

    
222
    @backend_method
223
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
224
        """Return a dictionary with the account metadata for the domain."""
225

    
226
        logger.debug(
227
            "get_account_meta: %s %s %s %s", user, account, domain, until)
228
        path, node = self._lookup_account(account, user == account)
229
        if user != account:
230
            if until or node is None or account not in self._allowed_accounts(user):
231
                raise NotAllowedError
232
        try:
233
            props = self._get_properties(node, until)
234
            mtime = props[self.MTIME]
235
        except NameError:
236
            props = None
237
            mtime = until
238
        count, bytes, tstamp = self._get_statistics(node, until)
239
        tstamp = max(tstamp, mtime)
240
        if until is None:
241
            modified = tstamp
242
        else:
243
            modified = self._get_statistics(
244
                node)[2]  # Overall last modification.
245
            modified = max(modified, mtime)
246

    
247
        if user != account:
248
            meta = {'name': account}
249
        else:
250
            meta = {}
251
            if props is not None and include_user_defined:
252
                meta.update(
253
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
254
            if until is not None:
255
                meta.update({'until_timestamp': tstamp})
256
            meta.update({'name': account, 'count': count, 'bytes': bytes})
257
        meta.update({'modified': modified})
258
        return meta
259

    
260
    @backend_method
261
    def update_account_meta(self, user, account, domain, meta, replace=False):
262
        """Update the metadata associated with the account for the domain."""
263

    
264
        logger.debug("update_account_meta: %s %s %s %s %s", user,
265
                     account, domain, meta, replace)
266
        if user != account:
267
            raise NotAllowedError
268
        path, node = self._lookup_account(account, True)
269
        self._put_metadata(user, node, domain, meta, replace)
270

    
271
    @backend_method
272
    def get_account_groups(self, user, account):
273
        """Return a dictionary with the user groups defined for this account."""
274

    
275
        logger.debug("get_account_groups: %s %s", user, account)
276
        if user != account:
277
            if account not in self._allowed_accounts(user):
278
                raise NotAllowedError
279
            return {}
280
        self._lookup_account(account, True)
281
        return self.permissions.group_dict(account)
282

    
283
    @backend_method
284
    def update_account_groups(self, user, account, groups, replace=False):
285
        """Update the groups associated with the account."""
286

    
287
        logger.debug("update_account_groups: %s %s %s %s", user,
288
                     account, groups, replace)
289
        if user != account:
290
            raise NotAllowedError
291
        self._lookup_account(account, True)
292
        self._check_groups(groups)
293
        if replace:
294
            self.permissions.group_destroy(account)
295
        for k, v in groups.iteritems():
296
            if not replace:  # If not already deleted.
297
                self.permissions.group_delete(account, k)
298
            if v:
299
                self.permissions.group_addmany(account, k, v)
300

    
301
    @backend_method
302
    def get_account_policy(self, user, account):
303
        """Return a dictionary with the account policy."""
304

    
305
        logger.debug("get_account_policy: %s %s", user, account)
306
        if user != account:
307
            if account not in self._allowed_accounts(user):
308
                raise NotAllowedError
309
            return {}
310
        path, node = self._lookup_account(account, True)
311
        return self._get_policy(node)
312

    
313
    @backend_method
314
    def update_account_policy(self, user, account, policy, replace=False):
315
        """Update the policy associated with the account."""
316

    
317
        logger.debug("update_account_policy: %s %s %s %s", user,
318
                     account, policy, replace)
319
        if user != account:
320
            raise NotAllowedError
321
        path, node = self._lookup_account(account, True)
322
        self._check_policy(policy)
323
        self._put_policy(node, policy, replace)
324

    
325
    @backend_method
326
    def put_account(self, user, account, policy={}):
327
        """Create a new account with the given name."""
328

    
329
        logger.debug("put_account: %s %s %s", user, account, policy)
330
        if user != account:
331
            raise NotAllowedError
332
        node = self.node.node_lookup(account)
333
        if node is not None:
334
            raise AccountExists('Account already exists')
335
        if policy:
336
            self._check_policy(policy)
337
        node = self._put_path(user, self.ROOTNODE, account)
338
        self._put_policy(node, policy, True)
339

    
340
    @backend_method
341
    def delete_account(self, user, account):
342
        """Delete the account with the given name."""
343

    
344
        logger.debug("delete_account: %s %s", user, account)
345
        if user != account:
346
            raise NotAllowedError
347
        node = self.node.node_lookup(account)
348
        if node is None:
349
            return
350
        if not self.node.node_remove(node):
351
            raise AccountNotEmpty('Account is not empty')
352
        self.permissions.group_destroy(account)
353

    
354
    @backend_method
355
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
356
        """Return a list of containers existing under an account."""
357

    
358
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
359
                     account, marker, limit, shared, until, public)
360
        if user != account:
361
            if until or account not in self._allowed_accounts(user):
362
                raise NotAllowedError
363
            allowed = self._allowed_containers(user, account)
364
            start, limit = self._list_limits(allowed, marker, limit)
365
            return allowed[start:start + limit]
366
        if shared or public:
367
            allowed = set()
368
            if shared:
369
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
370
            if public:
371
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
372
            allowed = sorted(allowed)
373
            start, limit = self._list_limits(allowed, marker, limit)
374
            return allowed[start:start + limit]
375
        node = self.node.node_lookup(account)
376
        containers = [x[0] for x in self._list_object_properties(
377
            node, account, '', '/', marker, limit, False, None, [], until)]
378
        start, limit = self._list_limits(
379
            [x[0] for x in containers], marker, limit)
380
        return containers[start:start + limit]
381

    
382
    @backend_method
383
    def list_container_meta(self, user, account, container, domain, until=None):
384
        """Return a list with all the container's object meta keys for the domain."""
385

    
386
        logger.debug("list_container_meta: %s %s %s %s %s", user,
387
                     account, container, domain, until)
388
        allowed = []
389
        if user != account:
390
            if until:
391
                raise NotAllowedError
392
            allowed = self.permissions.access_list_paths(
393
                user, '/'.join((account, container)))
394
            if not allowed:
395
                raise NotAllowedError
396
        path, node = self._lookup_container(account, container)
397
        before = until if until is not None else inf
398
        allowed = self._get_formatted_paths(allowed)
399
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
400

    
401
    @backend_method
402
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
403
        """Return a dictionary with the container metadata for the domain."""
404

    
405
        logger.debug("get_container_meta: %s %s %s %s %s", user,
406
                     account, container, domain, until)
407
        if user != account:
408
            if until or container not in self._allowed_containers(user, account):
409
                raise NotAllowedError
410
        path, node = self._lookup_container(account, container)
411
        props = self._get_properties(node, until)
412
        mtime = props[self.MTIME]
413
        count, bytes, tstamp = self._get_statistics(node, until)
414
        tstamp = max(tstamp, mtime)
415
        if until is None:
416
            modified = tstamp
417
        else:
418
            modified = self._get_statistics(
419
                node)[2]  # Overall last modification.
420
            modified = max(modified, mtime)
421

    
422
        if user != account:
423
            meta = {'name': container}
424
        else:
425
            meta = {}
426
            if include_user_defined:
427
                meta.update(
428
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
429
            if until is not None:
430
                meta.update({'until_timestamp': tstamp})
431
            meta.update({'name': container, 'count': count, 'bytes': bytes})
432
        meta.update({'modified': modified})
433
        return meta
434

    
435
    @backend_method
436
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
437
        """Update the metadata associated with the container for the domain."""
438

    
439
        logger.debug("update_container_meta: %s %s %s %s %s %s",
440
                     user, account, container, domain, meta, replace)
441
        if user != account:
442
            raise NotAllowedError
443
        path, node = self._lookup_container(account, container)
444
        src_version_id, dest_version_id = self._put_metadata(
445
            user, node, domain, meta, replace)
446
        if src_version_id is not None:
447
            versioning = self._get_policy(node)['versioning']
448
            if versioning != 'auto':
449
                self.node.version_remove(src_version_id)
450

    
451
    @backend_method
452
    def get_container_policy(self, user, account, container):
453
        """Return a dictionary with the container policy."""
454

    
455
        logger.debug(
456
            "get_container_policy: %s %s %s", user, account, container)
457
        if user != account:
458
            if container not in self._allowed_containers(user, account):
459
                raise NotAllowedError
460
            return {}
461
        path, node = self._lookup_container(account, container)
462
        return self._get_policy(node)
463

    
464
    @backend_method
465
    def update_container_policy(self, user, account, container, policy, replace=False):
466
        """Update the policy associated with the container."""
467

    
468
        logger.debug("update_container_policy: %s %s %s %s %s",
469
                     user, account, container, policy, replace)
470
        if user != account:
471
            raise NotAllowedError
472
        path, node = self._lookup_container(account, container)
473
        self._check_policy(policy)
474
        self._put_policy(node, policy, replace)
475

    
476
    @backend_method
477
    def put_container(self, user, account, container, policy={}):
478
        """Create a new container with the given name."""
479

    
480
        logger.debug(
481
            "put_container: %s %s %s %s", user, account, container, policy)
482
        if user != account:
483
            raise NotAllowedError
484
        try:
485
            path, node = self._lookup_container(account, container)
486
        except NameError:
487
            pass
488
        else:
489
            raise ContainerExists('Container already exists')
490
        if policy:
491
            self._check_policy(policy)
492
        path = '/'.join((account, container))
493
        node = self._put_path(
494
            user, self._lookup_account(account, True)[1], path)
495
        self._put_policy(node, policy, True)
496

    
497
    @backend_method
498
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
499
        """Delete/purge the container with the given name."""
500

    
501
        logger.debug("delete_container: %s %s %s %s %s %s", user,
502
                     account, container, until, prefix, delimiter)
503
        if user != account:
504
            raise NotAllowedError
505
        path, node = self._lookup_container(account, container)
506

    
507
        if until is not None:
508
            hashes, size, serials = self.node.node_purge_children(
509
                node, until, CLUSTER_HISTORY)
510
            for h in hashes:
511
                self.store.map_delete(h)
512
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
513
            self._report_size_change(user, account, -size,
514
                                     {'action':'container purge', 'path': path,
515
                                      'versions': ','.join(str(i) for i in serials)})
516
            return
517

    
518
        if not delimiter:
519
            if self._get_statistics(node)[0] > 0:
520
                raise ContainerNotEmpty('Container is not empty')
521
            hashes, size, serials = self.node.node_purge_children(
522
                node, inf, CLUSTER_HISTORY)
523
            for h in hashes:
524
                self.store.map_delete(h)
525
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
526
            self.node.node_remove(node)
527
            self._report_size_change(user, account, -size,
528
                                     {'action': 'container delete',
529
                                      'path': path,
530
                                      'versions': ','.join(str(i) for i in serials)})
531
        else:
532
            # remove only contents
533
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
534
            paths = []
535
            for t in src_names:
536
                path = '/'.join((account, container, t[0]))
537
                node = t[2]
538
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
539
                del_size = self._apply_versioning(
540
                    account, container, src_version_id)
541
                self._report_size_change(
542
                        user, account, -del_size, {
543
                                'action': 'object delete',
544
                                'path': path,
545
                             'versions': ','.join([str(dest_version_id)])
546
                     }
547
                )
548
                self._report_object_change(
549
                    user, account, path, details={'action': 'object delete'})
550
                paths.append(path)
551
            self.permissions.access_clear_bulk(paths)
552

    
553
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
554
        if user != account and until:
555
            raise NotAllowedError
556
        if shared and public:
557
            # get shared first
558
            shared = self._list_object_permissions(
559
                user, account, container, prefix, shared=True, public=False)
560
            objects = set()
561
            if shared:
562
                path, node = self._lookup_container(account, container)
563
                shared = self._get_formatted_paths(shared)
564
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
565

    
566
            # get public
567
            objects |= set(self._list_public_object_properties(
568
                user, account, container, prefix, all_props))
569
            objects = list(objects)
570

    
571
            objects.sort(key=lambda x: x[0])
572
            start, limit = self._list_limits(
573
                [x[0] for x in objects], marker, limit)
574
            return objects[start:start + limit]
575
        elif public:
576
            objects = self._list_public_object_properties(
577
                user, account, container, prefix, all_props)
578
            start, limit = self._list_limits(
579
                [x[0] for x in objects], marker, limit)
580
            return objects[start:start + limit]
581

    
582
        allowed = self._list_object_permissions(
583
            user, account, container, prefix, shared, public)
584
        if shared and not allowed:
585
            return []
586
        path, node = self._lookup_container(account, container)
587
        allowed = self._get_formatted_paths(allowed)
588
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
589
        start, limit = self._list_limits(
590
            [x[0] for x in objects], marker, limit)
591
        return objects[start:start + limit]
592

    
593
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
594
        public = self._list_object_permissions(
595
            user, account, container, prefix, shared=False, public=True)
596
        paths, nodes = self._lookup_objects(public)
597
        path = '/'.join((account, container))
598
        cont_prefix = path + '/'
599
        paths = [x[len(cont_prefix):] for x in paths]
600
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
601
        objects = [(path,) + props for path, props in zip(paths, props)]
602
        return objects
603

    
604
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
605
        objects = []
606
        while True:
607
            marker = objects[-1] if objects else None
608
            limit = 10000
609
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
610
            objects.extend(l)
611
            if not l or len(l) < limit:
612
                break
613
        return objects
614

    
615
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
616
        allowed = []
617
        path = '/'.join((account, container, prefix)).rstrip('/')
618
        if user != account:
619
            allowed = self.permissions.access_list_paths(user, path)
620
            if not allowed:
621
                raise NotAllowedError
622
        else:
623
            allowed = set()
624
            if shared:
625
                allowed.update(self.permissions.access_list_shared(path))
626
            if public:
627
                allowed.update(
628
                    [x[0] for x in self.permissions.public_list(path)])
629
            allowed = sorted(allowed)
630
            if not allowed:
631
                return []
632
        return allowed
633

    
634
    @backend_method
635
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
636
        """Return a list of object (name, version_id) tuples existing under a container."""
637

    
638
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
639
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
640

    
641
    @backend_method
642
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
643
        """Return a list of object metadata dicts existing under a container."""
644

    
645
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
646
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
647
        objects = []
648
        for p in props:
649
            if len(p) == 2:
650
                objects.append({'subdir': p[0]})
651
            else:
652
                objects.append({'name': p[0],
653
                                'bytes': p[self.SIZE + 1],
654
                                'type': p[self.TYPE + 1],
655
                                'hash': p[self.HASH + 1],
656
                                'version': p[self.SERIAL + 1],
657
                                'version_timestamp': p[self.MTIME + 1],
658
                                'modified': p[self.MTIME + 1] if until is None else None,
659
                                'modified_by': p[self.MUSER + 1],
660
                                'uuid': p[self.UUID + 1],
661
                                'checksum': p[self.CHECKSUM + 1]})
662
        return objects
663

    
664
    @backend_method
665
    def list_object_permissions(self, user, account, container, prefix=''):
666
        """Return a list of paths that enforce permissions under a container."""
667

    
668
        logger.debug("list_object_permissions: %s %s %s %s", user,
669
                     account, container, prefix)
670
        return self._list_object_permissions(user, account, container, prefix, True, False)
671

    
672
    @backend_method
673
    def list_object_public(self, user, account, container, prefix=''):
674
        """Return a dict mapping paths to public ids for objects that are public under a container."""
675

    
676
        logger.debug("list_object_public: %s %s %s %s", user,
677
                     account, container, prefix)
678
        public = {}
679
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
680
            public[path] = p + ULTIMATE_ANSWER
681
        return public
682

    
683
    @backend_method
684
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
685
        """Return a dictionary with the object metadata for the domain."""
686

    
687
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
688
                     account, container, name, domain, version)
689
        self._can_read(user, account, container, name)
690
        path, node = self._lookup_object(account, container, name)
691
        props = self._get_version(node, version)
692
        if version is None:
693
            modified = props[self.MTIME]
694
        else:
695
            try:
696
                modified = self._get_version(
697
                    node)[self.MTIME]  # Overall last modification.
698
            except NameError:  # Object may be deleted.
699
                del_props = self.node.version_lookup(
700
                    node, inf, CLUSTER_DELETED)
701
                if del_props is None:
702
                    raise ItemNotExists('Object does not exist')
703
                modified = del_props[self.MTIME]
704

    
705
        meta = {}
706
        if include_user_defined:
707
            meta.update(
708
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
709
        meta.update({'name': name,
710
                     'bytes': props[self.SIZE],
711
                     'type': props[self.TYPE],
712
                     'hash': props[self.HASH],
713
                     'version': props[self.SERIAL],
714
                     'version_timestamp': props[self.MTIME],
715
                     'modified': modified,
716
                     'modified_by': props[self.MUSER],
717
                     'uuid': props[self.UUID],
718
                     'checksum': props[self.CHECKSUM]})
719
        return meta
720

    
721
    @backend_method
722
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
723
        """Update the metadata associated with the object for the domain and return the new version."""
724

    
725
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
726
                     user, account, container, name, domain, meta, replace)
727
        self._can_write(user, account, container, name)
728
        path, node = self._lookup_object(account, container, name)
729
        src_version_id, dest_version_id = self._put_metadata(
730
            user, node, domain, meta, replace)
731
        self._apply_versioning(account, container, src_version_id)
732
        return dest_version_id
733

    
734
    @backend_method
735
    def get_object_permissions(self, user, account, container, name):
736
        """Return the action allowed on the object, the path
737
        from which the object gets its permissions from,
738
        along with a dictionary containing the permissions."""
739

    
740
        logger.debug("get_object_permissions: %s %s %s %s", user,
741
                     account, container, name)
742
        allowed = 'write'
743
        permissions_path = self._get_permissions_path(account, container, name)
744
        if user != account:
745
            if self.permissions.access_check(permissions_path, self.WRITE, user):
746
                allowed = 'write'
747
            elif self.permissions.access_check(permissions_path, self.READ, user):
748
                allowed = 'read'
749
            else:
750
                raise NotAllowedError
751
        self._lookup_object(account, container, name)
752
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
753

    
754
    @backend_method
755
    def update_object_permissions(self, user, account, container, name, permissions):
756
        """Update the permissions associated with the object."""
757

    
758
        logger.debug("update_object_permissions: %s %s %s %s %s",
759
                     user, account, container, name, permissions)
760
        if user != account:
761
            raise NotAllowedError
762
        path = self._lookup_object(account, container, name)[0]
763
        self._check_permissions(path, permissions)
764
        self.permissions.access_set(path, permissions)
765
        self._report_sharing_change(user, account, path, {'members':
766
                                    self.permissions.access_members(path)})
767

    
768
    @backend_method
769
    def get_object_public(self, user, account, container, name):
770
        """Return the public id of the object if applicable."""
771

    
772
        logger.debug(
773
            "get_object_public: %s %s %s %s", user, account, container, name)
774
        self._can_read(user, account, container, name)
775
        path = self._lookup_object(account, container, name)[0]
776
        p = self.permissions.public_get(path)
777
        if p is not None:
778
            p += ULTIMATE_ANSWER
779
        return p
780

    
781
    @backend_method
782
    def update_object_public(self, user, account, container, name, public):
783
        """Update the public status of the object."""
784

    
785
        logger.debug("update_object_public: %s %s %s %s %s", user,
786
                     account, container, name, public)
787
        self._can_write(user, account, container, name)
788
        path = self._lookup_object(account, container, name)[0]
789
        if not public:
790
            self.permissions.public_unset(path)
791
        else:
792
            self.permissions.public_set(path)
793

    
794
    @backend_method
795
    def get_object_hashmap(self, user, account, container, name, version=None):
796
        """Return the object's size and a list with partial hashes."""
797

    
798
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
799
                     account, container, name, version)
800
        self._can_read(user, account, container, name)
801
        path, node = self._lookup_object(account, container, name)
802
        props = self._get_version(node, version)
803
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
804
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
805

    
806
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
807
        if permissions is not None and user != account:
808
            raise NotAllowedError
809
        self._can_write(user, account, container, name)
810
        if permissions is not None:
811
            path = '/'.join((account, container, name))
812
            self._check_permissions(path, permissions)
813

    
814
        account_path, account_node = self._lookup_account(account, True)
815
        container_path, container_node = self._lookup_container(
816
            account, container)
817
        path, node = self._put_object_node(
818
            container_path, container_node, name)
819
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
820

    
821
        # Handle meta.
822
        if src_version_id is None:
823
            src_version_id = pre_version_id
824
        self._put_metadata_duplicate(
825
            src_version_id, dest_version_id, domain, meta, replace_meta)
826

    
827
        del_size = self._apply_versioning(account, container, pre_version_id)
828
        size_delta = size - del_size
829
        if not self.quotaholder_url: # Check quota.
830
            if size_delta > 0:
831
                account_quota = long(self._get_policy(account_node)['quota'])
832
                account_usage = self._get_statistics(account_node)[1] + size_delta
833
                container_quota = long(self._get_policy(container_node)['quota'])
834
                container_usage = self._get_statistics(container_node)[1] + size_delta
835
                if (account_quota > 0 and account_usage > account_quota):
836
                    logger.error('account_quota: %s, account_usage: %s' % (
837
                        account_quota, account_usage
838
                    ))
839
                    raise QuotaError
840
                if (container_quota > 0 and container_usage > container_quota):
841
                    # This must be executed in a transaction, so the version is
842
                    # never created if it fails.
843
                    logger.error('container_quota: %s, container_usage: %s' % (
844
                        container_quota, container_usage
845
                    ))
846
                    raise QuotaError
847
        self._report_size_change(user, account, size_delta,
848
                                 {'action': 'object update', 'path': path,
849
                                  'versions': ','.join([str(dest_version_id)])})
850
        if permissions is not None:
851
            self.permissions.access_set(path, permissions)
852
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
853

    
854
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
855
        return dest_version_id
856

    
857
    @backend_method
858
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
859
        """Create/update an object with the specified size and partial hashes."""
860

    
861
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
862
                     account, container, name, size, type, hashmap, checksum)
863
        if size == 0:  # No such thing as an empty hashmap.
864
            hashmap = [self.put_block('')]
865
        map = HashMap(self.block_size, self.hash_algorithm)
866
        map.extend([binascii.unhexlify(x) for x in hashmap])
867
        missing = self.store.block_search(map)
868
        if missing:
869
            ie = IndexError()
870
            ie.data = [binascii.hexlify(x) for x in missing]
871
            raise ie
872

    
873
        hash = map.hash()
874
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
875
        self.store.map_put(hash, map)
876
        return dest_version_id
877

    
878
    @backend_method
879
    def update_object_checksum(self, user, account, container, name, version, checksum):
880
        """Update an object's checksum."""
881

    
882
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
883
                     user, account, container, name, version, checksum)
884
        # Update objects with greater version and same hashmap and size (fix metadata updates).
885
        self._can_write(user, account, container, name)
886
        path, node = self._lookup_object(account, container, name)
887
        props = self._get_version(node, version)
888
        versions = self.node.node_get_versions(node)
889
        for x in versions:
890
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
891
                self.node.version_put_property(
892
                    x[self.SERIAL], 'checksum', checksum)
893

    
894
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
895
        dest_version_ids = []
896
        self._can_read(user, src_account, src_container, src_name)
897
        path, node = self._lookup_object(src_account, src_container, src_name)
898
        # TODO: Will do another fetch of the properties in duplicate version...
899
        props = self._get_version(
900
            node, src_version)  # Check to see if source exists.
901
        src_version_id = props[self.SERIAL]
902
        hash = props[self.HASH]
903
        size = props[self.SIZE]
904
        is_copy = not is_move and (src_account, src_container, src_name) != (
905
            dest_account, dest_container, dest_name)  # New uuid.
906
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
907
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
908
            self._delete_object(user, src_account, src_container, src_name)
909

    
910
        if delimiter:
911
            prefix = src_name + \
912
                delimiter if not src_name.endswith(delimiter) else src_name
913
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
914
            src_names.sort(key=lambda x: x[2])  # order by nodes
915
            paths = [elem[0] for elem in src_names]
916
            nodes = [elem[2] for elem in src_names]
917
            # TODO: Will do another fetch of the properties in duplicate version...
918
            props = self._get_versions(nodes)  # Check to see if source exists.
919

    
920
            for prop, path, node in zip(props, paths, nodes):
921
                src_version_id = prop[self.SERIAL]
922
                hash = prop[self.HASH]
923
                vtype = prop[self.TYPE]
924
                size = prop[self.SIZE]
925
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
926
                    delimiter) else dest_name
927
                vdest_name = path.replace(prefix, dest_prefix, 1)
928
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
929
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
930
                    self._delete_object(user, src_account, src_container, path)
931
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
932

    
933
    @backend_method
934
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
935
        """Copy an object's data and metadata."""
936

    
937
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
938
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
939
        return dest_version_id
940

    
941
    @backend_method
942
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
943
        """Move an object's data and metadata."""
944

    
945
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
946
        if user != src_account:
947
            raise NotAllowedError
948
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
949
        return dest_version_id
950

    
951
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
952
        if user != account:
953
            raise NotAllowedError
954

    
955
        if until is not None:
956
            path = '/'.join((account, container, name))
957
            node = self.node.node_lookup(path)
958
            if node is None:
959
                return
960
            hashes = []
961
            size = 0
962
            serials = []
963
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
964
            hashes += h
965
            size += s
966
            serials += v
967
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
968
            hashes += h
969
            size += s
970
            serials += v
971
            for h in hashes:
972
                self.store.map_delete(h)
973
            self.node.node_purge(node, until, CLUSTER_DELETED)
974
            try:
975
                props = self._get_version(node)
976
            except NameError:
977
                self.permissions.access_clear(path)
978
            self._report_size_change(user, account, -size,
979
                                    {'action': 'object purge', 'path': path,
980
                                     'versions': ','.join(str(i) for i in serials)})
981
            return
982

    
983
        path, node = self._lookup_object(account, container, name)
984
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
985
        del_size = self._apply_versioning(account, container, src_version_id)
986
        self._report_size_change(user, account, -del_size,
987
                                 {'action': 'object delete', 'path': path,
988
                                  'versions': ','.join([str(dest_version_id)])})
989
        self._report_object_change(
990
            user, account, path, details={'action': 'object delete'})
991
        self.permissions.access_clear(path)
992

    
993
        if delimiter:
994
            prefix = name + delimiter if not name.endswith(delimiter) else name
995
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
996
            paths = []
997
            for t in src_names:
998
                path = '/'.join((account, container, t[0]))
999
                node = t[2]
1000
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1001
                del_size = self._apply_versioning(
1002
                    account, container, src_version_id)
1003
                self._report_size_change(user, account, -del_size,
1004
                                         {'action': 'object delete',
1005
                                          'path': path,
1006
                                          'versions': ','.join([str(dest_version_id)])})
1007
                self._report_object_change(
1008
                    user, account, path, details={'action': 'object delete'})
1009
                paths.append(path)
1010
            self.permissions.access_clear_bulk(paths)
1011

    
1012
    @backend_method
1013
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1014
        """Delete/purge an object."""
1015

    
1016
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1017
                     account, container, name, until, prefix, delimiter)
1018
        self._delete_object(user, account, container, name, until, delimiter)
1019

    
1020
    @backend_method
1021
    def list_versions(self, user, account, container, name):
1022
        """Return a list of all (version, version_timestamp) tuples for an object."""
1023

    
1024
        logger.debug(
1025
            "list_versions: %s %s %s %s", user, account, container, name)
1026
        self._can_read(user, account, container, name)
1027
        path, node = self._lookup_object(account, container, name)
1028
        versions = self.node.node_get_versions(node)
1029
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1030

    
1031
    @backend_method
1032
    def get_uuid(self, user, uuid):
1033
        """Return the (account, container, name) for the UUID given."""
1034

    
1035
        logger.debug("get_uuid: %s %s", user, uuid)
1036
        info = self.node.latest_uuid(uuid)
1037
        if info is None:
1038
            raise NameError
1039
        path, serial = info
1040
        account, container, name = path.split('/', 2)
1041
        self._can_read(user, account, container, name)
1042
        return (account, container, name)
1043

    
1044
    @backend_method
1045
    def get_public(self, user, public):
1046
        """Return the (account, container, name) for the public id given."""
1047

    
1048
        logger.debug("get_public: %s %s", user, public)
1049
        if public is None or public < ULTIMATE_ANSWER:
1050
            raise NameError
1051
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1052
        if path is None:
1053
            raise NameError
1054
        account, container, name = path.split('/', 2)
1055
        self._can_read(user, account, container, name)
1056
        return (account, container, name)
1057

    
1058
    @backend_method(autocommit=0)
1059
    def get_block(self, hash):
1060
        """Return a block's data."""
1061

    
1062
        logger.debug("get_block: %s", hash)
1063
        block = self.store.block_get(binascii.unhexlify(hash))
1064
        if not block:
1065
            raise ItemNotExists('Block does not exist')
1066
        return block
1067

    
1068
    @backend_method(autocommit=0)
1069
    def put_block(self, data):
1070
        """Store a block and return the hash."""
1071

    
1072
        logger.debug("put_block: %s", len(data))
1073
        return binascii.hexlify(self.store.block_put(data))
1074

    
1075
    @backend_method(autocommit=0)
1076
    def update_block(self, hash, data, offset=0):
1077
        """Update a known block and return the hash."""
1078

    
1079
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1080
        if offset == 0 and len(data) == self.block_size:
1081
            return self.put_block(data)
1082
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1083
        return binascii.hexlify(h)
1084

    
1085
    # Path functions.
1086

    
1087
    def _generate_uuid(self):
1088
        return str(uuidlib.uuid4())
1089

    
1090
    def _put_object_node(self, path, parent, name):
1091
        path = '/'.join((path, name))
1092
        node = self.node.node_lookup(path)
1093
        if node is None:
1094
            node = self.node.node_create(parent, path)
1095
        return path, node
1096

    
1097
    def _put_path(self, user, parent, path):
1098
        node = self.node.node_create(parent, path)
1099
        self.node.version_create(node, None, 0, '', None, user,
1100
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1101
        return node
1102

    
1103
    def _lookup_account(self, account, create=True):
1104
        node = self.node.node_lookup(account)
1105
        if node is None and create:
1106
            node = self._put_path(
1107
                account, self.ROOTNODE, account)  # User is account.
1108
        return account, node
1109

    
1110
    def _lookup_container(self, account, container):
1111
        path = '/'.join((account, container))
1112
        node = self.node.node_lookup(path)
1113
        if node is None:
1114
            raise ItemNotExists('Container does not exist')
1115
        return path, node
1116

    
1117
    def _lookup_object(self, account, container, name):
1118
        path = '/'.join((account, container, name))
1119
        node = self.node.node_lookup(path)
1120
        if node is None:
1121
            raise ItemNotExists('Object does not exist')
1122
        return path, node
1123

    
1124
    def _lookup_objects(self, paths):
1125
        nodes = self.node.node_lookup_bulk(paths)
1126
        return paths, nodes
1127

    
1128
    def _get_properties(self, node, until=None):
1129
        """Return properties until the timestamp given."""
1130

    
1131
        before = until if until is not None else inf
1132
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1133
        if props is None and until is not None:
1134
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1135
        if props is None:
1136
            raise ItemNotExists('Path does not exist')
1137
        return props
1138

    
1139
    def _get_statistics(self, node, until=None):
1140
        """Return count, sum of size and latest timestamp of everything under node."""
1141

    
1142
        if until is None:
1143
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1144
        else:
1145
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1146
        if stats is None:
1147
            stats = (0, 0, 0)
1148
        return stats
1149

    
1150
    def _get_version(self, node, version=None):
1151
        if version is None:
1152
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1153
            if props is None:
1154
                raise ItemNotExists('Object does not exist')
1155
        else:
1156
            try:
1157
                version = int(version)
1158
            except ValueError:
1159
                raise VersionNotExists('Version does not exist')
1160
            props = self.node.version_get_properties(version)
1161
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1162
                raise VersionNotExists('Version does not exist')
1163
        return props
1164

    
1165
    def _get_versions(self, nodes):
1166
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1167

    
1168
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1169
        """Create a new version of the node."""
1170

    
1171
        props = self.node.version_lookup(
1172
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1173
        if props is not None:
1174
            src_version_id = props[self.SERIAL]
1175
            src_hash = props[self.HASH]
1176
            src_size = props[self.SIZE]
1177
            src_type = props[self.TYPE]
1178
            src_checksum = props[self.CHECKSUM]
1179
        else:
1180
            src_version_id = None
1181
            src_hash = None
1182
            src_size = 0
1183
            src_type = ''
1184
            src_checksum = ''
1185
        if size is None:  # Set metadata.
1186
            hash = src_hash  # This way hash can be set to None (account or container).
1187
            size = src_size
1188
        if type is None:
1189
            type = src_type
1190
        if checksum is None:
1191
            checksum = src_checksum
1192
        uuid = self._generate_uuid(
1193
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1194

    
1195
        if src_node is None:
1196
            pre_version_id = src_version_id
1197
        else:
1198
            pre_version_id = None
1199
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1200
            if props is not None:
1201
                pre_version_id = props[self.SERIAL]
1202
        if pre_version_id is not None:
1203
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1204

    
1205
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1206
        return pre_version_id, dest_version_id
1207

    
1208
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1209
        if src_version_id is not None:
1210
            self.node.attribute_copy(src_version_id, dest_version_id)
1211
        if not replace:
1212
            self.node.attribute_del(dest_version_id, domain, (
1213
                k for k, v in meta.iteritems() if v == ''))
1214
            self.node.attribute_set(dest_version_id, domain, (
1215
                (k, v) for k, v in meta.iteritems() if v != ''))
1216
        else:
1217
            self.node.attribute_del(dest_version_id, domain)
1218
            self.node.attribute_set(dest_version_id, domain, ((
1219
                k, v) for k, v in meta.iteritems()))
1220

    
1221
    def _put_metadata(self, user, node, domain, meta, replace=False):
1222
        """Create a new version and store metadata."""
1223

    
1224
        src_version_id, dest_version_id = self._put_version_duplicate(
1225
            user, node)
1226
        self._put_metadata_duplicate(
1227
            src_version_id, dest_version_id, domain, meta, replace)
1228
        return src_version_id, dest_version_id
1229

    
1230
    def _list_limits(self, listing, marker, limit):
1231
        start = 0
1232
        if marker:
1233
            try:
1234
                start = listing.index(marker) + 1
1235
            except ValueError:
1236
                pass
1237
        if not limit or limit > 10000:
1238
            limit = 10000
1239
        return start, limit
1240

    
1241
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1242
        cont_prefix = path + '/'
1243
        prefix = cont_prefix + prefix
1244
        start = cont_prefix + marker if marker else None
1245
        before = until if until is not None else inf
1246
        filterq = keys if domain else []
1247
        sizeq = size_range
1248

    
1249
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1250
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1251
        objects.sort(key=lambda x: x[0])
1252
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1253
        return objects
1254

    
1255
    # Reporting functions.
1256

    
1257
    def _report_size_change(self, user, account, size, details={}):
1258
        account_node = self._lookup_account(account, True)[1]
1259
        total = self._get_statistics(account_node)[1]
1260
        details.update({'user': user, 'total': total})
1261
        logger.debug(
1262
            "_report_size_change: %s %s %s %s", user, account, size, details)
1263
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1264
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1265
                              float(size), details))
1266

    
1267
        if not self.quotaholder_url:
1268
            return
1269
        
1270
        serial = self.quotaholder.issue_commission(
1271
                context     =   {},
1272
                target      =   account,
1273
                key         =   '1',
1274
                clientkey   =   'pithos',
1275
                ownerkey    =   '',
1276
                name        =   details['path'] if 'path' in details else '',
1277
                provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1278
        )
1279
        self.serials.append(serial)
1280

    
1281
    def _report_object_change(self, user, account, path, details={}):
1282
        details.update({'user': user})
1283
        logger.debug("_report_object_change: %s %s %s %s", user,
1284
                     account, path, details)
1285
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1286
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1287

    
1288
    def _report_sharing_change(self, user, account, path, details={}):
1289
        logger.debug("_report_permissions_change: %s %s %s %s",
1290
                     user, account, path, details)
1291
        details.update({'user': user})
1292
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1293
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1294

    
1295
    # Policy functions.
1296

    
1297
    def _check_policy(self, policy):
1298
        for k in policy.keys():
1299
            if policy[k] == '':
1300
                policy[k] = self.default_policy.get(k)
1301
        for k, v in policy.iteritems():
1302
            if k == 'quota':
1303
                q = int(v)  # May raise ValueError.
1304
                if q < 0:
1305
                    raise ValueError
1306
            elif k == 'versioning':
1307
                if v not in ['auto', 'none']:
1308
                    raise ValueError
1309
            else:
1310
                raise ValueError
1311

    
1312
    def _put_policy(self, node, policy, replace):
1313
        if replace:
1314
            for k, v in self.default_policy.iteritems():
1315
                if k not in policy:
1316
                    policy[k] = v
1317
        self.node.policy_set(node, policy)
1318

    
1319
    def _get_policy(self, node):
1320
        policy = self.default_policy.copy()
1321
        policy.update(self.node.policy_get(node))
1322
        return policy
1323

    
1324
    def _apply_versioning(self, account, container, version_id):
1325
        """Delete the provided version if such is the policy.
1326
           Return size of object removed.
1327
        """
1328

    
1329
        if version_id is None:
1330
            return 0
1331
        path, node = self._lookup_container(account, container)
1332
        versioning = self._get_policy(node)['versioning']
1333
        if versioning != 'auto' or self.free_versioning:
1334
            hash, size = self.node.version_remove(version_id)
1335
            self.store.map_delete(hash)
1336
            return size
1337
        return 0
1338

    
1339
    # Access control functions.
1340

    
1341
    def _check_groups(self, groups):
1342
        # raise ValueError('Bad characters in groups')
1343
        pass
1344

    
1345
    def _check_permissions(self, path, permissions):
1346
        # raise ValueError('Bad characters in permissions')
1347
        pass
1348

    
1349
    def _get_formatted_paths(self, paths):
1350
        formatted = []
1351
        for p in paths:
1352
            node = self.node.node_lookup(p)
1353
            props = None
1354
            if node is not None:
1355
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1356
            if props is not None:
1357
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1358
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1359
                formatted.append((p, self.MATCH_EXACT))
1360
        return formatted
1361

    
1362
    def _get_permissions_path(self, account, container, name):
1363
        path = '/'.join((account, container, name))
1364
        permission_paths = self.permissions.access_inherit(path)
1365
        permission_paths.sort()
1366
        permission_paths.reverse()
1367
        for p in permission_paths:
1368
            if p == path:
1369
                return p
1370
            else:
1371
                if p.count('/') < 2:
1372
                    continue
1373
                node = self.node.node_lookup(p)
1374
                if node is not None:
1375
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1376
                if props is not None:
1377
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1378
                        return p
1379
        return None
1380

    
1381
    def _can_read(self, user, account, container, name):
1382
        if user == account:
1383
            return True
1384
        path = '/'.join((account, container, name))
1385
        if self.permissions.public_get(path) is not None:
1386
            return True
1387
        path = self._get_permissions_path(account, container, name)
1388
        if not path:
1389
            raise NotAllowedError
1390
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1391
            raise NotAllowedError
1392

    
1393
    def _can_write(self, user, account, container, name):
1394
        if user == account:
1395
            return True
1396
        path = '/'.join((account, container, name))
1397
        path = self._get_permissions_path(account, container, name)
1398
        if not path:
1399
            raise NotAllowedError
1400
        if not self.permissions.access_check(path, self.WRITE, user):
1401
            raise NotAllowedError
1402

    
1403
    def _allowed_accounts(self, user):
1404
        allow = set()
1405
        for path in self.permissions.access_list_paths(user):
1406
            allow.add(path.split('/', 1)[0])
1407
        return sorted(allow)
1408

    
1409
    def _allowed_containers(self, user, account):
1410
        allow = set()
1411
        for path in self.permissions.access_list_paths(user, account):
1412
            allow.add(path.split('/', 2)[1])
1413
        return sorted(allow)