Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ b2691c21

History | View | Annotate | Download (60.9 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from commissioning.clients.quotaholder import QuotaholderHTTP
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
85
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
86

    
87
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
88
QUEUE_CLIENT_ID = 'pithos'
89
QUEUE_INSTANCE_ID = '1'
90

    
91
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
92

    
93
inf = float('inf')
94

    
95
ULTIMATE_ANSWER = 42
96

    
97

    
98
logger = logging.getLogger(__name__)
99

    
100

    
101
def backend_method(func=None, autocommit=1):
102
    if func is None:
103
        def fn(func):
104
            return backend_method(func, autocommit)
105
        return fn
106

    
107
    if not autocommit:
108
        return func
109

    
110
    def fn(self, *args, **kw):
111
        self.wrapper.execute()
112
        serials = []
113
        self.serials = serials
114
        self.messages = []
115

    
116
        try:
117
            ret = func(self, *args, **kw)
118
            for m in self.messages:
119
                self.queue.send(*m)
120
            if serials:
121
                self.quotaholder.accept_commission(
122
                            context     =   {},
123
                            clientkey   =   'pithos',
124
                            serials     =   serials)
125
            self.wrapper.commit()
126
            return ret
127
        except:
128
            self.quotaholder.reject_commission(
129
                        context     =   {},
130
                        clientkey   =   'pithos',
131
                        serials     =   serials)
132
            self.wrapper.rollback()
133
            raise
134
    return fn
135

    
136

    
137
class ModularBackend(BaseBackend):
138
    """A modular backend.
139

140
    Uses modules for SQL functions and storage.
141
    """
142

    
143
    def __init__(self, db_module=None, db_connection=None,
144
                 block_module=None, block_path=None, block_umask=None,
145
                 queue_module=None, queue_hosts=None,
146
                 queue_exchange=None, quotaholder_url=None,
147
                 free_versioning=True):
148
        db_module = db_module or DEFAULT_DB_MODULE
149
        db_connection = db_connection or DEFAULT_DB_CONNECTION
150
        block_module = block_module or DEFAULT_BLOCK_MODULE
151
        block_path = block_path or DEFAULT_BLOCK_PATH
152
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
153
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
154
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
155
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
156
        
157
        self.hash_algorithm = 'sha256'
158
        self.block_size = 4 * 1024 * 1024  # 4MB
159
        self.free_versioning = free_versioning
160

    
161
        self.default_policy = {'quota': DEFAULT_QUOTA,
162
                               'versioning': DEFAULT_VERSIONING}
163

    
164
        def load_module(m):
165
            __import__(m)
166
            return sys.modules[m]
167

    
168
        self.db_module = load_module(db_module)
169
        self.wrapper = self.db_module.DBWrapper(db_connection)
170
        params = {'wrapper': self.wrapper}
171
        self.permissions = self.db_module.Permissions(**params)
172
        self.config = self.db_module.Config(**params)
173
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
174
        for x in ['READ', 'WRITE']:
175
            setattr(self, x, getattr(self.db_module, x))
176
        self.node = self.db_module.Node(**params)
177
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
178
            setattr(self, x, getattr(self.db_module, x))
179

    
180
        self.block_module = load_module(block_module)
181
        params = {'path': block_path,
182
                  'block_size': self.block_size,
183
                  'hash_algorithm': self.hash_algorithm,
184
                  'umask': block_umask}
185
        self.store = self.block_module.Store(**params)
186

    
187
        if queue_module and queue_hosts:
188
            self.queue_module = load_module(queue_module)
189
            params = {'hosts': queue_hosts,
190
                              'exchange': queue_exchange,
191
                      'client_id': QUEUE_CLIENT_ID}
192
            self.queue = self.queue_module.Queue(**params)
193
        else:
194
            class NoQueue:
195
                def send(self, *args):
196
                    pass
197

    
198
                def close(self):
199
                    pass
200

    
201
            self.queue = NoQueue()
202

    
203
        self.quotaholder_url = quotaholder_url
204
        self.quotaholder = QuotaholderHTTP(quotaholder_url)
205
        self.serials = []
206
        self.messages = []
207

    
208
    def close(self):
209
        self.wrapper.close()
210
        self.queue.close()
211

    
212
    @backend_method
213
    def list_accounts(self, user, marker=None, limit=10000):
214
        """Return a list of accounts the user can access."""
215

    
216
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
217
        allowed = self._allowed_accounts(user)
218
        start, limit = self._list_limits(allowed, marker, limit)
219
        return allowed[start:start + limit]
220

    
221
    @backend_method
222
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
223
        """Return a dictionary with the account metadata for the domain."""
224

    
225
        logger.debug(
226
            "get_account_meta: %s %s %s %s", user, account, domain, until)
227
        path, node = self._lookup_account(account, user == account)
228
        if user != account:
229
            if until or node is None or account not in self._allowed_accounts(user):
230
                raise NotAllowedError
231
        try:
232
            props = self._get_properties(node, until)
233
            mtime = props[self.MTIME]
234
        except NameError:
235
            props = None
236
            mtime = until
237
        count, bytes, tstamp = self._get_statistics(node, until)
238
        tstamp = max(tstamp, mtime)
239
        if until is None:
240
            modified = tstamp
241
        else:
242
            modified = self._get_statistics(
243
                node)[2]  # Overall last modification.
244
            modified = max(modified, mtime)
245

    
246
        if user != account:
247
            meta = {'name': account}
248
        else:
249
            meta = {}
250
            if props is not None and include_user_defined:
251
                meta.update(
252
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
253
            if until is not None:
254
                meta.update({'until_timestamp': tstamp})
255
            meta.update({'name': account, 'count': count, 'bytes': bytes})
256
        meta.update({'modified': modified})
257
        return meta
258

    
259
    @backend_method
260
    def update_account_meta(self, user, account, domain, meta, replace=False):
261
        """Update the metadata associated with the account for the domain."""
262

    
263
        logger.debug("update_account_meta: %s %s %s %s %s", user,
264
                     account, domain, meta, replace)
265
        if user != account:
266
            raise NotAllowedError
267
        path, node = self._lookup_account(account, True)
268
        self._put_metadata(user, node, domain, meta, replace)
269

    
270
    @backend_method
271
    def get_account_groups(self, user, account):
272
        """Return a dictionary with the user groups defined for this account."""
273

    
274
        logger.debug("get_account_groups: %s %s", user, account)
275
        if user != account:
276
            if account not in self._allowed_accounts(user):
277
                raise NotAllowedError
278
            return {}
279
        self._lookup_account(account, True)
280
        return self.permissions.group_dict(account)
281

    
282
    @backend_method
283
    def update_account_groups(self, user, account, groups, replace=False):
284
        """Update the groups associated with the account."""
285

    
286
        logger.debug("update_account_groups: %s %s %s %s", user,
287
                     account, groups, replace)
288
        if user != account:
289
            raise NotAllowedError
290
        self._lookup_account(account, True)
291
        self._check_groups(groups)
292
        if replace:
293
            self.permissions.group_destroy(account)
294
        for k, v in groups.iteritems():
295
            if not replace:  # If not already deleted.
296
                self.permissions.group_delete(account, k)
297
            if v:
298
                self.permissions.group_addmany(account, k, v)
299

    
300
    @backend_method
301
    def get_account_policy(self, user, account):
302
        """Return a dictionary with the account policy."""
303

    
304
        logger.debug("get_account_policy: %s %s", user, account)
305
        if user != account:
306
            if account not in self._allowed_accounts(user):
307
                raise NotAllowedError
308
            return {}
309
        path, node = self._lookup_account(account, True)
310
        return self._get_policy(node)
311

    
312
    @backend_method
313
    def update_account_policy(self, user, account, policy, replace=False):
314
        """Update the policy associated with the account."""
315

    
316
        logger.debug("update_account_policy: %s %s %s %s", user,
317
                     account, policy, replace)
318
        if user != account:
319
            raise NotAllowedError
320
        path, node = self._lookup_account(account, True)
321
        self._check_policy(policy)
322
        self._put_policy(node, policy, replace)
323

    
324
    @backend_method
325
    def put_account(self, user, account, policy={}):
326
        """Create a new account with the given name."""
327

    
328
        logger.debug("put_account: %s %s %s", user, account, policy)
329
        if user != account:
330
            raise NotAllowedError
331
        node = self.node.node_lookup(account)
332
        if node is not None:
333
            raise AccountExists('Account already exists')
334
        if policy:
335
            self._check_policy(policy)
336
        node = self._put_path(user, self.ROOTNODE, account)
337
        self._put_policy(node, policy, True)
338

    
339
    @backend_method
340
    def delete_account(self, user, account):
341
        """Delete the account with the given name."""
342

    
343
        logger.debug("delete_account: %s %s", user, account)
344
        if user != account:
345
            raise NotAllowedError
346
        node = self.node.node_lookup(account)
347
        if node is None:
348
            return
349
        if not self.node.node_remove(node):
350
            raise AccountNotEmpty('Account is not empty')
351
        self.permissions.group_destroy(account)
352

    
353
    @backend_method
354
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
355
        """Return a list of containers existing under an account."""
356

    
357
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
358
                     account, marker, limit, shared, until, public)
359
        if user != account:
360
            if until or account not in self._allowed_accounts(user):
361
                raise NotAllowedError
362
            allowed = self._allowed_containers(user, account)
363
            start, limit = self._list_limits(allowed, marker, limit)
364
            return allowed[start:start + limit]
365
        if shared or public:
366
            allowed = set()
367
            if shared:
368
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
369
            if public:
370
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
371
            allowed = sorted(allowed)
372
            start, limit = self._list_limits(allowed, marker, limit)
373
            return allowed[start:start + limit]
374
        node = self.node.node_lookup(account)
375
        containers = [x[0] for x in self._list_object_properties(
376
            node, account, '', '/', marker, limit, False, None, [], until)]
377
        start, limit = self._list_limits(
378
            [x[0] for x in containers], marker, limit)
379
        return containers[start:start + limit]
380

    
381
    @backend_method
382
    def list_container_meta(self, user, account, container, domain, until=None):
383
        """Return a list with all the container's object meta keys for the domain."""
384

    
385
        logger.debug("list_container_meta: %s %s %s %s %s", user,
386
                     account, container, domain, until)
387
        allowed = []
388
        if user != account:
389
            if until:
390
                raise NotAllowedError
391
            allowed = self.permissions.access_list_paths(
392
                user, '/'.join((account, container)))
393
            if not allowed:
394
                raise NotAllowedError
395
        path, node = self._lookup_container(account, container)
396
        before = until if until is not None else inf
397
        allowed = self._get_formatted_paths(allowed)
398
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
399

    
400
    @backend_method
401
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
402
        """Return a dictionary with the container metadata for the domain."""
403

    
404
        logger.debug("get_container_meta: %s %s %s %s %s", user,
405
                     account, container, domain, until)
406
        if user != account:
407
            if until or container not in self._allowed_containers(user, account):
408
                raise NotAllowedError
409
        path, node = self._lookup_container(account, container)
410
        props = self._get_properties(node, until)
411
        mtime = props[self.MTIME]
412
        count, bytes, tstamp = self._get_statistics(node, until)
413
        tstamp = max(tstamp, mtime)
414
        if until is None:
415
            modified = tstamp
416
        else:
417
            modified = self._get_statistics(
418
                node)[2]  # Overall last modification.
419
            modified = max(modified, mtime)
420

    
421
        if user != account:
422
            meta = {'name': container}
423
        else:
424
            meta = {}
425
            if include_user_defined:
426
                meta.update(
427
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
428
            if until is not None:
429
                meta.update({'until_timestamp': tstamp})
430
            meta.update({'name': container, 'count': count, 'bytes': bytes})
431
        meta.update({'modified': modified})
432
        return meta
433

    
434
    @backend_method
435
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
436
        """Update the metadata associated with the container for the domain."""
437

    
438
        logger.debug("update_container_meta: %s %s %s %s %s %s",
439
                     user, account, container, domain, meta, replace)
440
        if user != account:
441
            raise NotAllowedError
442
        path, node = self._lookup_container(account, container)
443
        src_version_id, dest_version_id = self._put_metadata(
444
            user, node, domain, meta, replace)
445
        if src_version_id is not None:
446
            versioning = self._get_policy(node)['versioning']
447
            if versioning != 'auto':
448
                self.node.version_remove(src_version_id)
449

    
450
    @backend_method
451
    def get_container_policy(self, user, account, container):
452
        """Return a dictionary with the container policy."""
453

    
454
        logger.debug(
455
            "get_container_policy: %s %s %s", user, account, container)
456
        if user != account:
457
            if container not in self._allowed_containers(user, account):
458
                raise NotAllowedError
459
            return {}
460
        path, node = self._lookup_container(account, container)
461
        return self._get_policy(node)
462

    
463
    @backend_method
464
    def update_container_policy(self, user, account, container, policy, replace=False):
465
        """Update the policy associated with the container."""
466

    
467
        logger.debug("update_container_policy: %s %s %s %s %s",
468
                     user, account, container, policy, replace)
469
        if user != account:
470
            raise NotAllowedError
471
        path, node = self._lookup_container(account, container)
472
        self._check_policy(policy)
473
        self._put_policy(node, policy, replace)
474

    
475
    @backend_method
476
    def put_container(self, user, account, container, policy={}):
477
        """Create a new container with the given name."""
478

    
479
        logger.debug(
480
            "put_container: %s %s %s %s", user, account, container, policy)
481
        if user != account:
482
            raise NotAllowedError
483
        try:
484
            path, node = self._lookup_container(account, container)
485
        except NameError:
486
            pass
487
        else:
488
            raise ContainerExists('Container already exists')
489
        if policy:
490
            self._check_policy(policy)
491
        path = '/'.join((account, container))
492
        node = self._put_path(
493
            user, self._lookup_account(account, True)[1], path)
494
        self._put_policy(node, policy, True)
495

    
496
    @backend_method
497
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
498
        """Delete/purge the container with the given name."""
499

    
500
        logger.debug("delete_container: %s %s %s %s %s %s", user,
501
                     account, container, until, prefix, delimiter)
502
        if user != account:
503
            raise NotAllowedError
504
        path, node = self._lookup_container(account, container)
505

    
506
        if until is not None:
507
            hashes, size, serials = self.node.node_purge_children(
508
                node, until, CLUSTER_HISTORY)
509
            for h in hashes:
510
                self.store.map_delete(h)
511
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
512
            self._report_size_change(user, account, -size,
513
                                     {'action':'container purge', 'path': path,
514
                                      'versions': ','.join(str(i) for i in serials)})
515
            return
516

    
517
        if not delimiter:
518
            if self._get_statistics(node)[0] > 0:
519
                raise ContainerNotEmpty('Container is not empty')
520
            hashes, size, serials = self.node.node_purge_children(
521
                node, inf, CLUSTER_HISTORY)
522
            for h in hashes:
523
                self.store.map_delete(h)
524
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
525
            self.node.node_remove(node)
526
            self._report_size_change(user, account, -size,
527
                                     {'action': 'container delete',
528
                                      'path': path,
529
                                      'versions': ','.join(str(i) for i in serials)})
530
        else:
531
            # remove only contents
532
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
533
            paths = []
534
            for t in src_names:
535
                path = '/'.join((account, container, t[0]))
536
                node = t[2]
537
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
538
                del_size = self._apply_versioning(
539
                    account, container, src_version_id)
540
                self._report_size_change(
541
                        user, account, -del_size, {
542
                                'action': 'object delete',
543
                                'path': path,
544
                             'versions': ','.join([str(dest_version_id)])
545
                     }
546
                )
547
                self._report_object_change(
548
                    user, account, path, details={'action': 'object delete'})
549
                paths.append(path)
550
            self.permissions.access_clear_bulk(paths)
551

    
552
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
553
        if user != account and until:
554
            raise NotAllowedError
555
        if shared and public:
556
            # get shared first
557
            shared = self._list_object_permissions(
558
                user, account, container, prefix, shared=True, public=False)
559
            objects = set()
560
            if shared:
561
                path, node = self._lookup_container(account, container)
562
                shared = self._get_formatted_paths(shared)
563
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
564

    
565
            # get public
566
            objects |= set(self._list_public_object_properties(
567
                user, account, container, prefix, all_props))
568
            objects = list(objects)
569

    
570
            objects.sort(key=lambda x: x[0])
571
            start, limit = self._list_limits(
572
                [x[0] for x in objects], marker, limit)
573
            return objects[start:start + limit]
574
        elif public:
575
            objects = self._list_public_object_properties(
576
                user, account, container, prefix, all_props)
577
            start, limit = self._list_limits(
578
                [x[0] for x in objects], marker, limit)
579
            return objects[start:start + limit]
580

    
581
        allowed = self._list_object_permissions(
582
            user, account, container, prefix, shared, public)
583
        if shared and not allowed:
584
            return []
585
        path, node = self._lookup_container(account, container)
586
        allowed = self._get_formatted_paths(allowed)
587
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
588
        start, limit = self._list_limits(
589
            [x[0] for x in objects], marker, limit)
590
        return objects[start:start + limit]
591

    
592
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
593
        public = self._list_object_permissions(
594
            user, account, container, prefix, shared=False, public=True)
595
        paths, nodes = self._lookup_objects(public)
596
        path = '/'.join((account, container))
597
        cont_prefix = path + '/'
598
        paths = [x[len(cont_prefix):] for x in paths]
599
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
600
        objects = [(path,) + props for path, props in zip(paths, props)]
601
        return objects
602

    
603
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
604
        objects = []
605
        while True:
606
            marker = objects[-1] if objects else None
607
            limit = 10000
608
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
609
            objects.extend(l)
610
            if not l or len(l) < limit:
611
                break
612
        return objects
613

    
614
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
615
        allowed = []
616
        path = '/'.join((account, container, prefix)).rstrip('/')
617
        if user != account:
618
            allowed = self.permissions.access_list_paths(user, path)
619
            if not allowed:
620
                raise NotAllowedError
621
        else:
622
            allowed = set()
623
            if shared:
624
                allowed.update(self.permissions.access_list_shared(path))
625
            if public:
626
                allowed.update(
627
                    [x[0] for x in self.permissions.public_list(path)])
628
            allowed = sorted(allowed)
629
            if not allowed:
630
                return []
631
        return allowed
632

    
633
    @backend_method
634
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
635
        """Return a list of object (name, version_id) tuples existing under a container."""
636

    
637
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
638
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
639

    
640
    @backend_method
641
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
642
        """Return a list of object metadata dicts existing under a container."""
643

    
644
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
645
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
646
        objects = []
647
        for p in props:
648
            if len(p) == 2:
649
                objects.append({'subdir': p[0]})
650
            else:
651
                objects.append({'name': p[0],
652
                                'bytes': p[self.SIZE + 1],
653
                                'type': p[self.TYPE + 1],
654
                                'hash': p[self.HASH + 1],
655
                                'version': p[self.SERIAL + 1],
656
                                'version_timestamp': p[self.MTIME + 1],
657
                                'modified': p[self.MTIME + 1] if until is None else None,
658
                                'modified_by': p[self.MUSER + 1],
659
                                'uuid': p[self.UUID + 1],
660
                                'checksum': p[self.CHECKSUM + 1]})
661
        return objects
662

    
663
    @backend_method
664
    def list_object_permissions(self, user, account, container, prefix=''):
665
        """Return a list of paths that enforce permissions under a container."""
666

    
667
        logger.debug("list_object_permissions: %s %s %s %s", user,
668
                     account, container, prefix)
669
        return self._list_object_permissions(user, account, container, prefix, True, False)
670

    
671
    @backend_method
672
    def list_object_public(self, user, account, container, prefix=''):
673
        """Return a dict mapping paths to public ids for objects that are public under a container."""
674

    
675
        logger.debug("list_object_public: %s %s %s %s", user,
676
                     account, container, prefix)
677
        public = {}
678
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
679
            public[path] = p + ULTIMATE_ANSWER
680
        return public
681

    
682
    @backend_method
683
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
684
        """Return a dictionary with the object metadata for the domain."""
685

    
686
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
687
                     account, container, name, domain, version)
688
        self._can_read(user, account, container, name)
689
        path, node = self._lookup_object(account, container, name)
690
        props = self._get_version(node, version)
691
        if version is None:
692
            modified = props[self.MTIME]
693
        else:
694
            try:
695
                modified = self._get_version(
696
                    node)[self.MTIME]  # Overall last modification.
697
            except NameError:  # Object may be deleted.
698
                del_props = self.node.version_lookup(
699
                    node, inf, CLUSTER_DELETED)
700
                if del_props is None:
701
                    raise ItemNotExists('Object does not exist')
702
                modified = del_props[self.MTIME]
703

    
704
        meta = {}
705
        if include_user_defined:
706
            meta.update(
707
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
708
        meta.update({'name': name,
709
                     'bytes': props[self.SIZE],
710
                     'type': props[self.TYPE],
711
                     'hash': props[self.HASH],
712
                     'version': props[self.SERIAL],
713
                     'version_timestamp': props[self.MTIME],
714
                     'modified': modified,
715
                     'modified_by': props[self.MUSER],
716
                     'uuid': props[self.UUID],
717
                     'checksum': props[self.CHECKSUM]})
718
        return meta
719

    
720
    @backend_method
721
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
722
        """Update the metadata associated with the object for the domain and return the new version."""
723

    
724
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
725
                     user, account, container, name, domain, meta, replace)
726
        self._can_write(user, account, container, name)
727
        path, node = self._lookup_object(account, container, name)
728
        src_version_id, dest_version_id = self._put_metadata(
729
            user, node, domain, meta, replace)
730
        self._apply_versioning(account, container, src_version_id)
731
        return dest_version_id
732

    
733
    @backend_method
734
    def get_object_permissions(self, user, account, container, name):
735
        """Return the action allowed on the object, the path
736
        from which the object gets its permissions from,
737
        along with a dictionary containing the permissions."""
738

    
739
        logger.debug("get_object_permissions: %s %s %s %s", user,
740
                     account, container, name)
741
        allowed = 'write'
742
        permissions_path = self._get_permissions_path(account, container, name)
743
        if user != account:
744
            if self.permissions.access_check(permissions_path, self.WRITE, user):
745
                allowed = 'write'
746
            elif self.permissions.access_check(permissions_path, self.READ, user):
747
                allowed = 'read'
748
            else:
749
                raise NotAllowedError
750
        self._lookup_object(account, container, name)
751
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
752

    
753
    @backend_method
754
    def update_object_permissions(self, user, account, container, name, permissions):
755
        """Update the permissions associated with the object."""
756

    
757
        logger.debug("update_object_permissions: %s %s %s %s %s",
758
                     user, account, container, name, permissions)
759
        if user != account:
760
            raise NotAllowedError
761
        path = self._lookup_object(account, container, name)[0]
762
        self._check_permissions(path, permissions)
763
        self.permissions.access_set(path, permissions)
764
        self._report_sharing_change(user, account, path, {'members':
765
                                    self.permissions.access_members(path)})
766

    
767
    @backend_method
768
    def get_object_public(self, user, account, container, name):
769
        """Return the public id of the object if applicable."""
770

    
771
        logger.debug(
772
            "get_object_public: %s %s %s %s", user, account, container, name)
773
        self._can_read(user, account, container, name)
774
        path = self._lookup_object(account, container, name)[0]
775
        p = self.permissions.public_get(path)
776
        if p is not None:
777
            p += ULTIMATE_ANSWER
778
        return p
779

    
780
    @backend_method
781
    def update_object_public(self, user, account, container, name, public):
782
        """Update the public status of the object."""
783

    
784
        logger.debug("update_object_public: %s %s %s %s %s", user,
785
                     account, container, name, public)
786
        self._can_write(user, account, container, name)
787
        path = self._lookup_object(account, container, name)[0]
788
        if not public:
789
            self.permissions.public_unset(path)
790
        else:
791
            self.permissions.public_set(path)
792

    
793
    @backend_method
794
    def get_object_hashmap(self, user, account, container, name, version=None):
795
        """Return the object's size and a list with partial hashes."""
796

    
797
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
798
                     account, container, name, version)
799
        self._can_read(user, account, container, name)
800
        path, node = self._lookup_object(account, container, name)
801
        props = self._get_version(node, version)
802
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
803
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
804

    
805
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
806
        if permissions is not None and user != account:
807
            raise NotAllowedError
808
        self._can_write(user, account, container, name)
809
        if permissions is not None:
810
            path = '/'.join((account, container, name))
811
            self._check_permissions(path, permissions)
812

    
813
        account_path, account_node = self._lookup_account(account, True)
814
        container_path, container_node = self._lookup_container(
815
            account, container)
816
        path, node = self._put_object_node(
817
            container_path, container_node, name)
818
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
819

    
820
        # Handle meta.
821
        if src_version_id is None:
822
            src_version_id = pre_version_id
823
        self._put_metadata_duplicate(
824
            src_version_id, dest_version_id, domain, meta, replace_meta)
825

    
826
        del_size = self._apply_versioning(account, container, pre_version_id)
827
        size_delta = size - del_size
828
        if not quotaholder_url:        # Check quota.
829
        if size_delta > 0:
830
            account_quota = long(self._get_policy(account_node)['quota'])
831
            container_quota = long(self._get_policy(container_node)['quota'])
832
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
833
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
834
                # This must be executed in a transaction, so the version is never created if it fails.
835
                raise QuotaError
836
        self._report_size_change(user, account, size_delta,
837
                                 {'action': 'object update', 'path': path,
838
                                  'versions': ','.join([str(dest_version_id)])})
839
        if permissions is not None:
840
            self.permissions.access_set(path, permissions)
841
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
842

    
843
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
844
        return dest_version_id
845

    
846
    @backend_method
847
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
848
        """Create/update an object with the specified size and partial hashes."""
849

    
850
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
851
                     account, container, name, size, type, hashmap, checksum)
852
        if size == 0:  # No such thing as an empty hashmap.
853
            hashmap = [self.put_block('')]
854
        map = HashMap(self.block_size, self.hash_algorithm)
855
        map.extend([binascii.unhexlify(x) for x in hashmap])
856
        missing = self.store.block_search(map)
857
        if missing:
858
            ie = IndexError()
859
            ie.data = [binascii.hexlify(x) for x in missing]
860
            raise ie
861

    
862
        hash = map.hash()
863
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
864
        self.store.map_put(hash, map)
865
        return dest_version_id
866

    
867
    @backend_method
868
    def update_object_checksum(self, user, account, container, name, version, checksum):
869
        """Update an object's checksum."""
870

    
871
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
872
                     user, account, container, name, version, checksum)
873
        # Update objects with greater version and same hashmap and size (fix metadata updates).
874
        self._can_write(user, account, container, name)
875
        path, node = self._lookup_object(account, container, name)
876
        props = self._get_version(node, version)
877
        versions = self.node.node_get_versions(node)
878
        for x in versions:
879
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
880
                self.node.version_put_property(
881
                    x[self.SERIAL], 'checksum', checksum)
882

    
883
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
884
        dest_version_ids = []
885
        self._can_read(user, src_account, src_container, src_name)
886
        path, node = self._lookup_object(src_account, src_container, src_name)
887
        # TODO: Will do another fetch of the properties in duplicate version...
888
        props = self._get_version(
889
            node, src_version)  # Check to see if source exists.
890
        src_version_id = props[self.SERIAL]
891
        hash = props[self.HASH]
892
        size = props[self.SIZE]
893
        is_copy = not is_move and (src_account, src_container, src_name) != (
894
            dest_account, dest_container, dest_name)  # New uuid.
895
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
896
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
897
            self._delete_object(user, src_account, src_container, src_name)
898

    
899
        if delimiter:
900
            prefix = src_name + \
901
                delimiter if not src_name.endswith(delimiter) else src_name
902
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
903
            src_names.sort(key=lambda x: x[2])  # order by nodes
904
            paths = [elem[0] for elem in src_names]
905
            nodes = [elem[2] for elem in src_names]
906
            # TODO: Will do another fetch of the properties in duplicate version...
907
            props = self._get_versions(nodes)  # Check to see if source exists.
908

    
909
            for prop, path, node in zip(props, paths, nodes):
910
                src_version_id = prop[self.SERIAL]
911
                hash = prop[self.HASH]
912
                vtype = prop[self.TYPE]
913
                size = prop[self.SIZE]
914
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
915
                    delimiter) else dest_name
916
                vdest_name = path.replace(prefix, dest_prefix, 1)
917
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
918
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
919
                    self._delete_object(user, src_account, src_container, path)
920
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
921

    
922
    @backend_method
923
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
924
        """Copy an object's data and metadata."""
925

    
926
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
927
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
928
        return dest_version_id
929

    
930
    @backend_method
931
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
932
        """Move an object's data and metadata."""
933

    
934
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
935
        if user != src_account:
936
            raise NotAllowedError
937
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
938
        return dest_version_id
939

    
940
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
941
        if user != account:
942
            raise NotAllowedError
943

    
944
        if until is not None:
945
            path = '/'.join((account, container, name))
946
            node = self.node.node_lookup(path)
947
            if node is None:
948
                return
949
            hashes = []
950
            size = 0
951
            serials = []
952
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
953
            hashes += h
954
            size += s
955
            serials += v
956
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
957
            hashes += h
958
            size += s
959
            serials += v
960
            for h in hashes:
961
                self.store.map_delete(h)
962
            self.node.node_purge(node, until, CLUSTER_DELETED)
963
            try:
964
                props = self._get_version(node)
965
            except NameError:
966
                self.permissions.access_clear(path)
967
            self._report_size_change(user, account, -size,
968
                                    {'action': 'object purge', 'path': path,
969
                                     'versions': ','.join(str(i) for i in serials)})
970
            return
971

    
972
        path, node = self._lookup_object(account, container, name)
973
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
974
        del_size = self._apply_versioning(account, container, src_version_id)
975
        self._report_size_change(user, account, -del_size,
976
                                 {'action': 'object delete', 'path': path,
977
                                  'versions': ','.join([str(dest_version_id)])})
978
        self._report_object_change(
979
            user, account, path, details={'action': 'object delete'})
980
        self.permissions.access_clear(path)
981

    
982
        if delimiter:
983
            prefix = name + delimiter if not name.endswith(delimiter) else name
984
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
985
            paths = []
986
            for t in src_names:
987
                path = '/'.join((account, container, t[0]))
988
                node = t[2]
989
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
990
                del_size = self._apply_versioning(
991
                    account, container, src_version_id)
992
                self._report_size_change(user, account, -del_size,
993
                                         {'action': 'object delete',
994
                                          'path': path,
995
                                          'versions': ','.join([str(dest_version_id)])})
996
                self._report_object_change(
997
                    user, account, path, details={'action': 'object delete'})
998
                paths.append(path)
999
            self.permissions.access_clear_bulk(paths)
1000

    
1001
    @backend_method
1002
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1003
        """Delete/purge an object."""
1004

    
1005
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1006
                     account, container, name, until, prefix, delimiter)
1007
        self._delete_object(user, account, container, name, until, delimiter)
1008

    
1009
    @backend_method
1010
    def list_versions(self, user, account, container, name):
1011
        """Return a list of all (version, version_timestamp) tuples for an object."""
1012

    
1013
        logger.debug(
1014
            "list_versions: %s %s %s %s", user, account, container, name)
1015
        self._can_read(user, account, container, name)
1016
        path, node = self._lookup_object(account, container, name)
1017
        versions = self.node.node_get_versions(node)
1018
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1019

    
1020
    @backend_method
1021
    def get_uuid(self, user, uuid):
1022
        """Return the (account, container, name) for the UUID given."""
1023

    
1024
        logger.debug("get_uuid: %s %s", user, uuid)
1025
        info = self.node.latest_uuid(uuid)
1026
        if info is None:
1027
            raise NameError
1028
        path, serial = info
1029
        account, container, name = path.split('/', 2)
1030
        self._can_read(user, account, container, name)
1031
        return (account, container, name)
1032

    
1033
    @backend_method
1034
    def get_public(self, user, public):
1035
        """Return the (account, container, name) for the public id given."""
1036

    
1037
        logger.debug("get_public: %s %s", user, public)
1038
        if public is None or public < ULTIMATE_ANSWER:
1039
            raise NameError
1040
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1041
        if path is None:
1042
            raise NameError
1043
        account, container, name = path.split('/', 2)
1044
        self._can_read(user, account, container, name)
1045
        return (account, container, name)
1046

    
1047
    @backend_method(autocommit=0)
1048
    def get_block(self, hash):
1049
        """Return a block's data."""
1050

    
1051
        logger.debug("get_block: %s", hash)
1052
        block = self.store.block_get(binascii.unhexlify(hash))
1053
        if not block:
1054
            raise ItemNotExists('Block does not exist')
1055
        return block
1056

    
1057
    @backend_method(autocommit=0)
1058
    def put_block(self, data):
1059
        """Store a block and return the hash."""
1060

    
1061
        logger.debug("put_block: %s", len(data))
1062
        return binascii.hexlify(self.store.block_put(data))
1063

    
1064
    @backend_method(autocommit=0)
1065
    def update_block(self, hash, data, offset=0):
1066
        """Update a known block and return the hash."""
1067

    
1068
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1069
        if offset == 0 and len(data) == self.block_size:
1070
            return self.put_block(data)
1071
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1072
        return binascii.hexlify(h)
1073

    
1074
    # Path functions.
1075

    
1076
    def _generate_uuid(self):
1077
        return str(uuidlib.uuid4())
1078

    
1079
    def _put_object_node(self, path, parent, name):
1080
        path = '/'.join((path, name))
1081
        node = self.node.node_lookup(path)
1082
        if node is None:
1083
            node = self.node.node_create(parent, path)
1084
        return path, node
1085

    
1086
    def _put_path(self, user, parent, path):
1087
        node = self.node.node_create(parent, path)
1088
        self.node.version_create(node, None, 0, '', None, user,
1089
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1090
        return node
1091

    
1092
    def _lookup_account(self, account, create=True):
1093
        node = self.node.node_lookup(account)
1094
        if node is None and create:
1095
            node = self._put_path(
1096
                account, self.ROOTNODE, account)  # User is account.
1097
        return account, node
1098

    
1099
    def _lookup_container(self, account, container):
1100
        path = '/'.join((account, container))
1101
        node = self.node.node_lookup(path)
1102
        if node is None:
1103
            raise ItemNotExists('Container does not exist')
1104
        return path, node
1105

    
1106
    def _lookup_object(self, account, container, name):
1107
        path = '/'.join((account, container, name))
1108
        node = self.node.node_lookup(path)
1109
        if node is None:
1110
            raise ItemNotExists('Object does not exist')
1111
        return path, node
1112

    
1113
    def _lookup_objects(self, paths):
1114
        nodes = self.node.node_lookup_bulk(paths)
1115
        return paths, nodes
1116

    
1117
    def _get_properties(self, node, until=None):
1118
        """Return properties until the timestamp given."""
1119

    
1120
        before = until if until is not None else inf
1121
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1122
        if props is None and until is not None:
1123
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1124
        if props is None:
1125
            raise ItemNotExists('Path does not exist')
1126
        return props
1127

    
1128
    def _get_statistics(self, node, until=None):
1129
        """Return count, sum of size and latest timestamp of everything under node."""
1130

    
1131
        if until is None:
1132
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1133
        else:
1134
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1135
        if stats is None:
1136
            stats = (0, 0, 0)
1137
        return stats
1138

    
1139
    def _get_version(self, node, version=None):
1140
        if version is None:
1141
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1142
            if props is None:
1143
                raise ItemNotExists('Object does not exist')
1144
        else:
1145
            try:
1146
                version = int(version)
1147
            except ValueError:
1148
                raise VersionNotExists('Version does not exist')
1149
            props = self.node.version_get_properties(version)
1150
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1151
                raise VersionNotExists('Version does not exist')
1152
        return props
1153

    
1154
    def _get_versions(self, nodes):
1155
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1156

    
1157
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1158
        """Create a new version of the node."""
1159

    
1160
        props = self.node.version_lookup(
1161
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1162
        if props is not None:
1163
            src_version_id = props[self.SERIAL]
1164
            src_hash = props[self.HASH]
1165
            src_size = props[self.SIZE]
1166
            src_type = props[self.TYPE]
1167
            src_checksum = props[self.CHECKSUM]
1168
        else:
1169
            src_version_id = None
1170
            src_hash = None
1171
            src_size = 0
1172
            src_type = ''
1173
            src_checksum = ''
1174
        if size is None:  # Set metadata.
1175
            hash = src_hash  # This way hash can be set to None (account or container).
1176
            size = src_size
1177
        if type is None:
1178
            type = src_type
1179
        if checksum is None:
1180
            checksum = src_checksum
1181
        uuid = self._generate_uuid(
1182
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1183

    
1184
        if src_node is None:
1185
            pre_version_id = src_version_id
1186
        else:
1187
            pre_version_id = None
1188
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1189
            if props is not None:
1190
                pre_version_id = props[self.SERIAL]
1191
        if pre_version_id is not None:
1192
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1193

    
1194
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1195
        return pre_version_id, dest_version_id
1196

    
1197
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1198
        if src_version_id is not None:
1199
            self.node.attribute_copy(src_version_id, dest_version_id)
1200
        if not replace:
1201
            self.node.attribute_del(dest_version_id, domain, (
1202
                k for k, v in meta.iteritems() if v == ''))
1203
            self.node.attribute_set(dest_version_id, domain, (
1204
                (k, v) for k, v in meta.iteritems() if v != ''))
1205
        else:
1206
            self.node.attribute_del(dest_version_id, domain)
1207
            self.node.attribute_set(dest_version_id, domain, ((
1208
                k, v) for k, v in meta.iteritems()))
1209

    
1210
    def _put_metadata(self, user, node, domain, meta, replace=False):
1211
        """Create a new version and store metadata."""
1212

    
1213
        src_version_id, dest_version_id = self._put_version_duplicate(
1214
            user, node)
1215
        self._put_metadata_duplicate(
1216
            src_version_id, dest_version_id, domain, meta, replace)
1217
        return src_version_id, dest_version_id
1218

    
1219
    def _list_limits(self, listing, marker, limit):
1220
        start = 0
1221
        if marker:
1222
            try:
1223
                start = listing.index(marker) + 1
1224
            except ValueError:
1225
                pass
1226
        if not limit or limit > 10000:
1227
            limit = 10000
1228
        return start, limit
1229

    
1230
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1231
        cont_prefix = path + '/'
1232
        prefix = cont_prefix + prefix
1233
        start = cont_prefix + marker if marker else None
1234
        before = until if until is not None else inf
1235
        filterq = keys if domain else []
1236
        sizeq = size_range
1237

    
1238
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1239
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1240
        objects.sort(key=lambda x: x[0])
1241
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1242
        return objects
1243

    
1244
    # Reporting functions.
1245

    
1246
    def _report_size_change(self, user, account, size, details={}):
1247
        account_node = self._lookup_account(account, True)[1]
1248
        total = self._get_statistics(account_node)[1]
1249
        details.update({'user': user, 'total': total})
1250
        logger.debug(
1251
            "_report_size_change: %s %s %s %s", user, account, size, details)
1252
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1253
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1254
                              float(size), details))
1255

    
1256
        serial = self.quotaholder.issue_commission(
1257
                context     =   {},
1258
                target      =   account,
1259
                key         =   '1',
1260
                clientkey   =   'pithos',
1261
                ownerkey    =   '',
1262
                name        =   details['path'] if 'path' in details else '',
1263
                provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1264
        )
1265
        self.serials.append(serial)
1266

    
1267
    def _report_object_change(self, user, account, path, details={}):
1268
        details.update({'user': user})
1269
        logger.debug("_report_object_change: %s %s %s %s", user,
1270
                     account, path, details)
1271
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1272
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1273

    
1274
    def _report_sharing_change(self, user, account, path, details={}):
1275
        logger.debug("_report_permissions_change: %s %s %s %s",
1276
                     user, account, path, details)
1277
        details.update({'user': user})
1278
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1279
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1280

    
1281
    # Policy functions.
1282

    
1283
    def _check_policy(self, policy):
1284
        for k in policy.keys():
1285
            if policy[k] == '':
1286
                policy[k] = self.default_policy.get(k)
1287
        for k, v in policy.iteritems():
1288
            if k == 'quota':
1289
                q = int(v)  # May raise ValueError.
1290
                if q < 0:
1291
                    raise ValueError
1292
            elif k == 'versioning':
1293
                if v not in ['auto', 'none']:
1294
                    raise ValueError
1295
            else:
1296
                raise ValueError
1297

    
1298
    def _put_policy(self, node, policy, replace):
1299
        if replace:
1300
            for k, v in self.default_policy.iteritems():
1301
                if k not in policy:
1302
                    policy[k] = v
1303
        self.node.policy_set(node, policy)
1304

    
1305
    def _get_policy(self, node):
1306
        policy = self.default_policy.copy()
1307
        policy.update(self.node.policy_get(node))
1308
        return policy
1309

    
1310
    def _apply_versioning(self, account, container, version_id):
1311
        """Delete the provided version if such is the policy.
1312
           Return size of object removed.
1313
        """
1314

    
1315
        if version_id is None:
1316
            return 0
1317
        path, node = self._lookup_container(account, container)
1318
        versioning = self._get_policy(node)['versioning']
1319
        if versioning != 'auto' or self.free_versioning:
1320
            hash, size = self.node.version_remove(version_id)
1321
            self.store.map_delete(hash)
1322
            return size
1323
        return 0
1324

    
1325
    # Access control functions.
1326

    
1327
    def _check_groups(self, groups):
1328
        # raise ValueError('Bad characters in groups')
1329
        pass
1330

    
1331
    def _check_permissions(self, path, permissions):
1332
        # raise ValueError('Bad characters in permissions')
1333
        pass
1334

    
1335
    def _get_formatted_paths(self, paths):
1336
        formatted = []
1337
        for p in paths:
1338
            node = self.node.node_lookup(p)
1339
            props = None
1340
            if node is not None:
1341
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1342
            if props is not None:
1343
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1344
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1345
                formatted.append((p, self.MATCH_EXACT))
1346
        return formatted
1347

    
1348
    def _get_permissions_path(self, account, container, name):
1349
        path = '/'.join((account, container, name))
1350
        permission_paths = self.permissions.access_inherit(path)
1351
        permission_paths.sort()
1352
        permission_paths.reverse()
1353
        for p in permission_paths:
1354
            if p == path:
1355
                return p
1356
            else:
1357
                if p.count('/') < 2:
1358
                    continue
1359
                node = self.node.node_lookup(p)
1360
                if node is not None:
1361
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1362
                if props is not None:
1363
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1364
                        return p
1365
        return None
1366

    
1367
    def _can_read(self, user, account, container, name):
1368
        if user == account:
1369
            return True
1370
        path = '/'.join((account, container, name))
1371
        if self.permissions.public_get(path) is not None:
1372
            return True
1373
        path = self._get_permissions_path(account, container, name)
1374
        if not path:
1375
            raise NotAllowedError
1376
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1377
            raise NotAllowedError
1378

    
1379
    def _can_write(self, user, account, container, name):
1380
        if user == account:
1381
            return True
1382
        path = '/'.join((account, container, name))
1383
        path = self._get_permissions_path(account, container, name)
1384
        if not path:
1385
            raise NotAllowedError
1386
        if not self.permissions.access_check(path, self.WRITE, user):
1387
            raise NotAllowedError
1388

    
1389
    def _allowed_accounts(self, user):
1390
        allow = set()
1391
        for path in self.permissions.access_list_paths(user):
1392
            allow.add(path.split('/', 1)[0])
1393
        return sorted(allow)
1394

    
1395
    def _allowed_containers(self, user, account):
1396
        allow = set()
1397
        for path in self.permissions.access_list_paths(user, account):
1398
            allow.add(path.split('/', 2)[1])
1399
        return sorted(allow)