Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ fff37615

History | View | Annotate | Download (61.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from kamaki.clients.quotaholder import QuotaholderClient
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
85
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
86

    
87
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
88
QUEUE_CLIENT_ID = 'pithos'
89
QUEUE_INSTANCE_ID = '1'
90

    
91
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
92

    
93
inf = float('inf')
94

    
95
ULTIMATE_ANSWER = 42
96

    
97

    
98
logger = logging.getLogger(__name__)
99

    
100

    
101
def backend_method(func=None, autocommit=1):
102
    if func is None:
103
        def fn(func):
104
            return backend_method(func, autocommit)
105
        return fn
106

    
107
    if not autocommit:
108
        return func
109

    
110
    def fn(self, *args, **kw):
111
        self.wrapper.execute()
112
        serials = []
113
        self.serials = serials
114
        self.messages = []
115

    
116
        try:
117
            ret = func(self, *args, **kw)
118
            for m in self.messages:
119
                self.queue.send(*m)
120
            if serials:
121
                self.quotaholder.accept_commission(
122
                            context     =   {},
123
                            clientkey   =   'pithos',
124
                            serials     =   serials)
125
            self.wrapper.commit()
126
            return ret
127
        except:
128
            if serials:
129
                self.quotaholder.reject_commission(
130
                            context     =   {},
131
                            clientkey   =   'pithos',
132
                            serials     =   serials)
133
            self.wrapper.rollback()
134
            raise
135
    return fn
136

    
137

    
138
class ModularBackend(BaseBackend):
139
    """A modular backend.
140

141
    Uses modules for SQL functions and storage.
142
    """
143

    
144
    def __init__(self, db_module=None, db_connection=None,
145
                 block_module=None, block_path=None, block_umask=None,
146
                 queue_module=None, queue_hosts=None, queue_exchange=None,
147
                 quotaholder_url=None, quotaholder_token=None,
148
                 free_versioning=True):
149
        db_module = db_module or DEFAULT_DB_MODULE
150
        db_connection = db_connection or DEFAULT_DB_CONNECTION
151
        block_module = block_module or DEFAULT_BLOCK_MODULE
152
        block_path = block_path or DEFAULT_BLOCK_PATH
153
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
154
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
155
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
156
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
157
        
158
        self.hash_algorithm = 'sha256'
159
        self.block_size = 4 * 1024 * 1024  # 4MB
160
        self.free_versioning = free_versioning
161

    
162
        self.default_policy = {'quota': DEFAULT_QUOTA,
163
                               'versioning': DEFAULT_VERSIONING}
164

    
165
        def load_module(m):
166
            __import__(m)
167
            return sys.modules[m]
168

    
169
        self.db_module = load_module(db_module)
170
        self.wrapper = self.db_module.DBWrapper(db_connection)
171
        params = {'wrapper': self.wrapper}
172
        self.permissions = self.db_module.Permissions(**params)
173
        self.config = self.db_module.Config(**params)
174
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
175
        for x in ['READ', 'WRITE']:
176
            setattr(self, x, getattr(self.db_module, x))
177
        self.node = self.db_module.Node(**params)
178
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
179
            setattr(self, x, getattr(self.db_module, x))
180

    
181
        self.block_module = load_module(block_module)
182
        params = {'path': block_path,
183
                  'block_size': self.block_size,
184
                  'hash_algorithm': self.hash_algorithm,
185
                  'umask': block_umask}
186
        self.store = self.block_module.Store(**params)
187

    
188
        if queue_module and queue_hosts:
189
            self.queue_module = load_module(queue_module)
190
            params = {'hosts': queue_hosts,
191
                              'exchange': queue_exchange,
192
                      'client_id': QUEUE_CLIENT_ID}
193
            self.queue = self.queue_module.Queue(**params)
194
        else:
195
            class NoQueue:
196
                def send(self, *args):
197
                    pass
198

    
199
                def close(self):
200
                    pass
201

    
202
            self.queue = NoQueue()
203

    
204
        self.quotaholder_url = quotaholder_url
205
        self.quotaholder_token = quotaholder_token
206
        self.quotaholder = QuotaholderClient(quotaholder_url, quotaholder_token)
207
        self.serials = []
208
        self.messages = []
209

    
210
    def close(self):
211
        self.wrapper.close()
212
        self.queue.close()
213

    
214
    @backend_method
215
    def list_accounts(self, user, marker=None, limit=10000):
216
        """Return a list of accounts the user can access."""
217

    
218
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
219
        allowed = self._allowed_accounts(user)
220
        start, limit = self._list_limits(allowed, marker, limit)
221
        return allowed[start:start + limit]
222

    
223
    @backend_method
224
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
225
        """Return a dictionary with the account metadata for the domain."""
226

    
227
        logger.debug(
228
            "get_account_meta: %s %s %s %s", user, account, domain, until)
229
        path, node = self._lookup_account(account, user == account)
230
        if user != account:
231
            if until or node is None or account not in self._allowed_accounts(user):
232
                raise NotAllowedError
233
        try:
234
            props = self._get_properties(node, until)
235
            mtime = props[self.MTIME]
236
        except NameError:
237
            props = None
238
            mtime = until
239
        count, bytes, tstamp = self._get_statistics(node, until)
240
        tstamp = max(tstamp, mtime)
241
        if until is None:
242
            modified = tstamp
243
        else:
244
            modified = self._get_statistics(
245
                node)[2]  # Overall last modification.
246
            modified = max(modified, mtime)
247

    
248
        if user != account:
249
            meta = {'name': account}
250
        else:
251
            meta = {}
252
            if props is not None and include_user_defined:
253
                meta.update(
254
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
255
            if until is not None:
256
                meta.update({'until_timestamp': tstamp})
257
            meta.update({'name': account, 'count': count, 'bytes': bytes})
258
        meta.update({'modified': modified})
259
        return meta
260

    
261
    @backend_method
262
    def update_account_meta(self, user, account, domain, meta, replace=False):
263
        """Update the metadata associated with the account for the domain."""
264

    
265
        logger.debug("update_account_meta: %s %s %s %s %s", user,
266
                     account, domain, meta, replace)
267
        if user != account:
268
            raise NotAllowedError
269
        path, node = self._lookup_account(account, True)
270
        self._put_metadata(user, node, domain, meta, replace)
271

    
272
    @backend_method
273
    def get_account_groups(self, user, account):
274
        """Return a dictionary with the user groups defined for this account."""
275

    
276
        logger.debug("get_account_groups: %s %s", user, account)
277
        if user != account:
278
            if account not in self._allowed_accounts(user):
279
                raise NotAllowedError
280
            return {}
281
        self._lookup_account(account, True)
282
        return self.permissions.group_dict(account)
283

    
284
    @backend_method
285
    def update_account_groups(self, user, account, groups, replace=False):
286
        """Update the groups associated with the account."""
287

    
288
        logger.debug("update_account_groups: %s %s %s %s", user,
289
                     account, groups, replace)
290
        if user != account:
291
            raise NotAllowedError
292
        self._lookup_account(account, True)
293
        self._check_groups(groups)
294
        if replace:
295
            self.permissions.group_destroy(account)
296
        for k, v in groups.iteritems():
297
            if not replace:  # If not already deleted.
298
                self.permissions.group_delete(account, k)
299
            if v:
300
                self.permissions.group_addmany(account, k, v)
301

    
302
    @backend_method
303
    def get_account_policy(self, user, account):
304
        """Return a dictionary with the account policy."""
305

    
306
        logger.debug("get_account_policy: %s %s", user, account)
307
        if user != account:
308
            if account not in self._allowed_accounts(user):
309
                raise NotAllowedError
310
            return {}
311
        path, node = self._lookup_account(account, True)
312
        return self._get_policy(node)
313

    
314
    @backend_method
315
    def update_account_policy(self, user, account, policy, replace=False):
316
        """Update the policy associated with the account."""
317

    
318
        logger.debug("update_account_policy: %s %s %s %s", user,
319
                     account, policy, replace)
320
        if user != account:
321
            raise NotAllowedError
322
        path, node = self._lookup_account(account, True)
323
        self._check_policy(policy)
324
        self._put_policy(node, policy, replace)
325

    
326
    @backend_method
327
    def put_account(self, user, account, policy={}):
328
        """Create a new account with the given name."""
329

    
330
        logger.debug("put_account: %s %s %s", user, account, policy)
331
        if user != account:
332
            raise NotAllowedError
333
        node = self.node.node_lookup(account)
334
        if node is not None:
335
            raise AccountExists('Account already exists')
336
        if policy:
337
            self._check_policy(policy)
338
        node = self._put_path(user, self.ROOTNODE, account)
339
        self._put_policy(node, policy, True)
340

    
341
    @backend_method
342
    def delete_account(self, user, account):
343
        """Delete the account with the given name."""
344

    
345
        logger.debug("delete_account: %s %s", user, account)
346
        if user != account:
347
            raise NotAllowedError
348
        node = self.node.node_lookup(account)
349
        if node is None:
350
            return
351
        if not self.node.node_remove(node):
352
            raise AccountNotEmpty('Account is not empty')
353
        self.permissions.group_destroy(account)
354

    
355
    @backend_method
356
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
357
        """Return a list of containers existing under an account."""
358

    
359
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
360
                     account, marker, limit, shared, until, public)
361
        if user != account:
362
            if until or account not in self._allowed_accounts(user):
363
                raise NotAllowedError
364
            allowed = self._allowed_containers(user, account)
365
            start, limit = self._list_limits(allowed, marker, limit)
366
            return allowed[start:start + limit]
367
        if shared or public:
368
            allowed = set()
369
            if shared:
370
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
371
            if public:
372
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
373
            allowed = sorted(allowed)
374
            start, limit = self._list_limits(allowed, marker, limit)
375
            return allowed[start:start + limit]
376
        node = self.node.node_lookup(account)
377
        containers = [x[0] for x in self._list_object_properties(
378
            node, account, '', '/', marker, limit, False, None, [], until)]
379
        start, limit = self._list_limits(
380
            [x[0] for x in containers], marker, limit)
381
        return containers[start:start + limit]
382

    
383
    @backend_method
384
    def list_container_meta(self, user, account, container, domain, until=None):
385
        """Return a list with all the container's object meta keys for the domain."""
386

    
387
        logger.debug("list_container_meta: %s %s %s %s %s", user,
388
                     account, container, domain, until)
389
        allowed = []
390
        if user != account:
391
            if until:
392
                raise NotAllowedError
393
            allowed = self.permissions.access_list_paths(
394
                user, '/'.join((account, container)))
395
            if not allowed:
396
                raise NotAllowedError
397
        path, node = self._lookup_container(account, container)
398
        before = until if until is not None else inf
399
        allowed = self._get_formatted_paths(allowed)
400
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
401

    
402
    @backend_method
403
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
404
        """Return a dictionary with the container metadata for the domain."""
405

    
406
        logger.debug("get_container_meta: %s %s %s %s %s", user,
407
                     account, container, domain, until)
408
        if user != account:
409
            if until or container not in self._allowed_containers(user, account):
410
                raise NotAllowedError
411
        path, node = self._lookup_container(account, container)
412
        props = self._get_properties(node, until)
413
        mtime = props[self.MTIME]
414
        count, bytes, tstamp = self._get_statistics(node, until)
415
        tstamp = max(tstamp, mtime)
416
        if until is None:
417
            modified = tstamp
418
        else:
419
            modified = self._get_statistics(
420
                node)[2]  # Overall last modification.
421
            modified = max(modified, mtime)
422

    
423
        if user != account:
424
            meta = {'name': container}
425
        else:
426
            meta = {}
427
            if include_user_defined:
428
                meta.update(
429
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
430
            if until is not None:
431
                meta.update({'until_timestamp': tstamp})
432
            meta.update({'name': container, 'count': count, 'bytes': bytes})
433
        meta.update({'modified': modified})
434
        return meta
435

    
436
    @backend_method
437
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
438
        """Update the metadata associated with the container for the domain."""
439

    
440
        logger.debug("update_container_meta: %s %s %s %s %s %s",
441
                     user, account, container, domain, meta, replace)
442
        if user != account:
443
            raise NotAllowedError
444
        path, node = self._lookup_container(account, container)
445
        src_version_id, dest_version_id = self._put_metadata(
446
            user, node, domain, meta, replace)
447
        if src_version_id is not None:
448
            versioning = self._get_policy(node)['versioning']
449
            if versioning != 'auto':
450
                self.node.version_remove(src_version_id)
451

    
452
    @backend_method
453
    def get_container_policy(self, user, account, container):
454
        """Return a dictionary with the container policy."""
455

    
456
        logger.debug(
457
            "get_container_policy: %s %s %s", user, account, container)
458
        if user != account:
459
            if container not in self._allowed_containers(user, account):
460
                raise NotAllowedError
461
            return {}
462
        path, node = self._lookup_container(account, container)
463
        return self._get_policy(node)
464

    
465
    @backend_method
466
    def update_container_policy(self, user, account, container, policy, replace=False):
467
        """Update the policy associated with the container."""
468

    
469
        logger.debug("update_container_policy: %s %s %s %s %s",
470
                     user, account, container, policy, replace)
471
        if user != account:
472
            raise NotAllowedError
473
        path, node = self._lookup_container(account, container)
474
        self._check_policy(policy)
475
        self._put_policy(node, policy, replace)
476

    
477
    @backend_method
478
    def put_container(self, user, account, container, policy={}):
479
        """Create a new container with the given name."""
480

    
481
        logger.debug(
482
            "put_container: %s %s %s %s", user, account, container, policy)
483
        if user != account:
484
            raise NotAllowedError
485
        try:
486
            path, node = self._lookup_container(account, container)
487
        except NameError:
488
            pass
489
        else:
490
            raise ContainerExists('Container already exists')
491
        if policy:
492
            self._check_policy(policy)
493
        path = '/'.join((account, container))
494
        node = self._put_path(
495
            user, self._lookup_account(account, True)[1], path)
496
        self._put_policy(node, policy, True)
497

    
498
    @backend_method
499
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
500
        """Delete/purge the container with the given name."""
501

    
502
        logger.debug("delete_container: %s %s %s %s %s %s", user,
503
                     account, container, until, prefix, delimiter)
504
        if user != account:
505
            raise NotAllowedError
506
        path, node = self._lookup_container(account, container)
507

    
508
        if until is not None:
509
            hashes, size, serials = self.node.node_purge_children(
510
                node, until, CLUSTER_HISTORY)
511
            for h in hashes:
512
                self.store.map_delete(h)
513
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
514
            self._report_size_change(user, account, -size,
515
                                     {'action':'container purge', 'path': path,
516
                                      'versions': ','.join(str(i) for i in serials)})
517
            return
518

    
519
        if not delimiter:
520
            if self._get_statistics(node)[0] > 0:
521
                raise ContainerNotEmpty('Container is not empty')
522
            hashes, size, serials = self.node.node_purge_children(
523
                node, inf, CLUSTER_HISTORY)
524
            for h in hashes:
525
                self.store.map_delete(h)
526
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
527
            self.node.node_remove(node)
528
            self._report_size_change(user, account, -size,
529
                                     {'action': 'container delete',
530
                                      'path': path,
531
                                      'versions': ','.join(str(i) for i in serials)})
532
        else:
533
            # remove only contents
534
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
535
            paths = []
536
            for t in src_names:
537
                path = '/'.join((account, container, t[0]))
538
                node = t[2]
539
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
540
                del_size = self._apply_versioning(
541
                    account, container, src_version_id)
542
                self._report_size_change(
543
                        user, account, -del_size, {
544
                                'action': 'object delete',
545
                                'path': path,
546
                             'versions': ','.join([str(dest_version_id)])
547
                     }
548
                )
549
                self._report_object_change(
550
                    user, account, path, details={'action': 'object delete'})
551
                paths.append(path)
552
            self.permissions.access_clear_bulk(paths)
553

    
554
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
555
        if user != account and until:
556
            raise NotAllowedError
557
        if shared and public:
558
            # get shared first
559
            shared = self._list_object_permissions(
560
                user, account, container, prefix, shared=True, public=False)
561
            objects = set()
562
            if shared:
563
                path, node = self._lookup_container(account, container)
564
                shared = self._get_formatted_paths(shared)
565
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
566

    
567
            # get public
568
            objects |= set(self._list_public_object_properties(
569
                user, account, container, prefix, all_props))
570
            objects = list(objects)
571

    
572
            objects.sort(key=lambda x: x[0])
573
            start, limit = self._list_limits(
574
                [x[0] for x in objects], marker, limit)
575
            return objects[start:start + limit]
576
        elif public:
577
            objects = self._list_public_object_properties(
578
                user, account, container, prefix, all_props)
579
            start, limit = self._list_limits(
580
                [x[0] for x in objects], marker, limit)
581
            return objects[start:start + limit]
582

    
583
        allowed = self._list_object_permissions(
584
            user, account, container, prefix, shared, public)
585
        if shared and not allowed:
586
            return []
587
        path, node = self._lookup_container(account, container)
588
        allowed = self._get_formatted_paths(allowed)
589
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
590
        start, limit = self._list_limits(
591
            [x[0] for x in objects], marker, limit)
592
        return objects[start:start + limit]
593

    
594
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
595
        public = self._list_object_permissions(
596
            user, account, container, prefix, shared=False, public=True)
597
        paths, nodes = self._lookup_objects(public)
598
        path = '/'.join((account, container))
599
        cont_prefix = path + '/'
600
        paths = [x[len(cont_prefix):] for x in paths]
601
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
602
        objects = [(path,) + props for path, props in zip(paths, props)]
603
        return objects
604

    
605
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
606
        objects = []
607
        while True:
608
            marker = objects[-1] if objects else None
609
            limit = 10000
610
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
611
            objects.extend(l)
612
            if not l or len(l) < limit:
613
                break
614
        return objects
615

    
616
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
617
        allowed = []
618
        path = '/'.join((account, container, prefix)).rstrip('/')
619
        if user != account:
620
            allowed = self.permissions.access_list_paths(user, path)
621
            if not allowed:
622
                raise NotAllowedError
623
        else:
624
            allowed = set()
625
            if shared:
626
                allowed.update(self.permissions.access_list_shared(path))
627
            if public:
628
                allowed.update(
629
                    [x[0] for x in self.permissions.public_list(path)])
630
            allowed = sorted(allowed)
631
            if not allowed:
632
                return []
633
        return allowed
634

    
635
    @backend_method
636
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
637
        """Return a list of object (name, version_id) tuples existing under a container."""
638

    
639
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
640
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
641

    
642
    @backend_method
643
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
644
        """Return a list of object metadata dicts existing under a container."""
645

    
646
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
647
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
648
        objects = []
649
        for p in props:
650
            if len(p) == 2:
651
                objects.append({'subdir': p[0]})
652
            else:
653
                objects.append({'name': p[0],
654
                                'bytes': p[self.SIZE + 1],
655
                                'type': p[self.TYPE + 1],
656
                                'hash': p[self.HASH + 1],
657
                                'version': p[self.SERIAL + 1],
658
                                'version_timestamp': p[self.MTIME + 1],
659
                                'modified': p[self.MTIME + 1] if until is None else None,
660
                                'modified_by': p[self.MUSER + 1],
661
                                'uuid': p[self.UUID + 1],
662
                                'checksum': p[self.CHECKSUM + 1]})
663
        return objects
664

    
665
    @backend_method
666
    def list_object_permissions(self, user, account, container, prefix=''):
667
        """Return a list of paths that enforce permissions under a container."""
668

    
669
        logger.debug("list_object_permissions: %s %s %s %s", user,
670
                     account, container, prefix)
671
        return self._list_object_permissions(user, account, container, prefix, True, False)
672

    
673
    @backend_method
674
    def list_object_public(self, user, account, container, prefix=''):
675
        """Return a dict mapping paths to public ids for objects that are public under a container."""
676

    
677
        logger.debug("list_object_public: %s %s %s %s", user,
678
                     account, container, prefix)
679
        public = {}
680
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
681
            public[path] = p + ULTIMATE_ANSWER
682
        return public
683

    
684
    @backend_method
685
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
686
        """Return a dictionary with the object metadata for the domain."""
687

    
688
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
689
                     account, container, name, domain, version)
690
        self._can_read(user, account, container, name)
691
        path, node = self._lookup_object(account, container, name)
692
        props = self._get_version(node, version)
693
        if version is None:
694
            modified = props[self.MTIME]
695
        else:
696
            try:
697
                modified = self._get_version(
698
                    node)[self.MTIME]  # Overall last modification.
699
            except NameError:  # Object may be deleted.
700
                del_props = self.node.version_lookup(
701
                    node, inf, CLUSTER_DELETED)
702
                if del_props is None:
703
                    raise ItemNotExists('Object does not exist')
704
                modified = del_props[self.MTIME]
705

    
706
        meta = {}
707
        if include_user_defined:
708
            meta.update(
709
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
710
        meta.update({'name': name,
711
                     'bytes': props[self.SIZE],
712
                     'type': props[self.TYPE],
713
                     'hash': props[self.HASH],
714
                     'version': props[self.SERIAL],
715
                     'version_timestamp': props[self.MTIME],
716
                     'modified': modified,
717
                     'modified_by': props[self.MUSER],
718
                     'uuid': props[self.UUID],
719
                     'checksum': props[self.CHECKSUM]})
720
        return meta
721

    
722
    @backend_method
723
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
724
        """Update the metadata associated with the object for the domain and return the new version."""
725

    
726
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
727
                     user, account, container, name, domain, meta, replace)
728
        self._can_write(user, account, container, name)
729
        path, node = self._lookup_object(account, container, name)
730
        src_version_id, dest_version_id = self._put_metadata(
731
            user, node, domain, meta, replace)
732
        self._apply_versioning(account, container, src_version_id)
733
        return dest_version_id
734

    
735
    @backend_method
736
    def get_object_permissions(self, user, account, container, name):
737
        """Return the action allowed on the object, the path
738
        from which the object gets its permissions from,
739
        along with a dictionary containing the permissions."""
740

    
741
        logger.debug("get_object_permissions: %s %s %s %s", user,
742
                     account, container, name)
743
        allowed = 'write'
744
        permissions_path = self._get_permissions_path(account, container, name)
745
        if user != account:
746
            if self.permissions.access_check(permissions_path, self.WRITE, user):
747
                allowed = 'write'
748
            elif self.permissions.access_check(permissions_path, self.READ, user):
749
                allowed = 'read'
750
            else:
751
                raise NotAllowedError
752
        self._lookup_object(account, container, name)
753
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
754

    
755
    @backend_method
756
    def update_object_permissions(self, user, account, container, name, permissions):
757
        """Update the permissions associated with the object."""
758

    
759
        logger.debug("update_object_permissions: %s %s %s %s %s",
760
                     user, account, container, name, permissions)
761
        if user != account:
762
            raise NotAllowedError
763
        path = self._lookup_object(account, container, name)[0]
764
        self._check_permissions(path, permissions)
765
        self.permissions.access_set(path, permissions)
766
        self._report_sharing_change(user, account, path, {'members':
767
                                    self.permissions.access_members(path)})
768

    
769
    @backend_method
770
    def get_object_public(self, user, account, container, name):
771
        """Return the public id of the object if applicable."""
772

    
773
        logger.debug(
774
            "get_object_public: %s %s %s %s", user, account, container, name)
775
        self._can_read(user, account, container, name)
776
        path = self._lookup_object(account, container, name)[0]
777
        p = self.permissions.public_get(path)
778
        if p is not None:
779
            p += ULTIMATE_ANSWER
780
        return p
781

    
782
    @backend_method
783
    def update_object_public(self, user, account, container, name, public):
784
        """Update the public status of the object."""
785

    
786
        logger.debug("update_object_public: %s %s %s %s %s", user,
787
                     account, container, name, public)
788
        self._can_write(user, account, container, name)
789
        path = self._lookup_object(account, container, name)[0]
790
        if not public:
791
            self.permissions.public_unset(path)
792
        else:
793
            self.permissions.public_set(path)
794

    
795
    @backend_method
796
    def get_object_hashmap(self, user, account, container, name, version=None):
797
        """Return the object's size and a list with partial hashes."""
798

    
799
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
800
                     account, container, name, version)
801
        self._can_read(user, account, container, name)
802
        path, node = self._lookup_object(account, container, name)
803
        props = self._get_version(node, version)
804
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
805
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
806

    
807
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
808
        if permissions is not None and user != account:
809
            raise NotAllowedError
810
        self._can_write(user, account, container, name)
811
        if permissions is not None:
812
            path = '/'.join((account, container, name))
813
            self._check_permissions(path, permissions)
814

    
815
        account_path, account_node = self._lookup_account(account, True)
816
        container_path, container_node = self._lookup_container(
817
            account, container)
818
        path, node = self._put_object_node(
819
            container_path, container_node, name)
820
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
821

    
822
        # Handle meta.
823
        if src_version_id is None:
824
            src_version_id = pre_version_id
825
        self._put_metadata_duplicate(
826
            src_version_id, dest_version_id, domain, meta, replace_meta)
827

    
828
        del_size = self._apply_versioning(account, container, pre_version_id)
829
        size_delta = size - del_size
830
        if not self.quotaholder_url: # Check quota.
831
            if size_delta > 0:
832
                account_quota = long(self._get_policy(account_node)['quota'])
833
                account_usage = self._get_statistics(account_node)[1] + size_delta
834
                container_quota = long(self._get_policy(container_node)['quota'])
835
                container_usage = self._get_statistics(container_node)[1] + size_delta
836
                if (account_quota > 0 and account_usage > account_quota):
837
                    logger.error('account_quota: %s, account_usage: %s' % (
838
                        account_quota, account_usage
839
                    ))
840
                    raise QuotaError
841
                if (container_quota > 0 and container_usage > container_quota):
842
                    # This must be executed in a transaction, so the version is
843
                    # never created if it fails.
844
                    logger.error('container_quota: %s, container_usage: %s' % (
845
                        container_quota, container_usage
846
                    ))
847
                    raise QuotaError
848
        self._report_size_change(user, account, size_delta,
849
                                 {'action': 'object update', 'path': path,
850
                                  'versions': ','.join([str(dest_version_id)])})
851
        if permissions is not None:
852
            self.permissions.access_set(path, permissions)
853
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
854

    
855
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
856
        return dest_version_id
857

    
858
    @backend_method
859
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
860
        """Create/update an object with the specified size and partial hashes."""
861

    
862
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
863
                     account, container, name, size, type, hashmap, checksum)
864
        if size == 0:  # No such thing as an empty hashmap.
865
            hashmap = [self.put_block('')]
866
        map = HashMap(self.block_size, self.hash_algorithm)
867
        map.extend([binascii.unhexlify(x) for x in hashmap])
868
        missing = self.store.block_search(map)
869
        if missing:
870
            ie = IndexError()
871
            ie.data = [binascii.hexlify(x) for x in missing]
872
            raise ie
873

    
874
        hash = map.hash()
875
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
876
        self.store.map_put(hash, map)
877
        return dest_version_id
878

    
879
    @backend_method
880
    def update_object_checksum(self, user, account, container, name, version, checksum):
881
        """Update an object's checksum."""
882

    
883
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
884
                     user, account, container, name, version, checksum)
885
        # Update objects with greater version and same hashmap and size (fix metadata updates).
886
        self._can_write(user, account, container, name)
887
        path, node = self._lookup_object(account, container, name)
888
        props = self._get_version(node, version)
889
        versions = self.node.node_get_versions(node)
890
        for x in versions:
891
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
892
                self.node.version_put_property(
893
                    x[self.SERIAL], 'checksum', checksum)
894

    
895
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
896
        dest_version_ids = []
897
        self._can_read(user, src_account, src_container, src_name)
898
        path, node = self._lookup_object(src_account, src_container, src_name)
899
        # TODO: Will do another fetch of the properties in duplicate version...
900
        props = self._get_version(
901
            node, src_version)  # Check to see if source exists.
902
        src_version_id = props[self.SERIAL]
903
        hash = props[self.HASH]
904
        size = props[self.SIZE]
905
        is_copy = not is_move and (src_account, src_container, src_name) != (
906
            dest_account, dest_container, dest_name)  # New uuid.
907
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
908
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
909
            self._delete_object(user, src_account, src_container, src_name)
910

    
911
        if delimiter:
912
            prefix = src_name + \
913
                delimiter if not src_name.endswith(delimiter) else src_name
914
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
915
            src_names.sort(key=lambda x: x[2])  # order by nodes
916
            paths = [elem[0] for elem in src_names]
917
            nodes = [elem[2] for elem in src_names]
918
            # TODO: Will do another fetch of the properties in duplicate version...
919
            props = self._get_versions(nodes)  # Check to see if source exists.
920

    
921
            for prop, path, node in zip(props, paths, nodes):
922
                src_version_id = prop[self.SERIAL]
923
                hash = prop[self.HASH]
924
                vtype = prop[self.TYPE]
925
                size = prop[self.SIZE]
926
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
927
                    delimiter) else dest_name
928
                vdest_name = path.replace(prefix, dest_prefix, 1)
929
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
930
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
931
                    self._delete_object(user, src_account, src_container, path)
932
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
933

    
934
    @backend_method
935
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
936
        """Copy an object's data and metadata."""
937

    
938
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
939
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
940
        return dest_version_id
941

    
942
    @backend_method
943
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
944
        """Move an object's data and metadata."""
945

    
946
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
947
        if user != src_account:
948
            raise NotAllowedError
949
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
950
        return dest_version_id
951

    
952
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
953
        if user != account:
954
            raise NotAllowedError
955

    
956
        if until is not None:
957
            path = '/'.join((account, container, name))
958
            node = self.node.node_lookup(path)
959
            if node is None:
960
                return
961
            hashes = []
962
            size = 0
963
            serials = []
964
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
965
            hashes += h
966
            size += s
967
            serials += v
968
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
969
            hashes += h
970
            size += s
971
            serials += v
972
            for h in hashes:
973
                self.store.map_delete(h)
974
            self.node.node_purge(node, until, CLUSTER_DELETED)
975
            try:
976
                props = self._get_version(node)
977
            except NameError:
978
                self.permissions.access_clear(path)
979
            self._report_size_change(user, account, -size,
980
                                    {'action': 'object purge', 'path': path,
981
                                     'versions': ','.join(str(i) for i in serials)})
982
            return
983

    
984
        path, node = self._lookup_object(account, container, name)
985
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
986
        del_size = self._apply_versioning(account, container, src_version_id)
987
        self._report_size_change(user, account, -del_size,
988
                                 {'action': 'object delete', 'path': path,
989
                                  'versions': ','.join([str(dest_version_id)])})
990
        self._report_object_change(
991
            user, account, path, details={'action': 'object delete'})
992
        self.permissions.access_clear(path)
993

    
994
        if delimiter:
995
            prefix = name + delimiter if not name.endswith(delimiter) else name
996
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
997
            paths = []
998
            for t in src_names:
999
                path = '/'.join((account, container, t[0]))
1000
                node = t[2]
1001
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1002
                del_size = self._apply_versioning(
1003
                    account, container, src_version_id)
1004
                self._report_size_change(user, account, -del_size,
1005
                                         {'action': 'object delete',
1006
                                          'path': path,
1007
                                          'versions': ','.join([str(dest_version_id)])})
1008
                self._report_object_change(
1009
                    user, account, path, details={'action': 'object delete'})
1010
                paths.append(path)
1011
            self.permissions.access_clear_bulk(paths)
1012

    
1013
    @backend_method
1014
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1015
        """Delete/purge an object."""
1016

    
1017
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1018
                     account, container, name, until, prefix, delimiter)
1019
        self._delete_object(user, account, container, name, until, delimiter)
1020

    
1021
    @backend_method
1022
    def list_versions(self, user, account, container, name):
1023
        """Return a list of all (version, version_timestamp) tuples for an object."""
1024

    
1025
        logger.debug(
1026
            "list_versions: %s %s %s %s", user, account, container, name)
1027
        self._can_read(user, account, container, name)
1028
        path, node = self._lookup_object(account, container, name)
1029
        versions = self.node.node_get_versions(node)
1030
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1031

    
1032
    @backend_method
1033
    def get_uuid(self, user, uuid):
1034
        """Return the (account, container, name) for the UUID given."""
1035

    
1036
        logger.debug("get_uuid: %s %s", user, uuid)
1037
        info = self.node.latest_uuid(uuid)
1038
        if info is None:
1039
            raise NameError
1040
        path, serial = info
1041
        account, container, name = path.split('/', 2)
1042
        self._can_read(user, account, container, name)
1043
        return (account, container, name)
1044

    
1045
    @backend_method
1046
    def get_public(self, user, public):
1047
        """Return the (account, container, name) for the public id given."""
1048

    
1049
        logger.debug("get_public: %s %s", user, public)
1050
        if public is None or public < ULTIMATE_ANSWER:
1051
            raise NameError
1052
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1053
        if path is None:
1054
            raise NameError
1055
        account, container, name = path.split('/', 2)
1056
        self._can_read(user, account, container, name)
1057
        return (account, container, name)
1058

    
1059
    @backend_method(autocommit=0)
1060
    def get_block(self, hash):
1061
        """Return a block's data."""
1062

    
1063
        logger.debug("get_block: %s", hash)
1064
        block = self.store.block_get(binascii.unhexlify(hash))
1065
        if not block:
1066
            raise ItemNotExists('Block does not exist')
1067
        return block
1068

    
1069
    @backend_method(autocommit=0)
1070
    def put_block(self, data):
1071
        """Store a block and return the hash."""
1072

    
1073
        logger.debug("put_block: %s", len(data))
1074
        return binascii.hexlify(self.store.block_put(data))
1075

    
1076
    @backend_method(autocommit=0)
1077
    def update_block(self, hash, data, offset=0):
1078
        """Update a known block and return the hash."""
1079

    
1080
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1081
        if offset == 0 and len(data) == self.block_size:
1082
            return self.put_block(data)
1083
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1084
        return binascii.hexlify(h)
1085

    
1086
    # Path functions.
1087

    
1088
    def _generate_uuid(self):
1089
        return str(uuidlib.uuid4())
1090

    
1091
    def _put_object_node(self, path, parent, name):
1092
        path = '/'.join((path, name))
1093
        node = self.node.node_lookup(path)
1094
        if node is None:
1095
            node = self.node.node_create(parent, path)
1096
        return path, node
1097

    
1098
    def _put_path(self, user, parent, path):
1099
        node = self.node.node_create(parent, path)
1100
        self.node.version_create(node, None, 0, '', None, user,
1101
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1102
        return node
1103

    
1104
    def _lookup_account(self, account, create=True):
1105
        node = self.node.node_lookup(account)
1106
        if node is None and create:
1107
            node = self._put_path(
1108
                account, self.ROOTNODE, account)  # User is account.
1109
        return account, node
1110

    
1111
    def _lookup_container(self, account, container):
1112
        path = '/'.join((account, container))
1113
        node = self.node.node_lookup(path)
1114
        if node is None:
1115
            raise ItemNotExists('Container does not exist')
1116
        return path, node
1117

    
1118
    def _lookup_object(self, account, container, name):
1119
        path = '/'.join((account, container, name))
1120
        node = self.node.node_lookup(path)
1121
        if node is None:
1122
            raise ItemNotExists('Object does not exist')
1123
        return path, node
1124

    
1125
    def _lookup_objects(self, paths):
1126
        nodes = self.node.node_lookup_bulk(paths)
1127
        return paths, nodes
1128

    
1129
    def _get_properties(self, node, until=None):
1130
        """Return properties until the timestamp given."""
1131

    
1132
        before = until if until is not None else inf
1133
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1134
        if props is None and until is not None:
1135
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1136
        if props is None:
1137
            raise ItemNotExists('Path does not exist')
1138
        return props
1139

    
1140
    def _get_statistics(self, node, until=None):
1141
        """Return count, sum of size and latest timestamp of everything under node."""
1142

    
1143
        if until is None:
1144
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1145
        else:
1146
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1147
        if stats is None:
1148
            stats = (0, 0, 0)
1149
        return stats
1150

    
1151
    def _get_version(self, node, version=None):
1152
        if version is None:
1153
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1154
            if props is None:
1155
                raise ItemNotExists('Object does not exist')
1156
        else:
1157
            try:
1158
                version = int(version)
1159
            except ValueError:
1160
                raise VersionNotExists('Version does not exist')
1161
            props = self.node.version_get_properties(version)
1162
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1163
                raise VersionNotExists('Version does not exist')
1164
        return props
1165

    
1166
    def _get_versions(self, nodes):
1167
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1168

    
1169
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1170
        """Create a new version of the node."""
1171

    
1172
        props = self.node.version_lookup(
1173
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1174
        if props is not None:
1175
            src_version_id = props[self.SERIAL]
1176
            src_hash = props[self.HASH]
1177
            src_size = props[self.SIZE]
1178
            src_type = props[self.TYPE]
1179
            src_checksum = props[self.CHECKSUM]
1180
        else:
1181
            src_version_id = None
1182
            src_hash = None
1183
            src_size = 0
1184
            src_type = ''
1185
            src_checksum = ''
1186
        if size is None:  # Set metadata.
1187
            hash = src_hash  # This way hash can be set to None (account or container).
1188
            size = src_size
1189
        if type is None:
1190
            type = src_type
1191
        if checksum is None:
1192
            checksum = src_checksum
1193
        uuid = self._generate_uuid(
1194
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1195

    
1196
        if src_node is None:
1197
            pre_version_id = src_version_id
1198
        else:
1199
            pre_version_id = None
1200
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1201
            if props is not None:
1202
                pre_version_id = props[self.SERIAL]
1203
        if pre_version_id is not None:
1204
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1205

    
1206
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1207
        return pre_version_id, dest_version_id
1208

    
1209
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1210
        if src_version_id is not None:
1211
            self.node.attribute_copy(src_version_id, dest_version_id)
1212
        if not replace:
1213
            self.node.attribute_del(dest_version_id, domain, (
1214
                k for k, v in meta.iteritems() if v == ''))
1215
            self.node.attribute_set(dest_version_id, domain, (
1216
                (k, v) for k, v in meta.iteritems() if v != ''))
1217
        else:
1218
            self.node.attribute_del(dest_version_id, domain)
1219
            self.node.attribute_set(dest_version_id, domain, ((
1220
                k, v) for k, v in meta.iteritems()))
1221

    
1222
    def _put_metadata(self, user, node, domain, meta, replace=False):
1223
        """Create a new version and store metadata."""
1224

    
1225
        src_version_id, dest_version_id = self._put_version_duplicate(
1226
            user, node)
1227
        self._put_metadata_duplicate(
1228
            src_version_id, dest_version_id, domain, meta, replace)
1229
        return src_version_id, dest_version_id
1230

    
1231
    def _list_limits(self, listing, marker, limit):
1232
        start = 0
1233
        if marker:
1234
            try:
1235
                start = listing.index(marker) + 1
1236
            except ValueError:
1237
                pass
1238
        if not limit or limit > 10000:
1239
            limit = 10000
1240
        return start, limit
1241

    
1242
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1243
        cont_prefix = path + '/'
1244
        prefix = cont_prefix + prefix
1245
        start = cont_prefix + marker if marker else None
1246
        before = until if until is not None else inf
1247
        filterq = keys if domain else []
1248
        sizeq = size_range
1249

    
1250
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1251
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1252
        objects.sort(key=lambda x: x[0])
1253
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1254
        return objects
1255

    
1256
    # Reporting functions.
1257

    
1258
    def _report_size_change(self, user, account, size, details={}):
1259
        account_node = self._lookup_account(account, True)[1]
1260
        total = self._get_statistics(account_node)[1]
1261
        details.update({'user': user, 'total': total})
1262
        logger.debug(
1263
            "_report_size_change: %s %s %s %s", user, account, size, details)
1264
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1265
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1266
                              float(size), details))
1267

    
1268
        if not self.quotaholder_url:
1269
            return
1270
        
1271
        serial = self.quotaholder.issue_commission(
1272
                context     =   {},
1273
                target      =   user.uuid,
1274
                key         =   '1',
1275
                clientkey   =   'pithos',
1276
                ownerkey    =   '',
1277
                        name        =   details['path'] if 'path' in details else '',
1278
                provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1279
        )
1280
        self.serials.append(serial)
1281

    
1282
    def _report_object_change(self, user, account, path, details={}):
1283
        details.update({'user': user})
1284
        logger.debug("_report_object_change: %s %s %s %s", user,
1285
                     account, path, details)
1286
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1287
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1288

    
1289
    def _report_sharing_change(self, user, account, path, details={}):
1290
        logger.debug("_report_permissions_change: %s %s %s %s",
1291
                     user, account, path, details)
1292
        details.update({'user': user})
1293
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1294
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1295

    
1296
    # Policy functions.
1297

    
1298
    def _check_policy(self, policy):
1299
        for k in policy.keys():
1300
            if policy[k] == '':
1301
                policy[k] = self.default_policy.get(k)
1302
        for k, v in policy.iteritems():
1303
            if k == 'quota':
1304
                q = int(v)  # May raise ValueError.
1305
                if q < 0:
1306
                    raise ValueError
1307
            elif k == 'versioning':
1308
                if v not in ['auto', 'none']:
1309
                    raise ValueError
1310
            else:
1311
                raise ValueError
1312

    
1313
    def _put_policy(self, node, policy, replace):
1314
        if replace:
1315
            for k, v in self.default_policy.iteritems():
1316
                if k not in policy:
1317
                    policy[k] = v
1318
        self.node.policy_set(node, policy)
1319

    
1320
    def _get_policy(self, node):
1321
        policy = self.default_policy.copy()
1322
        policy.update(self.node.policy_get(node))
1323
        return policy
1324

    
1325
    def _apply_versioning(self, account, container, version_id):
1326
        """Delete the provided version if such is the policy.
1327
           Return size of object removed.
1328
        """
1329

    
1330
        if version_id is None:
1331
            return 0
1332
        path, node = self._lookup_container(account, container)
1333
        versioning = self._get_policy(node)['versioning']
1334
        if versioning != 'auto':
1335
            hash, size = self.node.version_remove(version_id)
1336
            self.store.map_delete(hash)
1337
            return size
1338
        elif self.free_versioning:
1339
            return self.node.version_get_properties(
1340
                version_id, keys=('size',))[0]
1341
        return 0
1342

    
1343
    # Access control functions.
1344

    
1345
    def _check_groups(self, groups):
1346
        # raise ValueError('Bad characters in groups')
1347
        pass
1348

    
1349
    def _check_permissions(self, path, permissions):
1350
        # raise ValueError('Bad characters in permissions')
1351
        pass
1352

    
1353
    def _get_formatted_paths(self, paths):
1354
        formatted = []
1355
        for p in paths:
1356
            node = self.node.node_lookup(p)
1357
            props = None
1358
            if node is not None:
1359
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1360
            if props is not None:
1361
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1362
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1363
                formatted.append((p, self.MATCH_EXACT))
1364
        return formatted
1365

    
1366
    def _get_permissions_path(self, account, container, name):
1367
        path = '/'.join((account, container, name))
1368
        permission_paths = self.permissions.access_inherit(path)
1369
        permission_paths.sort()
1370
        permission_paths.reverse()
1371
        for p in permission_paths:
1372
            if p == path:
1373
                return p
1374
            else:
1375
                if p.count('/') < 2:
1376
                    continue
1377
                node = self.node.node_lookup(p)
1378
                props = None
1379
                if node is not None:
1380
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1381
                if props is not None:
1382
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1383
                        return p
1384
        return None
1385

    
1386
    def _can_read(self, user, account, container, name):
1387
        if user == account:
1388
            return True
1389
        path = '/'.join((account, container, name))
1390
        if self.permissions.public_get(path) is not None:
1391
            return True
1392
        path = self._get_permissions_path(account, container, name)
1393
        if not path:
1394
            raise NotAllowedError
1395
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1396
            raise NotAllowedError
1397

    
1398
    def _can_write(self, user, account, container, name):
1399
        if user == account:
1400
            return True
1401
        path = '/'.join((account, container, name))
1402
        path = self._get_permissions_path(account, container, name)
1403
        if not path:
1404
            raise NotAllowedError
1405
        if not self.permissions.access_check(path, self.WRITE, user):
1406
            raise NotAllowedError
1407

    
1408
    def _allowed_accounts(self, user):
1409
        allow = set()
1410
        for path in self.permissions.access_list_paths(user):
1411
            allow.add(path.split('/', 1)[0])
1412
        return sorted(allow)
1413

    
1414
    def _allowed_containers(self, user, account):
1415
        allow = set()
1416
        for path in self.permissions.access_list_paths(user, account):
1417
            allow.add(path.split('/', 2)[1])
1418
        return sorted(allow)