Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 470984e0

History | View | Annotate | Download (60.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from commissioning.clients.quotaholder import QuotaholderHTTP
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
85
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
86

    
87
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
88
QUEUE_CLIENT_ID = 'pithos'
89
QUEUE_INSTANCE_ID = '1'
90

    
91
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
92

    
93
inf = float('inf')
94

    
95
ULTIMATE_ANSWER = 42
96

    
97

    
98
logger = logging.getLogger(__name__)
99

    
100

    
101
def backend_method(func=None, autocommit=1):
102
    if func is None:
103
        def fn(func):
104
            return backend_method(func, autocommit)
105
        return fn
106

    
107
    if not autocommit:
108
        return func
109

    
110
    def fn(self, *args, **kw):
111
        self.wrapper.execute()
112
        serials = self.serials
113
        self.messages = []
114
        try:
115
            ret = func(self, *args, **kw)
116
            for m in self.messages:
117
                self.queue.send(*m)
118
            if serials:
119
                self.quotaholder.accept_commission(
120
                            context     =   {},
121
                            clientkey   =   'pithos',
122
                            serials     =   serials)
123
            self.wrapper.commit()
124
            return ret
125
        except:
126
            self.quotaholder.reject_commission(
127
                        context     =   {},
128
                        clientkey   =   'pithos',
129
                        serials     =   serials)
130
            self.wrapper.rollback()
131
            raise
132
    return fn
133

    
134

    
135
class ModularBackend(BaseBackend):
136
    """A modular backend.
137

138
    Uses modules for SQL functions and storage.
139
    """
140

    
141
    def __init__(self, db_module=None, db_connection=None,
142
                 block_module=None, block_path=None, block_umask=None,
143
                 queue_module=None, queue_hosts=None,
144
                 queue_exchange=None, quotaholder_url=None):
145
        db_module = db_module or DEFAULT_DB_MODULE
146
        db_connection = db_connection or DEFAULT_DB_CONNECTION
147
        block_module = block_module or DEFAULT_BLOCK_MODULE
148
        block_path = block_path or DEFAULT_BLOCK_PATH
149
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
150
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
151
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
152
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
153
                
154
        self.hash_algorithm = 'sha256'
155
        self.block_size = 4 * 1024 * 1024  # 4MB
156

    
157
        self.default_policy = {'quota': DEFAULT_QUOTA,
158
                               'versioning': DEFAULT_VERSIONING}
159

    
160
        def load_module(m):
161
            __import__(m)
162
            return sys.modules[m]
163

    
164
        self.db_module = load_module(db_module)
165
        self.wrapper = self.db_module.DBWrapper(db_connection)
166
        params = {'wrapper': self.wrapper}
167
        self.permissions = self.db_module.Permissions(**params)
168
        self.config = self.db_module.Config(**params)
169
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
170
        for x in ['READ', 'WRITE']:
171
            setattr(self, x, getattr(self.db_module, x))
172
        self.node = self.db_module.Node(**params)
173
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
174
            setattr(self, x, getattr(self.db_module, x))
175

    
176
        self.block_module = load_module(block_module)
177
        params = {'path': block_path,
178
                  'block_size': self.block_size,
179
                  'hash_algorithm': self.hash_algorithm,
180
                  'umask': block_umask}
181
        self.store = self.block_module.Store(**params)
182

    
183
        if queue_module and queue_hosts:
184
            self.queue_module = load_module(queue_module)
185
            params = {'hosts': queue_hosts,
186
                          'exchange': queue_exchange,
187
                      'client_id': QUEUE_CLIENT_ID}
188
            self.queue = self.queue_module.Queue(**params)
189
        else:
190
            class NoQueue:
191
                def send(self, *args):
192
                    pass
193

    
194
                def close(self):
195
                    pass
196

    
197
            self.queue = NoQueue()
198

    
199
        self.quotaholder_url = quotaholder_url
200
        self.quotaholder = QuotaholderHTTP(quotaholder_url)
201
        self.serials = []
202

    
203
    def close(self):
204
        self.wrapper.close()
205
        self.queue.close()
206

    
207
    @backend_method
208
    def list_accounts(self, user, marker=None, limit=10000):
209
        """Return a list of accounts the user can access."""
210

    
211
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
212
        allowed = self._allowed_accounts(user)
213
        start, limit = self._list_limits(allowed, marker, limit)
214
        return allowed[start:start + limit]
215

    
216
    @backend_method
217
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
218
        """Return a dictionary with the account metadata for the domain."""
219

    
220
        logger.debug(
221
            "get_account_meta: %s %s %s %s", user, account, domain, until)
222
        path, node = self._lookup_account(account, user == account)
223
        if user != account:
224
            if until or node is None or account not in self._allowed_accounts(user):
225
                raise NotAllowedError
226
        try:
227
            props = self._get_properties(node, until)
228
            mtime = props[self.MTIME]
229
        except NameError:
230
            props = None
231
            mtime = until
232
        count, bytes, tstamp = self._get_statistics(node, until)
233
        tstamp = max(tstamp, mtime)
234
        if until is None:
235
            modified = tstamp
236
        else:
237
            modified = self._get_statistics(
238
                node)[2]  # Overall last modification.
239
            modified = max(modified, mtime)
240

    
241
        if user != account:
242
            meta = {'name': account}
243
        else:
244
            meta = {}
245
            if props is not None and include_user_defined:
246
                meta.update(
247
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
248
            if until is not None:
249
                meta.update({'until_timestamp': tstamp})
250
            meta.update({'name': account, 'count': count, 'bytes': bytes})
251
        meta.update({'modified': modified})
252
        return meta
253

    
254
    @backend_method
255
    def update_account_meta(self, user, account, domain, meta, replace=False):
256
        """Update the metadata associated with the account for the domain."""
257

    
258
        logger.debug("update_account_meta: %s %s %s %s %s", user,
259
                     account, domain, meta, replace)
260
        if user != account:
261
            raise NotAllowedError
262
        path, node = self._lookup_account(account, True)
263
        self._put_metadata(user, node, domain, meta, replace)
264

    
265
    @backend_method
266
    def get_account_groups(self, user, account):
267
        """Return a dictionary with the user groups defined for this account."""
268

    
269
        logger.debug("get_account_groups: %s %s", user, account)
270
        if user != account:
271
            if account not in self._allowed_accounts(user):
272
                raise NotAllowedError
273
            return {}
274
        self._lookup_account(account, True)
275
        return self.permissions.group_dict(account)
276

    
277
    @backend_method
278
    def update_account_groups(self, user, account, groups, replace=False):
279
        """Update the groups associated with the account."""
280

    
281
        logger.debug("update_account_groups: %s %s %s %s", user,
282
                     account, groups, replace)
283
        if user != account:
284
            raise NotAllowedError
285
        self._lookup_account(account, True)
286
        self._check_groups(groups)
287
        if replace:
288
            self.permissions.group_destroy(account)
289
        for k, v in groups.iteritems():
290
            if not replace:  # If not already deleted.
291
                self.permissions.group_delete(account, k)
292
            if v:
293
                self.permissions.group_addmany(account, k, v)
294

    
295
    @backend_method
296
    def get_account_policy(self, user, account):
297
        """Return a dictionary with the account policy."""
298

    
299
        logger.debug("get_account_policy: %s %s", user, account)
300
        if user != account:
301
            if account not in self._allowed_accounts(user):
302
                raise NotAllowedError
303
            return {}
304
        path, node = self._lookup_account(account, True)
305
        return self._get_policy(node)
306

    
307
    @backend_method
308
    def update_account_policy(self, user, account, policy, replace=False):
309
        """Update the policy associated with the account."""
310

    
311
        logger.debug("update_account_policy: %s %s %s %s", user,
312
                     account, policy, replace)
313
        if user != account:
314
            raise NotAllowedError
315
        path, node = self._lookup_account(account, True)
316
        self._check_policy(policy)
317
        self._put_policy(node, policy, replace)
318

    
319
    @backend_method
320
    def put_account(self, user, account, policy={}):
321
        """Create a new account with the given name."""
322

    
323
        logger.debug("put_account: %s %s %s", user, account, policy)
324
        if user != account:
325
            raise NotAllowedError
326
        node = self.node.node_lookup(account)
327
        if node is not None:
328
            raise AccountExists('Account already exists')
329
        if policy:
330
            self._check_policy(policy)
331
        node = self._put_path(user, self.ROOTNODE, account)
332
        self._put_policy(node, policy, True)
333

    
334
    @backend_method
335
    def delete_account(self, user, account):
336
        """Delete the account with the given name."""
337

    
338
        logger.debug("delete_account: %s %s", user, account)
339
        if user != account:
340
            raise NotAllowedError
341
        node = self.node.node_lookup(account)
342
        if node is None:
343
            return
344
        if not self.node.node_remove(node):
345
            raise AccountNotEmpty('Account is not empty')
346
        self.permissions.group_destroy(account)
347

    
348
    @backend_method
349
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
350
        """Return a list of containers existing under an account."""
351

    
352
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
353
                     account, marker, limit, shared, until, public)
354
        if user != account:
355
            if until or account not in self._allowed_accounts(user):
356
                raise NotAllowedError
357
            allowed = self._allowed_containers(user, account)
358
            start, limit = self._list_limits(allowed, marker, limit)
359
            return allowed[start:start + limit]
360
        if shared or public:
361
            allowed = set()
362
            if shared:
363
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
364
            if public:
365
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
366
            allowed = sorted(allowed)
367
            start, limit = self._list_limits(allowed, marker, limit)
368
            return allowed[start:start + limit]
369
        node = self.node.node_lookup(account)
370
        containers = [x[0] for x in self._list_object_properties(
371
            node, account, '', '/', marker, limit, False, None, [], until)]
372
        start, limit = self._list_limits(
373
            [x[0] for x in containers], marker, limit)
374
        return containers[start:start + limit]
375

    
376
    @backend_method
377
    def list_container_meta(self, user, account, container, domain, until=None):
378
        """Return a list with all the container's object meta keys for the domain."""
379

    
380
        logger.debug("list_container_meta: %s %s %s %s %s", user,
381
                     account, container, domain, until)
382
        allowed = []
383
        if user != account:
384
            if until:
385
                raise NotAllowedError
386
            allowed = self.permissions.access_list_paths(
387
                user, '/'.join((account, container)))
388
            if not allowed:
389
                raise NotAllowedError
390
        path, node = self._lookup_container(account, container)
391
        before = until if until is not None else inf
392
        allowed = self._get_formatted_paths(allowed)
393
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
394

    
395
    @backend_method
396
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
397
        """Return a dictionary with the container metadata for the domain."""
398

    
399
        logger.debug("get_container_meta: %s %s %s %s %s", user,
400
                     account, container, domain, until)
401
        if user != account:
402
            if until or container not in self._allowed_containers(user, account):
403
                raise NotAllowedError
404
        path, node = self._lookup_container(account, container)
405
        props = self._get_properties(node, until)
406
        mtime = props[self.MTIME]
407
        count, bytes, tstamp = self._get_statistics(node, until)
408
        tstamp = max(tstamp, mtime)
409
        if until is None:
410
            modified = tstamp
411
        else:
412
            modified = self._get_statistics(
413
                node)[2]  # Overall last modification.
414
            modified = max(modified, mtime)
415

    
416
        if user != account:
417
            meta = {'name': container}
418
        else:
419
            meta = {}
420
            if include_user_defined:
421
                meta.update(
422
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
423
            if until is not None:
424
                meta.update({'until_timestamp': tstamp})
425
            meta.update({'name': container, 'count': count, 'bytes': bytes})
426
        meta.update({'modified': modified})
427
        return meta
428

    
429
    @backend_method
430
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
431
        """Update the metadata associated with the container for the domain."""
432

    
433
        logger.debug("update_container_meta: %s %s %s %s %s %s",
434
                     user, account, container, domain, meta, replace)
435
        if user != account:
436
            raise NotAllowedError
437
        path, node = self._lookup_container(account, container)
438
        src_version_id, dest_version_id = self._put_metadata(
439
            user, node, domain, meta, replace)
440
        if src_version_id is not None:
441
            versioning = self._get_policy(node)['versioning']
442
            if versioning != 'auto':
443
                self.node.version_remove(src_version_id)
444

    
445
    @backend_method
446
    def get_container_policy(self, user, account, container):
447
        """Return a dictionary with the container policy."""
448

    
449
        logger.debug(
450
            "get_container_policy: %s %s %s", user, account, container)
451
        if user != account:
452
            if container not in self._allowed_containers(user, account):
453
                raise NotAllowedError
454
            return {}
455
        path, node = self._lookup_container(account, container)
456
        return self._get_policy(node)
457

    
458
    @backend_method
459
    def update_container_policy(self, user, account, container, policy, replace=False):
460
        """Update the policy associated with the container."""
461

    
462
        logger.debug("update_container_policy: %s %s %s %s %s",
463
                     user, account, container, policy, replace)
464
        if user != account:
465
            raise NotAllowedError
466
        path, node = self._lookup_container(account, container)
467
        self._check_policy(policy)
468
        self._put_policy(node, policy, replace)
469

    
470
    @backend_method
471
    def put_container(self, user, account, container, policy={}):
472
        """Create a new container with the given name."""
473

    
474
        logger.debug(
475
            "put_container: %s %s %s %s", user, account, container, policy)
476
        if user != account:
477
            raise NotAllowedError
478
        try:
479
            path, node = self._lookup_container(account, container)
480
        except NameError:
481
            pass
482
        else:
483
            raise ContainerExists('Container already exists')
484
        if policy:
485
            self._check_policy(policy)
486
        path = '/'.join((account, container))
487
        node = self._put_path(
488
            user, self._lookup_account(account, True)[1], path)
489
        self._put_policy(node, policy, True)
490

    
491
    @backend_method
492
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
493
        """Delete/purge the container with the given name."""
494

    
495
        logger.debug("delete_container: %s %s %s %s %s %s", user,
496
                     account, container, until, prefix, delimiter)
497
        if user != account:
498
            raise NotAllowedError
499
        path, node = self._lookup_container(account, container)
500

    
501
        if until is not None:
502
            hashes, size, serials = self.node.node_purge_children(
503
                node, until, CLUSTER_HISTORY)
504
            for h in hashes:
505
                self.store.map_delete(h)
506
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
507
            self._report_size_change(user, account, -size,
508
                                     {'action':'container purge', 'path': path,
509
                                      'versions': serials})
510
            return
511

    
512
        if not delimiter:
513
            if self._get_statistics(node)[0] > 0:
514
                raise ContainerNotEmpty('Container is not empty')
515
            hashes, size, serials = self.node.node_purge_children(
516
                node, inf, CLUSTER_HISTORY)
517
            for h in hashes:
518
                self.store.map_delete(h)
519
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
520
            self.node.node_remove(node)
521
            self._report_size_change(user, account, -size,
522
                                     {'action': 'container delete',
523
                                      'path': path,
524
                                      'versions': serials})
525
        else:
526
                # remove only contents
527
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
528
            paths = []
529
            for t in src_names:
530
                path = '/'.join((account, container, t[0]))
531
                node = t[2]
532
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
533
                del_size = self._apply_versioning(
534
                    account, container, src_version_id)
535
                if del_size:
536
                    self._report_size_change(user, account, -del_size,
537
                                             {'action': 'object delete',
538
                                              'path': path, 'versions': [dest_version_id]})
539
                self._report_object_change(
540
                    user, account, path, details={'action': 'object delete'})
541
                paths.append(path)
542
            self.permissions.access_clear_bulk(paths)
543

    
544
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
545
        if user != account and until:
546
            raise NotAllowedError
547
        if shared and public:
548
            # get shared first
549
            shared = self._list_object_permissions(
550
                user, account, container, prefix, shared=True, public=False)
551
            objects = set()
552
            if shared:
553
                path, node = self._lookup_container(account, container)
554
                shared = self._get_formatted_paths(shared)
555
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
556

    
557
            # get public
558
            objects |= set(self._list_public_object_properties(
559
                user, account, container, prefix, all_props))
560
            objects = list(objects)
561

    
562
            objects.sort(key=lambda x: x[0])
563
            start, limit = self._list_limits(
564
                [x[0] for x in objects], marker, limit)
565
            return objects[start:start + limit]
566
        elif public:
567
            objects = self._list_public_object_properties(
568
                user, account, container, prefix, all_props)
569
            start, limit = self._list_limits(
570
                [x[0] for x in objects], marker, limit)
571
            return objects[start:start + limit]
572

    
573
        allowed = self._list_object_permissions(
574
            user, account, container, prefix, shared, public)
575
        if shared and not allowed:
576
            return []
577
        path, node = self._lookup_container(account, container)
578
        allowed = self._get_formatted_paths(allowed)
579
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
580
        start, limit = self._list_limits(
581
            [x[0] for x in objects], marker, limit)
582
        return objects[start:start + limit]
583

    
584
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
585
        public = self._list_object_permissions(
586
            user, account, container, prefix, shared=False, public=True)
587
        paths, nodes = self._lookup_objects(public)
588
        path = '/'.join((account, container))
589
        cont_prefix = path + '/'
590
        paths = [x[len(cont_prefix):] for x in paths]
591
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
592
        objects = [(path,) + props for path, props in zip(paths, props)]
593
        return objects
594

    
595
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
596
        objects = []
597
        while True:
598
            marker = objects[-1] if objects else None
599
            limit = 10000
600
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
601
            objects.extend(l)
602
            if not l or len(l) < limit:
603
                break
604
        return objects
605

    
606
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
607
        allowed = []
608
        path = '/'.join((account, container, prefix)).rstrip('/')
609
        if user != account:
610
            allowed = self.permissions.access_list_paths(user, path)
611
            if not allowed:
612
                raise NotAllowedError
613
        else:
614
            allowed = set()
615
            if shared:
616
                allowed.update(self.permissions.access_list_shared(path))
617
            if public:
618
                allowed.update(
619
                    [x[0] for x in self.permissions.public_list(path)])
620
            allowed = sorted(allowed)
621
            if not allowed:
622
                return []
623
        return allowed
624

    
625
    @backend_method
626
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
627
        """Return a list of object (name, version_id) tuples existing under a container."""
628

    
629
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
630
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
631

    
632
    @backend_method
633
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
634
        """Return a list of object metadata dicts existing under a container."""
635

    
636
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
637
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
638
        objects = []
639
        for p in props:
640
            if len(p) == 2:
641
                objects.append({'subdir': p[0]})
642
            else:
643
                objects.append({'name': p[0],
644
                                'bytes': p[self.SIZE + 1],
645
                                'type': p[self.TYPE + 1],
646
                                'hash': p[self.HASH + 1],
647
                                'version': p[self.SERIAL + 1],
648
                                'version_timestamp': p[self.MTIME + 1],
649
                                'modified': p[self.MTIME + 1] if until is None else None,
650
                                'modified_by': p[self.MUSER + 1],
651
                                'uuid': p[self.UUID + 1],
652
                                'checksum': p[self.CHECKSUM + 1]})
653
        return objects
654

    
655
    @backend_method
656
    def list_object_permissions(self, user, account, container, prefix=''):
657
        """Return a list of paths that enforce permissions under a container."""
658

    
659
        logger.debug("list_object_permissions: %s %s %s %s", user,
660
                     account, container, prefix)
661
        return self._list_object_permissions(user, account, container, prefix, True, False)
662

    
663
    @backend_method
664
    def list_object_public(self, user, account, container, prefix=''):
665
        """Return a dict mapping paths to public ids for objects that are public under a container."""
666

    
667
        logger.debug("list_object_public: %s %s %s %s", user,
668
                     account, container, prefix)
669
        public = {}
670
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
671
            public[path] = p + ULTIMATE_ANSWER
672
        return public
673

    
674
    @backend_method
675
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
676
        """Return a dictionary with the object metadata for the domain."""
677

    
678
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
679
                     account, container, name, domain, version)
680
        self._can_read(user, account, container, name)
681
        path, node = self._lookup_object(account, container, name)
682
        props = self._get_version(node, version)
683
        if version is None:
684
            modified = props[self.MTIME]
685
        else:
686
            try:
687
                modified = self._get_version(
688
                    node)[self.MTIME]  # Overall last modification.
689
            except NameError:  # Object may be deleted.
690
                del_props = self.node.version_lookup(
691
                    node, inf, CLUSTER_DELETED)
692
                if del_props is None:
693
                    raise ItemNotExists('Object does not exist')
694
                modified = del_props[self.MTIME]
695

    
696
        meta = {}
697
        if include_user_defined:
698
            meta.update(
699
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
700
        meta.update({'name': name,
701
                     'bytes': props[self.SIZE],
702
                     'type': props[self.TYPE],
703
                     'hash': props[self.HASH],
704
                     'version': props[self.SERIAL],
705
                     'version_timestamp': props[self.MTIME],
706
                     'modified': modified,
707
                     'modified_by': props[self.MUSER],
708
                     'uuid': props[self.UUID],
709
                     'checksum': props[self.CHECKSUM]})
710
        return meta
711

    
712
    @backend_method
713
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
714
        """Update the metadata associated with the object for the domain and return the new version."""
715

    
716
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
717
                     user, account, container, name, domain, meta, replace)
718
        self._can_write(user, account, container, name)
719
        path, node = self._lookup_object(account, container, name)
720
        src_version_id, dest_version_id = self._put_metadata(
721
            user, node, domain, meta, replace)
722
        self._apply_versioning(account, container, src_version_id)
723
        return dest_version_id
724

    
725
    @backend_method
726
    def get_object_permissions(self, user, account, container, name):
727
        """Return the action allowed on the object, the path
728
        from which the object gets its permissions from,
729
        along with a dictionary containing the permissions."""
730

    
731
        logger.debug("get_object_permissions: %s %s %s %s", user,
732
                     account, container, name)
733
        allowed = 'write'
734
        permissions_path = self._get_permissions_path(account, container, name)
735
        if user != account:
736
            if self.permissions.access_check(permissions_path, self.WRITE, user):
737
                allowed = 'write'
738
            elif self.permissions.access_check(permissions_path, self.READ, user):
739
                allowed = 'read'
740
            else:
741
                raise NotAllowedError
742
        self._lookup_object(account, container, name)
743
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
744

    
745
    @backend_method
746
    def update_object_permissions(self, user, account, container, name, permissions):
747
        """Update the permissions associated with the object."""
748

    
749
        logger.debug("update_object_permissions: %s %s %s %s %s",
750
                     user, account, container, name, permissions)
751
        if user != account:
752
            raise NotAllowedError
753
        path = self._lookup_object(account, container, name)[0]
754
        self._check_permissions(path, permissions)
755
        self.permissions.access_set(path, permissions)
756
        self._report_sharing_change(user, account, path, {'members':
757
                                    self.permissions.access_members(path)})
758

    
759
    @backend_method
760
    def get_object_public(self, user, account, container, name):
761
        """Return the public id of the object if applicable."""
762

    
763
        logger.debug(
764
            "get_object_public: %s %s %s %s", user, account, container, name)
765
        self._can_read(user, account, container, name)
766
        path = self._lookup_object(account, container, name)[0]
767
        p = self.permissions.public_get(path)
768
        if p is not None:
769
            p += ULTIMATE_ANSWER
770
        return p
771

    
772
    @backend_method
773
    def update_object_public(self, user, account, container, name, public):
774
        """Update the public status of the object."""
775

    
776
        logger.debug("update_object_public: %s %s %s %s %s", user,
777
                     account, container, name, public)
778
        self._can_write(user, account, container, name)
779
        path = self._lookup_object(account, container, name)[0]
780
        if not public:
781
            self.permissions.public_unset(path)
782
        else:
783
            self.permissions.public_set(path)
784

    
785
    @backend_method
786
    def get_object_hashmap(self, user, account, container, name, version=None):
787
        """Return the object's size and a list with partial hashes."""
788

    
789
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
790
                     account, container, name, version)
791
        self._can_read(user, account, container, name)
792
        path, node = self._lookup_object(account, container, name)
793
        props = self._get_version(node, version)
794
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
795
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
796

    
797
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
798
        if permissions is not None and user != account:
799
            raise NotAllowedError
800
        self._can_write(user, account, container, name)
801
        if permissions is not None:
802
            path = '/'.join((account, container, name))
803
            self._check_permissions(path, permissions)
804

    
805
        account_path, account_node = self._lookup_account(account, True)
806
        container_path, container_node = self._lookup_container(
807
            account, container)
808
        path, node = self._put_object_node(
809
            container_path, container_node, name)
810
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
811

    
812
        # Handle meta.
813
        if src_version_id is None:
814
            src_version_id = pre_version_id
815
        self._put_metadata_duplicate(
816
            src_version_id, dest_version_id, domain, meta, replace_meta)
817

    
818
        # Check quota.
819
        del_size = self._apply_versioning(account, container, pre_version_id)
820
        size_delta = size - del_size
821
        if size_delta > 0:
822
            account_quota = long(self._get_policy(account_node)['quota'])
823
            container_quota = long(self._get_policy(container_node)['quota'])
824
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
825
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
826
                # This must be executed in a transaction, so the version is never created if it fails.
827
                raise QuotaError
828
        self._report_size_change(user, account, size_delta,
829
                                 {'action': 'object update', 'path': path,
830
                                  'versions': [dest_version_id]})
831

    
832
        if permissions is not None:
833
            self.permissions.access_set(path, permissions)
834
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
835

    
836
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
837
        return dest_version_id
838

    
839
    @backend_method
840
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
841
        """Create/update an object with the specified size and partial hashes."""
842

    
843
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
844
                     account, container, name, size, type, hashmap, checksum)
845
        if size == 0:  # No such thing as an empty hashmap.
846
            hashmap = [self.put_block('')]
847
        map = HashMap(self.block_size, self.hash_algorithm)
848
        map.extend([binascii.unhexlify(x) for x in hashmap])
849
        missing = self.store.block_search(map)
850
        if missing:
851
            ie = IndexError()
852
            ie.data = [binascii.hexlify(x) for x in missing]
853
            raise ie
854

    
855
        hash = map.hash()
856
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
857
        self.store.map_put(hash, map)
858
        return dest_version_id
859

    
860
    @backend_method
861
    def update_object_checksum(self, user, account, container, name, version, checksum):
862
        """Update an object's checksum."""
863

    
864
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
865
                     user, account, container, name, version, checksum)
866
        # Update objects with greater version and same hashmap and size (fix metadata updates).
867
        self._can_write(user, account, container, name)
868
        path, node = self._lookup_object(account, container, name)
869
        props = self._get_version(node, version)
870
        versions = self.node.node_get_versions(node)
871
        for x in versions:
872
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
873
                self.node.version_put_property(
874
                    x[self.SERIAL], 'checksum', checksum)
875

    
876
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
877
        dest_version_ids = []
878
        self._can_read(user, src_account, src_container, src_name)
879
        path, node = self._lookup_object(src_account, src_container, src_name)
880
        # TODO: Will do another fetch of the properties in duplicate version...
881
        props = self._get_version(
882
            node, src_version)  # Check to see if source exists.
883
        src_version_id = props[self.SERIAL]
884
        hash = props[self.HASH]
885
        size = props[self.SIZE]
886
        is_copy = not is_move and (src_account, src_container, src_name) != (
887
            dest_account, dest_container, dest_name)  # New uuid.
888
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
889
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
890
            self._delete_object(user, src_account, src_container, src_name)
891

    
892
        if delimiter:
893
            prefix = src_name + \
894
                delimiter if not src_name.endswith(delimiter) else src_name
895
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
896
            src_names.sort(key=lambda x: x[2])  # order by nodes
897
            paths = [elem[0] for elem in src_names]
898
            nodes = [elem[2] for elem in src_names]
899
            # TODO: Will do another fetch of the properties in duplicate version...
900
            props = self._get_versions(nodes)  # Check to see if source exists.
901

    
902
            for prop, path, node in zip(props, paths, nodes):
903
                src_version_id = prop[self.SERIAL]
904
                hash = prop[self.HASH]
905
                vtype = prop[self.TYPE]
906
                size = prop[self.SIZE]
907
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
908
                    delimiter) else dest_name
909
                vdest_name = path.replace(prefix, dest_prefix, 1)
910
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
911
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
912
                    self._delete_object(user, src_account, src_container, path)
913
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
914

    
915
    @backend_method
916
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
917
        """Copy an object's data and metadata."""
918

    
919
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
920
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
921
        return dest_version_id
922

    
923
    @backend_method
924
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
925
        """Move an object's data and metadata."""
926

    
927
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
928
        if user != src_account:
929
            raise NotAllowedError
930
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
931
        return dest_version_id
932

    
933
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
934
        if user != account:
935
            raise NotAllowedError
936

    
937
        if until is not None:
938
            path = '/'.join((account, container, name))
939
            node = self.node.node_lookup(path)
940
            if node is None:
941
                return
942
            hashes = []
943
            size = 0
944
            serials = []
945
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
946
            hashes += h
947
            size += s
948
            serials += v
949
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
950
            hashes += h
951
            size += s
952
            serials += v
953
            for h in hashes:
954
                self.store.map_delete(h)
955
            self.node.node_purge(node, until, CLUSTER_DELETED)
956
            try:
957
                props = self._get_version(node)
958
            except NameError:
959
                self.permissions.access_clear(path)
960
            self._report_size_change(user, account, -size,
961
                                    {'action': 'object purge', 'path': path,
962
                                     'versions': serials})
963
            return
964

    
965
        path, node = self._lookup_object(account, container, name)
966
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
967
        del_size = self._apply_versioning(account, container, src_version_id)
968
        if del_size:
969
            self._report_size_change(user, account, -del_size,
970
                                     {'action': 'object delete', 'path': path,
971
                                      'versions': [dest_version_id]})
972
        self._report_object_change(
973
            user, account, path, details={'action': 'object delete'})
974
        self.permissions.access_clear(path)
975

    
976
        if delimiter:
977
            prefix = name + delimiter if not name.endswith(delimiter) else name
978
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
979
            paths = []
980
            for t in src_names:
981
                path = '/'.join((account, container, t[0]))
982
                node = t[2]
983
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
984
                del_size = self._apply_versioning(
985
                    account, container, src_version_id)
986
                if del_size:
987
                    self._report_size_change(user, account, -del_size,
988
                                             {'action': 'object delete',
989
                                              'path': path,
990
                                              'versions': [dest_version_id]})
991
                self._report_object_change(
992
                    user, account, path, details={'action': 'object delete'})
993
                paths.append(path)
994
            self.permissions.access_clear_bulk(paths)
995

    
996
    @backend_method
997
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
998
        """Delete/purge an object."""
999

    
1000
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1001
                     account, container, name, until, prefix, delimiter)
1002
        self._delete_object(user, account, container, name, until, delimiter)
1003

    
1004
    @backend_method
1005
    def list_versions(self, user, account, container, name):
1006
        """Return a list of all (version, version_timestamp) tuples for an object."""
1007

    
1008
        logger.debug(
1009
            "list_versions: %s %s %s %s", user, account, container, name)
1010
        self._can_read(user, account, container, name)
1011
        path, node = self._lookup_object(account, container, name)
1012
        versions = self.node.node_get_versions(node)
1013
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1014

    
1015
    @backend_method
1016
    def get_uuid(self, user, uuid):
1017
        """Return the (account, container, name) for the UUID given."""
1018

    
1019
        logger.debug("get_uuid: %s %s", user, uuid)
1020
        info = self.node.latest_uuid(uuid)
1021
        if info is None:
1022
            raise NameError
1023
        path, serial = info
1024
        account, container, name = path.split('/', 2)
1025
        self._can_read(user, account, container, name)
1026
        return (account, container, name)
1027

    
1028
    @backend_method
1029
    def get_public(self, user, public):
1030
        """Return the (account, container, name) for the public id given."""
1031

    
1032
        logger.debug("get_public: %s %s", user, public)
1033
        if public is None or public < ULTIMATE_ANSWER:
1034
            raise NameError
1035
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1036
        if path is None:
1037
            raise NameError
1038
        account, container, name = path.split('/', 2)
1039
        self._can_read(user, account, container, name)
1040
        return (account, container, name)
1041

    
1042
    @backend_method(autocommit=0)
1043
    def get_block(self, hash):
1044
        """Return a block's data."""
1045

    
1046
        logger.debug("get_block: %s", hash)
1047
        block = self.store.block_get(binascii.unhexlify(hash))
1048
        if not block:
1049
            raise ItemNotExists('Block does not exist')
1050
        return block
1051

    
1052
    @backend_method(autocommit=0)
1053
    def put_block(self, data):
1054
        """Store a block and return the hash."""
1055

    
1056
        logger.debug("put_block: %s", len(data))
1057
        return binascii.hexlify(self.store.block_put(data))
1058

    
1059
    @backend_method(autocommit=0)
1060
    def update_block(self, hash, data, offset=0):
1061
        """Update a known block and return the hash."""
1062

    
1063
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1064
        if offset == 0 and len(data) == self.block_size:
1065
            return self.put_block(data)
1066
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1067
        return binascii.hexlify(h)
1068

    
1069
    # Path functions.
1070

    
1071
    def _generate_uuid(self):
1072
        return str(uuidlib.uuid4())
1073

    
1074
    def _put_object_node(self, path, parent, name):
1075
        path = '/'.join((path, name))
1076
        node = self.node.node_lookup(path)
1077
        if node is None:
1078
            node = self.node.node_create(parent, path)
1079
        return path, node
1080

    
1081
    def _put_path(self, user, parent, path):
1082
        node = self.node.node_create(parent, path)
1083
        self.node.version_create(node, None, 0, '', None, user,
1084
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1085
        return node
1086

    
1087
    def _lookup_account(self, account, create=True):
1088
        node = self.node.node_lookup(account)
1089
        if node is None and create:
1090
            node = self._put_path(
1091
                account, self.ROOTNODE, account)  # User is account.
1092
        return account, node
1093

    
1094
    def _lookup_container(self, account, container):
1095
        path = '/'.join((account, container))
1096
        node = self.node.node_lookup(path)
1097
        if node is None:
1098
            raise ItemNotExists('Container does not exist')
1099
        return path, node
1100

    
1101
    def _lookup_object(self, account, container, name):
1102
        path = '/'.join((account, container, name))
1103
        node = self.node.node_lookup(path)
1104
        if node is None:
1105
            raise ItemNotExists('Object does not exist')
1106
        return path, node
1107

    
1108
    def _lookup_objects(self, paths):
1109
        nodes = self.node.node_lookup_bulk(paths)
1110
        return paths, nodes
1111

    
1112
    def _get_properties(self, node, until=None):
1113
        """Return properties until the timestamp given."""
1114

    
1115
        before = until if until is not None else inf
1116
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1117
        if props is None and until is not None:
1118
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1119
        if props is None:
1120
            raise ItemNotExists('Path does not exist')
1121
        return props
1122

    
1123
    def _get_statistics(self, node, until=None):
1124
        """Return count, sum of size and latest timestamp of everything under node."""
1125

    
1126
        if until is None:
1127
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1128
        else:
1129
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1130
        if stats is None:
1131
            stats = (0, 0, 0)
1132
        return stats
1133

    
1134
    def _get_version(self, node, version=None):
1135
        if version is None:
1136
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1137
            if props is None:
1138
                raise ItemNotExists('Object does not exist')
1139
        else:
1140
            try:
1141
                version = int(version)
1142
            except ValueError:
1143
                raise VersionNotExists('Version does not exist')
1144
            props = self.node.version_get_properties(version)
1145
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1146
                raise VersionNotExists('Version does not exist')
1147
        return props
1148

    
1149
    def _get_versions(self, nodes):
1150
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1151

    
1152
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1153
        """Create a new version of the node."""
1154

    
1155
        props = self.node.version_lookup(
1156
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1157
        if props is not None:
1158
            src_version_id = props[self.SERIAL]
1159
            src_hash = props[self.HASH]
1160
            src_size = props[self.SIZE]
1161
            src_type = props[self.TYPE]
1162
            src_checksum = props[self.CHECKSUM]
1163
        else:
1164
            src_version_id = None
1165
            src_hash = None
1166
            src_size = 0
1167
            src_type = ''
1168
            src_checksum = ''
1169
        if size is None:  # Set metadata.
1170
            hash = src_hash  # This way hash can be set to None (account or container).
1171
            size = src_size
1172
        if type is None:
1173
            type = src_type
1174
        if checksum is None:
1175
            checksum = src_checksum
1176
        uuid = self._generate_uuid(
1177
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1178

    
1179
        if src_node is None:
1180
            pre_version_id = src_version_id
1181
        else:
1182
            pre_version_id = None
1183
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1184
            if props is not None:
1185
                pre_version_id = props[self.SERIAL]
1186
        if pre_version_id is not None:
1187
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1188

    
1189
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1190
        return pre_version_id, dest_version_id
1191

    
1192
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1193
        if src_version_id is not None:
1194
            self.node.attribute_copy(src_version_id, dest_version_id)
1195
        if not replace:
1196
            self.node.attribute_del(dest_version_id, domain, (
1197
                k for k, v in meta.iteritems() if v == ''))
1198
            self.node.attribute_set(dest_version_id, domain, (
1199
                (k, v) for k, v in meta.iteritems() if v != ''))
1200
        else:
1201
            self.node.attribute_del(dest_version_id, domain)
1202
            self.node.attribute_set(dest_version_id, domain, ((
1203
                k, v) for k, v in meta.iteritems()))
1204

    
1205
    def _put_metadata(self, user, node, domain, meta, replace=False):
1206
        """Create a new version and store metadata."""
1207

    
1208
        src_version_id, dest_version_id = self._put_version_duplicate(
1209
            user, node)
1210
        self._put_metadata_duplicate(
1211
            src_version_id, dest_version_id, domain, meta, replace)
1212
        return src_version_id, dest_version_id
1213

    
1214
    def _list_limits(self, listing, marker, limit):
1215
        start = 0
1216
        if marker:
1217
            try:
1218
                start = listing.index(marker) + 1
1219
            except ValueError:
1220
                pass
1221
        if not limit or limit > 10000:
1222
            limit = 10000
1223
        return start, limit
1224

    
1225
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1226
        cont_prefix = path + '/'
1227
        prefix = cont_prefix + prefix
1228
        start = cont_prefix + marker if marker else None
1229
        before = until if until is not None else inf
1230
        filterq = keys if domain else []
1231
        sizeq = size_range
1232

    
1233
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1234
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1235
        objects.sort(key=lambda x: x[0])
1236
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1237
        return objects
1238

    
1239
    # Reporting functions.
1240

    
1241
    def _report_size_change(self, user, account, size, details={}):
1242
        account_node = self._lookup_account(account, True)[1]
1243
        total = self._get_statistics(account_node)[1]
1244
        details.update({'user': user, 'total': total})
1245
        logger.debug(
1246
            "_report_size_change: %s %s %s %s", user, account, size, details)
1247
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1248
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1249
                              float(size), details))
1250

    
1251
        serial = self.quotaholder.issue_commission(
1252
                context     =   {},
1253
                target      =   account,
1254
                key         =   '1',
1255
                clientkey   =   'pithos',
1256
                ownerkey    =   '',
1257
                provisions  =   (('pithos+', 'pithos+ : diskspace', size),)
1258
        )
1259
        self.serials.append(serial)
1260

    
1261
    def _report_object_change(self, user, account, path, details={}):
1262
        details.update({'user': user})
1263
        logger.debug("_report_object_change: %s %s %s %s", user,
1264
                     account, path, details)
1265
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1266
                                                  account, QUEUE_INSTANCE_ID, 'object', path, details))
1267

    
1268
    def _report_sharing_change(self, user, account, path, details={}):
1269
        logger.debug("_report_permissions_change: %s %s %s %s",
1270
                     user, account, path, details)
1271
        details.update({'user': user})
1272
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1273
                                                  account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1274

    
1275
    # Policy functions.
1276

    
1277
    def _check_policy(self, policy):
1278
        for k in policy.keys():
1279
            if policy[k] == '':
1280
                policy[k] = self.default_policy.get(k)
1281
        for k, v in policy.iteritems():
1282
            if k == 'quota':
1283
                q = int(v)  # May raise ValueError.
1284
                if q < 0:
1285
                    raise ValueError
1286
            elif k == 'versioning':
1287
                if v not in ['auto', 'none']:
1288
                    raise ValueError
1289
            else:
1290
                raise ValueError
1291

    
1292
    def _put_policy(self, node, policy, replace):
1293
        if replace:
1294
            for k, v in self.default_policy.iteritems():
1295
                if k not in policy:
1296
                    policy[k] = v
1297
        self.node.policy_set(node, policy)
1298

    
1299
    def _get_policy(self, node):
1300
        policy = self.default_policy.copy()
1301
        policy.update(self.node.policy_get(node))
1302
        return policy
1303

    
1304
    def _apply_versioning(self, account, container, version_id):
1305
        """Delete the provided version if such is the policy.
1306
           Return size of object removed.
1307
        """
1308

    
1309
        if version_id is None:
1310
            return 0
1311
        path, node = self._lookup_container(account, container)
1312
        versioning = self._get_policy(node)['versioning']
1313
        if versioning != 'auto':
1314
            hash, size = self.node.version_remove(version_id)
1315
            self.store.map_delete(hash)
1316
            return size
1317
        return 0
1318

    
1319
    # Access control functions.
1320

    
1321
    def _check_groups(self, groups):
1322
        # raise ValueError('Bad characters in groups')
1323
        pass
1324

    
1325
    def _check_permissions(self, path, permissions):
1326
        # raise ValueError('Bad characters in permissions')
1327
        pass
1328

    
1329
    def _get_formatted_paths(self, paths):
1330
        formatted = []
1331
        for p in paths:
1332
            node = self.node.node_lookup(p)
1333
            if node is not None:
1334
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1335
            if props is not None:
1336
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1337
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1338
                formatted.append((p, self.MATCH_EXACT))
1339
        return formatted
1340

    
1341
    def _get_permissions_path(self, account, container, name):
1342
        path = '/'.join((account, container, name))
1343
        permission_paths = self.permissions.access_inherit(path)
1344
        permission_paths.sort()
1345
        permission_paths.reverse()
1346
        for p in permission_paths:
1347
            if p == path:
1348
                return p
1349
            else:
1350
                if p.count('/') < 2:
1351
                    continue
1352
                node = self.node.node_lookup(p)
1353
                if node is not None:
1354
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1355
                if props is not None:
1356
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1357
                        return p
1358
        return None
1359

    
1360
    def _can_read(self, user, account, container, name):
1361
        if user == account:
1362
            return True
1363
        path = '/'.join((account, container, name))
1364
        if self.permissions.public_get(path) is not None:
1365
            return True
1366
        path = self._get_permissions_path(account, container, name)
1367
        if not path:
1368
            raise NotAllowedError
1369
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1370
            raise NotAllowedError
1371

    
1372
    def _can_write(self, user, account, container, name):
1373
        if user == account:
1374
            return True
1375
        path = '/'.join((account, container, name))
1376
        path = self._get_permissions_path(account, container, name)
1377
        if not path:
1378
            raise NotAllowedError
1379
        if not self.permissions.access_check(path, self.WRITE, user):
1380
            raise NotAllowedError
1381

    
1382
    def _allowed_accounts(self, user):
1383
        allow = set()
1384
        for path in self.permissions.access_list_paths(user):
1385
            allow.add(path.split('/', 1)[0])
1386
        return sorted(allow)
1387

    
1388
    def _allowed_containers(self, user, account):
1389
        allow = set()
1390
        for path in self.permissions.access_list_paths(user, account):
1391
            allow.add(path.split('/', 2)[1])
1392
        return sorted(allow)