Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ a7f7699d

History | View | Annotate | Download (60.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from commissioning.clients.quotaholder import QuotaholderHTTP
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
85
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
86

    
87
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
88
QUEUE_CLIENT_ID = 'pithos'
89
QUEUE_INSTANCE_ID = '1'
90

    
91
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
92

    
93
inf = float('inf')
94

    
95
ULTIMATE_ANSWER = 42
96

    
97

    
98
logger = logging.getLogger(__name__)
99

    
100

    
101
def backend_method(func=None, autocommit=1):
102
    if func is None:
103
        def fn(func):
104
            return backend_method(func, autocommit)
105
        return fn
106

    
107
    if not autocommit:
108
        return func
109

    
110
    def fn(self, *args, **kw):
111
        self.wrapper.execute()
112
        serials = []
113
        self.serials = serials
114
        self.messages = []
115
        try:
116
            ret = func(self, *args, **kw)
117
            for m in self.messages:
118
                self.queue.send(*m)
119
            if serials:
120
                self.quotaholder.accept_commission(
121
                            context     =   {},
122
                            clientkey   =   'pithos',
123
                            serials     =   serials)
124
            self.wrapper.commit()
125
            return ret
126
        except:
127
            self.quotaholder.reject_commission(
128
                        context     =   {},
129
                        clientkey   =   'pithos',
130
                        serials     =   serials)
131
            self.wrapper.rollback()
132
            raise
133
    return fn
134

    
135

    
136
class ModularBackend(BaseBackend):
137
    """A modular backend.
138

139
    Uses modules for SQL functions and storage.
140
    """
141

    
142
    def __init__(self, db_module=None, db_connection=None,
143
                 block_module=None, block_path=None, block_umask=None,
144
                 queue_module=None, queue_hosts=None,
145
                 queue_exchange=None, quotaholder_url=None):
146
        db_module = db_module or DEFAULT_DB_MODULE
147
        db_connection = db_connection or DEFAULT_DB_CONNECTION
148
        block_module = block_module or DEFAULT_BLOCK_MODULE
149
        block_path = block_path or DEFAULT_BLOCK_PATH
150
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
151
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
152
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
153
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
154
                
155
        self.hash_algorithm = 'sha256'
156
        self.block_size = 4 * 1024 * 1024  # 4MB
157

    
158
        self.default_policy = {'quota': DEFAULT_QUOTA,
159
                               'versioning': DEFAULT_VERSIONING}
160

    
161
        def load_module(m):
162
            __import__(m)
163
            return sys.modules[m]
164

    
165
        self.db_module = load_module(db_module)
166
        self.wrapper = self.db_module.DBWrapper(db_connection)
167
        params = {'wrapper': self.wrapper}
168
        self.permissions = self.db_module.Permissions(**params)
169
        self.config = self.db_module.Config(**params)
170
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
171
        for x in ['READ', 'WRITE']:
172
            setattr(self, x, getattr(self.db_module, x))
173
        self.node = self.db_module.Node(**params)
174
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
175
            setattr(self, x, getattr(self.db_module, x))
176

    
177
        self.block_module = load_module(block_module)
178
        params = {'path': block_path,
179
                  'block_size': self.block_size,
180
                  'hash_algorithm': self.hash_algorithm,
181
                  'umask': block_umask}
182
        self.store = self.block_module.Store(**params)
183

    
184
        if queue_module and queue_hosts:
185
            self.queue_module = load_module(queue_module)
186
            params = {'hosts': queue_hosts,
187
                          'exchange': queue_exchange,
188
                      'client_id': QUEUE_CLIENT_ID}
189
            self.queue = self.queue_module.Queue(**params)
190
        else:
191
            class NoQueue:
192
                def send(self, *args):
193
                    pass
194

    
195
                def close(self):
196
                    pass
197

    
198
            self.queue = NoQueue()
199

    
200
        self.quotaholder_url = quotaholder_url
201
        self.quotaholder = QuotaholderHTTP(quotaholder_url)
202
        self.serials = []
203
        self.messages = []
204

    
205
    def close(self):
206
        self.wrapper.close()
207
        self.queue.close()
208

    
209
    @backend_method
210
    def list_accounts(self, user, marker=None, limit=10000):
211
        """Return a list of accounts the user can access."""
212

    
213
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
214
        allowed = self._allowed_accounts(user)
215
        start, limit = self._list_limits(allowed, marker, limit)
216
        return allowed[start:start + limit]
217

    
218
    @backend_method
219
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
220
        """Return a dictionary with the account metadata for the domain."""
221

    
222
        logger.debug(
223
            "get_account_meta: %s %s %s %s", user, account, domain, until)
224
        path, node = self._lookup_account(account, user == account)
225
        if user != account:
226
            if until or node is None or account not in self._allowed_accounts(user):
227
                raise NotAllowedError
228
        try:
229
            props = self._get_properties(node, until)
230
            mtime = props[self.MTIME]
231
        except NameError:
232
            props = None
233
            mtime = until
234
        count, bytes, tstamp = self._get_statistics(node, until)
235
        tstamp = max(tstamp, mtime)
236
        if until is None:
237
            modified = tstamp
238
        else:
239
            modified = self._get_statistics(
240
                node)[2]  # Overall last modification.
241
            modified = max(modified, mtime)
242

    
243
        if user != account:
244
            meta = {'name': account}
245
        else:
246
            meta = {}
247
            if props is not None and include_user_defined:
248
                meta.update(
249
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
250
            if until is not None:
251
                meta.update({'until_timestamp': tstamp})
252
            meta.update({'name': account, 'count': count, 'bytes': bytes})
253
        meta.update({'modified': modified})
254
        return meta
255

    
256
    @backend_method
257
    def update_account_meta(self, user, account, domain, meta, replace=False):
258
        """Update the metadata associated with the account for the domain."""
259

    
260
        logger.debug("update_account_meta: %s %s %s %s %s", user,
261
                     account, domain, meta, replace)
262
        if user != account:
263
            raise NotAllowedError
264
        path, node = self._lookup_account(account, True)
265
        self._put_metadata(user, node, domain, meta, replace)
266

    
267
    @backend_method
268
    def get_account_groups(self, user, account):
269
        """Return a dictionary with the user groups defined for this account."""
270

    
271
        logger.debug("get_account_groups: %s %s", user, account)
272
        if user != account:
273
            if account not in self._allowed_accounts(user):
274
                raise NotAllowedError
275
            return {}
276
        self._lookup_account(account, True)
277
        return self.permissions.group_dict(account)
278

    
279
    @backend_method
280
    def update_account_groups(self, user, account, groups, replace=False):
281
        """Update the groups associated with the account."""
282

    
283
        logger.debug("update_account_groups: %s %s %s %s", user,
284
                     account, groups, replace)
285
        if user != account:
286
            raise NotAllowedError
287
        self._lookup_account(account, True)
288
        self._check_groups(groups)
289
        if replace:
290
            self.permissions.group_destroy(account)
291
        for k, v in groups.iteritems():
292
            if not replace:  # If not already deleted.
293
                self.permissions.group_delete(account, k)
294
            if v:
295
                self.permissions.group_addmany(account, k, v)
296

    
297
    @backend_method
298
    def get_account_policy(self, user, account):
299
        """Return a dictionary with the account policy."""
300

    
301
        logger.debug("get_account_policy: %s %s", user, account)
302
        if user != account:
303
            if account not in self._allowed_accounts(user):
304
                raise NotAllowedError
305
            return {}
306
        path, node = self._lookup_account(account, True)
307
        return self._get_policy(node)
308

    
309
    @backend_method
310
    def update_account_policy(self, user, account, policy, replace=False):
311
        """Update the policy associated with the account."""
312

    
313
        logger.debug("update_account_policy: %s %s %s %s", user,
314
                     account, policy, replace)
315
        if user != account:
316
            raise NotAllowedError
317
        path, node = self._lookup_account(account, True)
318
        self._check_policy(policy)
319
        self._put_policy(node, policy, replace)
320

    
321
    @backend_method
322
    def put_account(self, user, account, policy={}):
323
        """Create a new account with the given name."""
324

    
325
        logger.debug("put_account: %s %s %s", user, account, policy)
326
        if user != account:
327
            raise NotAllowedError
328
        node = self.node.node_lookup(account)
329
        if node is not None:
330
            raise AccountExists('Account already exists')
331
        if policy:
332
            self._check_policy(policy)
333
        node = self._put_path(user, self.ROOTNODE, account)
334
        self._put_policy(node, policy, True)
335

    
336
    @backend_method
337
    def delete_account(self, user, account):
338
        """Delete the account with the given name."""
339

    
340
        logger.debug("delete_account: %s %s", user, account)
341
        if user != account:
342
            raise NotAllowedError
343
        node = self.node.node_lookup(account)
344
        if node is None:
345
            return
346
        if not self.node.node_remove(node):
347
            raise AccountNotEmpty('Account is not empty')
348
        self.permissions.group_destroy(account)
349

    
350
    @backend_method
351
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
352
        """Return a list of containers existing under an account."""
353

    
354
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
355
                     account, marker, limit, shared, until, public)
356
        if user != account:
357
            if until or account not in self._allowed_accounts(user):
358
                raise NotAllowedError
359
            allowed = self._allowed_containers(user, account)
360
            start, limit = self._list_limits(allowed, marker, limit)
361
            return allowed[start:start + limit]
362
        if shared or public:
363
            allowed = set()
364
            if shared:
365
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
366
            if public:
367
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
368
            allowed = sorted(allowed)
369
            start, limit = self._list_limits(allowed, marker, limit)
370
            return allowed[start:start + limit]
371
        node = self.node.node_lookup(account)
372
        containers = [x[0] for x in self._list_object_properties(
373
            node, account, '', '/', marker, limit, False, None, [], until)]
374
        start, limit = self._list_limits(
375
            [x[0] for x in containers], marker, limit)
376
        return containers[start:start + limit]
377

    
378
    @backend_method
379
    def list_container_meta(self, user, account, container, domain, until=None):
380
        """Return a list with all the container's object meta keys for the domain."""
381

    
382
        logger.debug("list_container_meta: %s %s %s %s %s", user,
383
                     account, container, domain, until)
384
        allowed = []
385
        if user != account:
386
            if until:
387
                raise NotAllowedError
388
            allowed = self.permissions.access_list_paths(
389
                user, '/'.join((account, container)))
390
            if not allowed:
391
                raise NotAllowedError
392
        path, node = self._lookup_container(account, container)
393
        before = until if until is not None else inf
394
        allowed = self._get_formatted_paths(allowed)
395
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
396

    
397
    @backend_method
398
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
399
        """Return a dictionary with the container metadata for the domain."""
400

    
401
        logger.debug("get_container_meta: %s %s %s %s %s", user,
402
                     account, container, domain, until)
403
        if user != account:
404
            if until or container not in self._allowed_containers(user, account):
405
                raise NotAllowedError
406
        path, node = self._lookup_container(account, container)
407
        props = self._get_properties(node, until)
408
        mtime = props[self.MTIME]
409
        count, bytes, tstamp = self._get_statistics(node, until)
410
        tstamp = max(tstamp, mtime)
411
        if until is None:
412
            modified = tstamp
413
        else:
414
            modified = self._get_statistics(
415
                node)[2]  # Overall last modification.
416
            modified = max(modified, mtime)
417

    
418
        if user != account:
419
            meta = {'name': container}
420
        else:
421
            meta = {}
422
            if include_user_defined:
423
                meta.update(
424
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
425
            if until is not None:
426
                meta.update({'until_timestamp': tstamp})
427
            meta.update({'name': container, 'count': count, 'bytes': bytes})
428
        meta.update({'modified': modified})
429
        return meta
430

    
431
    @backend_method
432
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
433
        """Update the metadata associated with the container for the domain."""
434

    
435
        logger.debug("update_container_meta: %s %s %s %s %s %s",
436
                     user, account, container, domain, meta, replace)
437
        if user != account:
438
            raise NotAllowedError
439
        path, node = self._lookup_container(account, container)
440
        src_version_id, dest_version_id = self._put_metadata(
441
            user, node, domain, meta, replace)
442
        if src_version_id is not None:
443
            versioning = self._get_policy(node)['versioning']
444
            if versioning != 'auto':
445
                self.node.version_remove(src_version_id)
446

    
447
    @backend_method
448
    def get_container_policy(self, user, account, container):
449
        """Return a dictionary with the container policy."""
450

    
451
        logger.debug(
452
            "get_container_policy: %s %s %s", user, account, container)
453
        if user != account:
454
            if container not in self._allowed_containers(user, account):
455
                raise NotAllowedError
456
            return {}
457
        path, node = self._lookup_container(account, container)
458
        return self._get_policy(node)
459

    
460
    @backend_method
461
    def update_container_policy(self, user, account, container, policy, replace=False):
462
        """Update the policy associated with the container."""
463

    
464
        logger.debug("update_container_policy: %s %s %s %s %s",
465
                     user, account, container, policy, replace)
466
        if user != account:
467
            raise NotAllowedError
468
        path, node = self._lookup_container(account, container)
469
        self._check_policy(policy)
470
        self._put_policy(node, policy, replace)
471

    
472
    @backend_method
473
    def put_container(self, user, account, container, policy={}):
474
        """Create a new container with the given name."""
475

    
476
        logger.debug(
477
            "put_container: %s %s %s %s", user, account, container, policy)
478
        if user != account:
479
            raise NotAllowedError
480
        try:
481
            path, node = self._lookup_container(account, container)
482
        except NameError:
483
            pass
484
        else:
485
            raise ContainerExists('Container already exists')
486
        if policy:
487
            self._check_policy(policy)
488
        path = '/'.join((account, container))
489
        node = self._put_path(
490
            user, self._lookup_account(account, True)[1], path)
491
        self._put_policy(node, policy, True)
492

    
493
    @backend_method
494
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
495
        """Delete/purge the container with the given name."""
496

    
497
        logger.debug("delete_container: %s %s %s %s %s %s", user,
498
                     account, container, until, prefix, delimiter)
499
        if user != account:
500
            raise NotAllowedError
501
        path, node = self._lookup_container(account, container)
502

    
503
        if until is not None:
504
            hashes, size, serials = self.node.node_purge_children(
505
                node, until, CLUSTER_HISTORY)
506
            for h in hashes:
507
                self.store.map_delete(h)
508
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
509
            self._report_size_change(user, account, -size,
510
                                     {'action':'container purge', 'path': path,
511
                                      'versions': serials})
512
            return
513

    
514
        if not delimiter:
515
            if self._get_statistics(node)[0] > 0:
516
                raise ContainerNotEmpty('Container is not empty')
517
            hashes, size, serials = self.node.node_purge_children(
518
                node, inf, CLUSTER_HISTORY)
519
            for h in hashes:
520
                self.store.map_delete(h)
521
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
522
            self.node.node_remove(node)
523
            self._report_size_change(user, account, -size,
524
                                     {'action': 'container delete',
525
                                      'path': path,
526
                                      'versions': serials})
527
        else:
528
                # remove only contents
529
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
530
            paths = []
531
            for t in src_names:
532
                path = '/'.join((account, container, t[0]))
533
                node = t[2]
534
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
535
                del_size = self._apply_versioning(
536
                    account, container, src_version_id)
537
                if del_size:
538
                    self._report_size_change(user, account, -del_size,
539
                                             {'action': 'object delete',
540
                                              'path': path, 'versions': [dest_version_id]})
541
                self._report_object_change(
542
                    user, account, path, details={'action': 'object delete'})
543
                paths.append(path)
544
            self.permissions.access_clear_bulk(paths)
545

    
546
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
547
        if user != account and until:
548
            raise NotAllowedError
549
        if shared and public:
550
            # get shared first
551
            shared = self._list_object_permissions(
552
                user, account, container, prefix, shared=True, public=False)
553
            objects = set()
554
            if shared:
555
                path, node = self._lookup_container(account, container)
556
                shared = self._get_formatted_paths(shared)
557
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
558

    
559
            # get public
560
            objects |= set(self._list_public_object_properties(
561
                user, account, container, prefix, all_props))
562
            objects = list(objects)
563

    
564
            objects.sort(key=lambda x: x[0])
565
            start, limit = self._list_limits(
566
                [x[0] for x in objects], marker, limit)
567
            return objects[start:start + limit]
568
        elif public:
569
            objects = self._list_public_object_properties(
570
                user, account, container, prefix, all_props)
571
            start, limit = self._list_limits(
572
                [x[0] for x in objects], marker, limit)
573
            return objects[start:start + limit]
574

    
575
        allowed = self._list_object_permissions(
576
            user, account, container, prefix, shared, public)
577
        if shared and not allowed:
578
            return []
579
        path, node = self._lookup_container(account, container)
580
        allowed = self._get_formatted_paths(allowed)
581
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
582
        start, limit = self._list_limits(
583
            [x[0] for x in objects], marker, limit)
584
        return objects[start:start + limit]
585

    
586
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
587
        public = self._list_object_permissions(
588
            user, account, container, prefix, shared=False, public=True)
589
        paths, nodes = self._lookup_objects(public)
590
        path = '/'.join((account, container))
591
        cont_prefix = path + '/'
592
        paths = [x[len(cont_prefix):] for x in paths]
593
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
594
        objects = [(path,) + props for path, props in zip(paths, props)]
595
        return objects
596

    
597
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
598
        objects = []
599
        while True:
600
            marker = objects[-1] if objects else None
601
            limit = 10000
602
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
603
            objects.extend(l)
604
            if not l or len(l) < limit:
605
                break
606
        return objects
607

    
608
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
609
        allowed = []
610
        path = '/'.join((account, container, prefix)).rstrip('/')
611
        if user != account:
612
            allowed = self.permissions.access_list_paths(user, path)
613
            if not allowed:
614
                raise NotAllowedError
615
        else:
616
            allowed = set()
617
            if shared:
618
                allowed.update(self.permissions.access_list_shared(path))
619
            if public:
620
                allowed.update(
621
                    [x[0] for x in self.permissions.public_list(path)])
622
            allowed = sorted(allowed)
623
            if not allowed:
624
                return []
625
        return allowed
626

    
627
    @backend_method
628
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
629
        """Return a list of object (name, version_id) tuples existing under a container."""
630

    
631
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
632
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
633

    
634
    @backend_method
635
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
636
        """Return a list of object metadata dicts existing under a container."""
637

    
638
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
639
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
640
        objects = []
641
        for p in props:
642
            if len(p) == 2:
643
                objects.append({'subdir': p[0]})
644
            else:
645
                objects.append({'name': p[0],
646
                                'bytes': p[self.SIZE + 1],
647
                                'type': p[self.TYPE + 1],
648
                                'hash': p[self.HASH + 1],
649
                                'version': p[self.SERIAL + 1],
650
                                'version_timestamp': p[self.MTIME + 1],
651
                                'modified': p[self.MTIME + 1] if until is None else None,
652
                                'modified_by': p[self.MUSER + 1],
653
                                'uuid': p[self.UUID + 1],
654
                                'checksum': p[self.CHECKSUM + 1]})
655
        return objects
656

    
657
    @backend_method
658
    def list_object_permissions(self, user, account, container, prefix=''):
659
        """Return a list of paths that enforce permissions under a container."""
660

    
661
        logger.debug("list_object_permissions: %s %s %s %s", user,
662
                     account, container, prefix)
663
        return self._list_object_permissions(user, account, container, prefix, True, False)
664

    
665
    @backend_method
666
    def list_object_public(self, user, account, container, prefix=''):
667
        """Return a dict mapping paths to public ids for objects that are public under a container."""
668

    
669
        logger.debug("list_object_public: %s %s %s %s", user,
670
                     account, container, prefix)
671
        public = {}
672
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
673
            public[path] = p + ULTIMATE_ANSWER
674
        return public
675

    
676
    @backend_method
677
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
678
        """Return a dictionary with the object metadata for the domain."""
679

    
680
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
681
                     account, container, name, domain, version)
682
        self._can_read(user, account, container, name)
683
        path, node = self._lookup_object(account, container, name)
684
        props = self._get_version(node, version)
685
        if version is None:
686
            modified = props[self.MTIME]
687
        else:
688
            try:
689
                modified = self._get_version(
690
                    node)[self.MTIME]  # Overall last modification.
691
            except NameError:  # Object may be deleted.
692
                del_props = self.node.version_lookup(
693
                    node, inf, CLUSTER_DELETED)
694
                if del_props is None:
695
                    raise ItemNotExists('Object does not exist')
696
                modified = del_props[self.MTIME]
697

    
698
        meta = {}
699
        if include_user_defined:
700
            meta.update(
701
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
702
        meta.update({'name': name,
703
                     'bytes': props[self.SIZE],
704
                     'type': props[self.TYPE],
705
                     'hash': props[self.HASH],
706
                     'version': props[self.SERIAL],
707
                     'version_timestamp': props[self.MTIME],
708
                     'modified': modified,
709
                     'modified_by': props[self.MUSER],
710
                     'uuid': props[self.UUID],
711
                     'checksum': props[self.CHECKSUM]})
712
        return meta
713

    
714
    @backend_method
715
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
716
        """Update the metadata associated with the object for the domain and return the new version."""
717

    
718
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
719
                     user, account, container, name, domain, meta, replace)
720
        self._can_write(user, account, container, name)
721
        path, node = self._lookup_object(account, container, name)
722
        src_version_id, dest_version_id = self._put_metadata(
723
            user, node, domain, meta, replace)
724
        self._apply_versioning(account, container, src_version_id)
725
        return dest_version_id
726

    
727
    @backend_method
728
    def get_object_permissions(self, user, account, container, name):
729
        """Return the action allowed on the object, the path
730
        from which the object gets its permissions from,
731
        along with a dictionary containing the permissions."""
732

    
733
        logger.debug("get_object_permissions: %s %s %s %s", user,
734
                     account, container, name)
735
        allowed = 'write'
736
        permissions_path = self._get_permissions_path(account, container, name)
737
        if user != account:
738
            if self.permissions.access_check(permissions_path, self.WRITE, user):
739
                allowed = 'write'
740
            elif self.permissions.access_check(permissions_path, self.READ, user):
741
                allowed = 'read'
742
            else:
743
                raise NotAllowedError
744
        self._lookup_object(account, container, name)
745
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
746

    
747
    @backend_method
748
    def update_object_permissions(self, user, account, container, name, permissions):
749
        """Update the permissions associated with the object."""
750

    
751
        logger.debug("update_object_permissions: %s %s %s %s %s",
752
                     user, account, container, name, permissions)
753
        if user != account:
754
            raise NotAllowedError
755
        path = self._lookup_object(account, container, name)[0]
756
        self._check_permissions(path, permissions)
757
        self.permissions.access_set(path, permissions)
758
        self._report_sharing_change(user, account, path, {'members':
759
                                    self.permissions.access_members(path)})
760

    
761
    @backend_method
762
    def get_object_public(self, user, account, container, name):
763
        """Return the public id of the object if applicable."""
764

    
765
        logger.debug(
766
            "get_object_public: %s %s %s %s", user, account, container, name)
767
        self._can_read(user, account, container, name)
768
        path = self._lookup_object(account, container, name)[0]
769
        p = self.permissions.public_get(path)
770
        if p is not None:
771
            p += ULTIMATE_ANSWER
772
        return p
773

    
774
    @backend_method
775
    def update_object_public(self, user, account, container, name, public):
776
        """Update the public status of the object."""
777

    
778
        logger.debug("update_object_public: %s %s %s %s %s", user,
779
                     account, container, name, public)
780
        self._can_write(user, account, container, name)
781
        path = self._lookup_object(account, container, name)[0]
782
        if not public:
783
            self.permissions.public_unset(path)
784
        else:
785
            self.permissions.public_set(path)
786

    
787
    @backend_method
788
    def get_object_hashmap(self, user, account, container, name, version=None):
789
        """Return the object's size and a list with partial hashes."""
790

    
791
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
792
                     account, container, name, version)
793
        self._can_read(user, account, container, name)
794
        path, node = self._lookup_object(account, container, name)
795
        props = self._get_version(node, version)
796
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
797
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
798

    
799
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
800
        if permissions is not None and user != account:
801
            raise NotAllowedError
802
        self._can_write(user, account, container, name)
803
        if permissions is not None:
804
            path = '/'.join((account, container, name))
805
            self._check_permissions(path, permissions)
806

    
807
        account_path, account_node = self._lookup_account(account, True)
808
        container_path, container_node = self._lookup_container(
809
            account, container)
810
        path, node = self._put_object_node(
811
            container_path, container_node, name)
812
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
813

    
814
        # Handle meta.
815
        if src_version_id is None:
816
            src_version_id = pre_version_id
817
        self._put_metadata_duplicate(
818
            src_version_id, dest_version_id, domain, meta, replace_meta)
819

    
820
        # Check quota.
821
        del_size = self._apply_versioning(account, container, pre_version_id)
822
        size_delta = size - del_size
823
        if size_delta > 0:
824
            account_quota = long(self._get_policy(account_node)['quota'])
825
            container_quota = long(self._get_policy(container_node)['quota'])
826
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
827
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
828
                # This must be executed in a transaction, so the version is never created if it fails.
829
                raise QuotaError
830
        self._report_size_change(user, account, size_delta,
831
                                 {'action': 'object update', 'path': path,
832
                                  'versions': [dest_version_id]})
833

    
834
        if permissions is not None:
835
            self.permissions.access_set(path, permissions)
836
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
837

    
838
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
839
        return dest_version_id
840

    
841
    @backend_method
842
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
843
        """Create/update an object with the specified size and partial hashes."""
844

    
845
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
846
                     account, container, name, size, type, hashmap, checksum)
847
        if size == 0:  # No such thing as an empty hashmap.
848
            hashmap = [self.put_block('')]
849
        map = HashMap(self.block_size, self.hash_algorithm)
850
        map.extend([binascii.unhexlify(x) for x in hashmap])
851
        missing = self.store.block_search(map)
852
        if missing:
853
            ie = IndexError()
854
            ie.data = [binascii.hexlify(x) for x in missing]
855
            raise ie
856

    
857
        hash = map.hash()
858
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
859
        self.store.map_put(hash, map)
860
        return dest_version_id
861

    
862
    @backend_method
863
    def update_object_checksum(self, user, account, container, name, version, checksum):
864
        """Update an object's checksum."""
865

    
866
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
867
                     user, account, container, name, version, checksum)
868
        # Update objects with greater version and same hashmap and size (fix metadata updates).
869
        self._can_write(user, account, container, name)
870
        path, node = self._lookup_object(account, container, name)
871
        props = self._get_version(node, version)
872
        versions = self.node.node_get_versions(node)
873
        for x in versions:
874
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
875
                self.node.version_put_property(
876
                    x[self.SERIAL], 'checksum', checksum)
877

    
878
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
879
        dest_version_ids = []
880
        self._can_read(user, src_account, src_container, src_name)
881
        path, node = self._lookup_object(src_account, src_container, src_name)
882
        # TODO: Will do another fetch of the properties in duplicate version...
883
        props = self._get_version(
884
            node, src_version)  # Check to see if source exists.
885
        src_version_id = props[self.SERIAL]
886
        hash = props[self.HASH]
887
        size = props[self.SIZE]
888
        is_copy = not is_move and (src_account, src_container, src_name) != (
889
            dest_account, dest_container, dest_name)  # New uuid.
890
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
891
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
892
            self._delete_object(user, src_account, src_container, src_name)
893

    
894
        if delimiter:
895
            prefix = src_name + \
896
                delimiter if not src_name.endswith(delimiter) else src_name
897
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
898
            src_names.sort(key=lambda x: x[2])  # order by nodes
899
            paths = [elem[0] for elem in src_names]
900
            nodes = [elem[2] for elem in src_names]
901
            # TODO: Will do another fetch of the properties in duplicate version...
902
            props = self._get_versions(nodes)  # Check to see if source exists.
903

    
904
            for prop, path, node in zip(props, paths, nodes):
905
                src_version_id = prop[self.SERIAL]
906
                hash = prop[self.HASH]
907
                vtype = prop[self.TYPE]
908
                size = prop[self.SIZE]
909
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
910
                    delimiter) else dest_name
911
                vdest_name = path.replace(prefix, dest_prefix, 1)
912
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
913
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
914
                    self._delete_object(user, src_account, src_container, path)
915
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
916

    
917
    @backend_method
918
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
919
        """Copy an object's data and metadata."""
920

    
921
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
922
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
923
        return dest_version_id
924

    
925
    @backend_method
926
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
927
        """Move an object's data and metadata."""
928

    
929
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
930
        if user != src_account:
931
            raise NotAllowedError
932
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
933
        return dest_version_id
934

    
935
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
936
        if user != account:
937
            raise NotAllowedError
938

    
939
        if until is not None:
940
            path = '/'.join((account, container, name))
941
            node = self.node.node_lookup(path)
942
            if node is None:
943
                return
944
            hashes = []
945
            size = 0
946
            serials = []
947
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
948
            hashes += h
949
            size += s
950
            serials += v
951
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
952
            hashes += h
953
            size += s
954
            serials += v
955
            for h in hashes:
956
                self.store.map_delete(h)
957
            self.node.node_purge(node, until, CLUSTER_DELETED)
958
            try:
959
                props = self._get_version(node)
960
            except NameError:
961
                self.permissions.access_clear(path)
962
            self._report_size_change(user, account, -size,
963
                                    {'action': 'object purge', 'path': path,
964
                                     'versions': serials})
965
            return
966

    
967
        path, node = self._lookup_object(account, container, name)
968
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
969
        del_size = self._apply_versioning(account, container, src_version_id)
970
        if del_size:
971
            self._report_size_change(user, account, -del_size,
972
                                     {'action': 'object delete', 'path': path,
973
                                      'versions': [dest_version_id]})
974
        self._report_object_change(
975
            user, account, path, details={'action': 'object delete'})
976
        self.permissions.access_clear(path)
977

    
978
        if delimiter:
979
            prefix = name + delimiter if not name.endswith(delimiter) else name
980
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
981
            paths = []
982
            for t in src_names:
983
                path = '/'.join((account, container, t[0]))
984
                node = t[2]
985
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
986
                del_size = self._apply_versioning(
987
                    account, container, src_version_id)
988
                if del_size:
989
                    self._report_size_change(user, account, -del_size,
990
                                             {'action': 'object delete',
991
                                              'path': path,
992
                                              'versions': [dest_version_id]})
993
                self._report_object_change(
994
                    user, account, path, details={'action': 'object delete'})
995
                paths.append(path)
996
            self.permissions.access_clear_bulk(paths)
997

    
998
    @backend_method
999
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1000
        """Delete/purge an object."""
1001

    
1002
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1003
                     account, container, name, until, prefix, delimiter)
1004
        self._delete_object(user, account, container, name, until, delimiter)
1005

    
1006
    @backend_method
1007
    def list_versions(self, user, account, container, name):
1008
        """Return a list of all (version, version_timestamp) tuples for an object."""
1009

    
1010
        logger.debug(
1011
            "list_versions: %s %s %s %s", user, account, container, name)
1012
        self._can_read(user, account, container, name)
1013
        path, node = self._lookup_object(account, container, name)
1014
        versions = self.node.node_get_versions(node)
1015
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1016

    
1017
    @backend_method
1018
    def get_uuid(self, user, uuid):
1019
        """Return the (account, container, name) for the UUID given."""
1020

    
1021
        logger.debug("get_uuid: %s %s", user, uuid)
1022
        info = self.node.latest_uuid(uuid)
1023
        if info is None:
1024
            raise NameError
1025
        path, serial = info
1026
        account, container, name = path.split('/', 2)
1027
        self._can_read(user, account, container, name)
1028
        return (account, container, name)
1029

    
1030
    @backend_method
1031
    def get_public(self, user, public):
1032
        """Return the (account, container, name) for the public id given."""
1033

    
1034
        logger.debug("get_public: %s %s", user, public)
1035
        if public is None or public < ULTIMATE_ANSWER:
1036
            raise NameError
1037
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1038
        if path is None:
1039
            raise NameError
1040
        account, container, name = path.split('/', 2)
1041
        self._can_read(user, account, container, name)
1042
        return (account, container, name)
1043

    
1044
    @backend_method(autocommit=0)
1045
    def get_block(self, hash):
1046
        """Return a block's data."""
1047

    
1048
        logger.debug("get_block: %s", hash)
1049
        block = self.store.block_get(binascii.unhexlify(hash))
1050
        if not block:
1051
            raise ItemNotExists('Block does not exist')
1052
        return block
1053

    
1054
    @backend_method(autocommit=0)
1055
    def put_block(self, data):
1056
        """Store a block and return the hash."""
1057

    
1058
        logger.debug("put_block: %s", len(data))
1059
        return binascii.hexlify(self.store.block_put(data))
1060

    
1061
    @backend_method(autocommit=0)
1062
    def update_block(self, hash, data, offset=0):
1063
        """Update a known block and return the hash."""
1064

    
1065
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1066
        if offset == 0 and len(data) == self.block_size:
1067
            return self.put_block(data)
1068
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1069
        return binascii.hexlify(h)
1070

    
1071
    # Path functions.
1072

    
1073
    def _generate_uuid(self):
1074
        return str(uuidlib.uuid4())
1075

    
1076
    def _put_object_node(self, path, parent, name):
1077
        path = '/'.join((path, name))
1078
        node = self.node.node_lookup(path)
1079
        if node is None:
1080
            node = self.node.node_create(parent, path)
1081
        return path, node
1082

    
1083
    def _put_path(self, user, parent, path):
1084
        node = self.node.node_create(parent, path)
1085
        self.node.version_create(node, None, 0, '', None, user,
1086
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1087
        return node
1088

    
1089
    def _lookup_account(self, account, create=True):
1090
        node = self.node.node_lookup(account)
1091
        if node is None and create:
1092
            node = self._put_path(
1093
                account, self.ROOTNODE, account)  # User is account.
1094
        return account, node
1095

    
1096
    def _lookup_container(self, account, container):
1097
        path = '/'.join((account, container))
1098
        node = self.node.node_lookup(path)
1099
        if node is None:
1100
            raise ItemNotExists('Container does not exist')
1101
        return path, node
1102

    
1103
    def _lookup_object(self, account, container, name):
1104
        path = '/'.join((account, container, name))
1105
        node = self.node.node_lookup(path)
1106
        if node is None:
1107
            raise ItemNotExists('Object does not exist')
1108
        return path, node
1109

    
1110
    def _lookup_objects(self, paths):
1111
        nodes = self.node.node_lookup_bulk(paths)
1112
        return paths, nodes
1113

    
1114
    def _get_properties(self, node, until=None):
1115
        """Return properties until the timestamp given."""
1116

    
1117
        before = until if until is not None else inf
1118
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1119
        if props is None and until is not None:
1120
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1121
        if props is None:
1122
            raise ItemNotExists('Path does not exist')
1123
        return props
1124

    
1125
    def _get_statistics(self, node, until=None):
1126
        """Return count, sum of size and latest timestamp of everything under node."""
1127

    
1128
        if until is None:
1129
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1130
        else:
1131
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1132
        if stats is None:
1133
            stats = (0, 0, 0)
1134
        return stats
1135

    
1136
    def _get_version(self, node, version=None):
1137
        if version is None:
1138
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1139
            if props is None:
1140
                raise ItemNotExists('Object does not exist')
1141
        else:
1142
            try:
1143
                version = int(version)
1144
            except ValueError:
1145
                raise VersionNotExists('Version does not exist')
1146
            props = self.node.version_get_properties(version)
1147
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1148
                raise VersionNotExists('Version does not exist')
1149
        return props
1150

    
1151
    def _get_versions(self, nodes):
1152
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1153

    
1154
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1155
        """Create a new version of the node."""
1156

    
1157
        props = self.node.version_lookup(
1158
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1159
        if props is not None:
1160
            src_version_id = props[self.SERIAL]
1161
            src_hash = props[self.HASH]
1162
            src_size = props[self.SIZE]
1163
            src_type = props[self.TYPE]
1164
            src_checksum = props[self.CHECKSUM]
1165
        else:
1166
            src_version_id = None
1167
            src_hash = None
1168
            src_size = 0
1169
            src_type = ''
1170
            src_checksum = ''
1171
        if size is None:  # Set metadata.
1172
            hash = src_hash  # This way hash can be set to None (account or container).
1173
            size = src_size
1174
        if type is None:
1175
            type = src_type
1176
        if checksum is None:
1177
            checksum = src_checksum
1178
        uuid = self._generate_uuid(
1179
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1180

    
1181
        if src_node is None:
1182
            pre_version_id = src_version_id
1183
        else:
1184
            pre_version_id = None
1185
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1186
            if props is not None:
1187
                pre_version_id = props[self.SERIAL]
1188
        if pre_version_id is not None:
1189
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1190

    
1191
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1192
        return pre_version_id, dest_version_id
1193

    
1194
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1195
        if src_version_id is not None:
1196
            self.node.attribute_copy(src_version_id, dest_version_id)
1197
        if not replace:
1198
            self.node.attribute_del(dest_version_id, domain, (
1199
                k for k, v in meta.iteritems() if v == ''))
1200
            self.node.attribute_set(dest_version_id, domain, (
1201
                (k, v) for k, v in meta.iteritems() if v != ''))
1202
        else:
1203
            self.node.attribute_del(dest_version_id, domain)
1204
            self.node.attribute_set(dest_version_id, domain, ((
1205
                k, v) for k, v in meta.iteritems()))
1206

    
1207
    def _put_metadata(self, user, node, domain, meta, replace=False):
1208
        """Create a new version and store metadata."""
1209

    
1210
        src_version_id, dest_version_id = self._put_version_duplicate(
1211
            user, node)
1212
        self._put_metadata_duplicate(
1213
            src_version_id, dest_version_id, domain, meta, replace)
1214
        return src_version_id, dest_version_id
1215

    
1216
    def _list_limits(self, listing, marker, limit):
1217
        start = 0
1218
        if marker:
1219
            try:
1220
                start = listing.index(marker) + 1
1221
            except ValueError:
1222
                pass
1223
        if not limit or limit > 10000:
1224
            limit = 10000
1225
        return start, limit
1226

    
1227
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1228
        cont_prefix = path + '/'
1229
        prefix = cont_prefix + prefix
1230
        start = cont_prefix + marker if marker else None
1231
        before = until if until is not None else inf
1232
        filterq = keys if domain else []
1233
        sizeq = size_range
1234

    
1235
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1236
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1237
        objects.sort(key=lambda x: x[0])
1238
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1239
        return objects
1240

    
1241
    # Reporting functions.
1242

    
1243
    def _report_size_change(self, user, account, size, details={}):
1244
        account_node = self._lookup_account(account, True)[1]
1245
        total = self._get_statistics(account_node)[1]
1246
        details.update({'user': user, 'total': total})
1247
        logger.debug(
1248
            "_report_size_change: %s %s %s %s", user, account, size, details)
1249
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1250
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1251
                              float(size), details))
1252

    
1253
        serial = self.quotaholder.issue_commission(
1254
                context     =   {},
1255
                target      =   account,
1256
                key         =   '1',
1257
                clientkey   =   'pithos',
1258
                ownerkey    =   '',
1259
                provisions  =   (('pithos+', 'pithos+ : diskspace', size),)
1260
        )
1261
        self.serials.append(serial)
1262

    
1263
    def _report_object_change(self, user, account, path, details={}):
1264
        details.update({'user': user})
1265
        logger.debug("_report_object_change: %s %s %s %s", user,
1266
                     account, path, details)
1267
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1268
                                                  account, QUEUE_INSTANCE_ID, 'object', path, details))
1269

    
1270
    def _report_sharing_change(self, user, account, path, details={}):
1271
        logger.debug("_report_permissions_change: %s %s %s %s",
1272
                     user, account, path, details)
1273
        details.update({'user': user})
1274
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1275
                                                  account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1276

    
1277
    # Policy functions.
1278

    
1279
    def _check_policy(self, policy):
1280
        for k in policy.keys():
1281
            if policy[k] == '':
1282
                policy[k] = self.default_policy.get(k)
1283
        for k, v in policy.iteritems():
1284
            if k == 'quota':
1285
                q = int(v)  # May raise ValueError.
1286
                if q < 0:
1287
                    raise ValueError
1288
            elif k == 'versioning':
1289
                if v not in ['auto', 'none']:
1290
                    raise ValueError
1291
            else:
1292
                raise ValueError
1293

    
1294
    def _put_policy(self, node, policy, replace):
1295
        if replace:
1296
            for k, v in self.default_policy.iteritems():
1297
                if k not in policy:
1298
                    policy[k] = v
1299
        self.node.policy_set(node, policy)
1300

    
1301
    def _get_policy(self, node):
1302
        policy = self.default_policy.copy()
1303
        policy.update(self.node.policy_get(node))
1304
        return policy
1305

    
1306
    def _apply_versioning(self, account, container, version_id):
1307
        """Delete the provided version if such is the policy.
1308
           Return size of object removed.
1309
        """
1310

    
1311
        if version_id is None:
1312
            return 0
1313
        path, node = self._lookup_container(account, container)
1314
        versioning = self._get_policy(node)['versioning']
1315
        if versioning != 'auto':
1316
            hash, size = self.node.version_remove(version_id)
1317
            self.store.map_delete(hash)
1318
            return size
1319
        return 0
1320

    
1321
    # Access control functions.
1322

    
1323
    def _check_groups(self, groups):
1324
        # raise ValueError('Bad characters in groups')
1325
        pass
1326

    
1327
    def _check_permissions(self, path, permissions):
1328
        # raise ValueError('Bad characters in permissions')
1329
        pass
1330

    
1331
    def _get_formatted_paths(self, paths):
1332
        formatted = []
1333
        for p in paths:
1334
            node = self.node.node_lookup(p)
1335
            if node is not None:
1336
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1337
            if props is not None:
1338
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1339
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1340
                formatted.append((p, self.MATCH_EXACT))
1341
        return formatted
1342

    
1343
    def _get_permissions_path(self, account, container, name):
1344
        path = '/'.join((account, container, name))
1345
        permission_paths = self.permissions.access_inherit(path)
1346
        permission_paths.sort()
1347
        permission_paths.reverse()
1348
        for p in permission_paths:
1349
            if p == path:
1350
                return p
1351
            else:
1352
                if p.count('/') < 2:
1353
                    continue
1354
                node = self.node.node_lookup(p)
1355
                if node is not None:
1356
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1357
                if props is not None:
1358
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1359
                        return p
1360
        return None
1361

    
1362
    def _can_read(self, user, account, container, name):
1363
        if user == account:
1364
            return True
1365
        path = '/'.join((account, container, name))
1366
        if self.permissions.public_get(path) is not None:
1367
            return True
1368
        path = self._get_permissions_path(account, container, name)
1369
        if not path:
1370
            raise NotAllowedError
1371
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1372
            raise NotAllowedError
1373

    
1374
    def _can_write(self, user, account, container, name):
1375
        if user == account:
1376
            return True
1377
        path = '/'.join((account, container, name))
1378
        path = self._get_permissions_path(account, container, name)
1379
        if not path:
1380
            raise NotAllowedError
1381
        if not self.permissions.access_check(path, self.WRITE, user):
1382
            raise NotAllowedError
1383

    
1384
    def _allowed_accounts(self, user):
1385
        allow = set()
1386
        for path in self.permissions.access_list_paths(user):
1387
            allow.add(path.split('/', 1)[0])
1388
        return sorted(allow)
1389

    
1390
    def _allowed_containers(self, user, account):
1391
        allow = set()
1392
        for path in self.permissions.access_list_paths(user, account):
1393
            allow.add(path.split('/', 2)[1])
1394
        return sorted(allow)