Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 478f763e

History | View | Annotate | Download (62.3 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from kamaki.clients.quotaholder import QuotaholderClient
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
85
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
86

    
87
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
88
QUEUE_CLIENT_ID = 'pithos'
89
QUEUE_INSTANCE_ID = '1'
90

    
91
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
92

    
93
inf = float('inf')
94

    
95
ULTIMATE_ANSWER = 42
96

    
97

    
98
logger = logging.getLogger(__name__)
99

    
100

    
101
def backend_method(func=None, autocommit=1):
102
    if func is None:
103
        def fn(func):
104
            return backend_method(func, autocommit)
105
        return fn
106

    
107
    if not autocommit:
108
        return func
109

    
110
    def fn(self, *args, **kw):
111
        self.wrapper.execute()
112
        serials = []
113
        self.serials = serials
114
        self.messages = []
115

    
116
        try:
117
            ret = func(self, *args, **kw)
118
            for m in self.messages:
119
                self.queue.send(*m)
120
            if serials:
121
                self.quotaholder.accept_commission(
122
                            context     =   {},
123
                            clientkey   =   'pithos',
124
                            serials     =   serials)
125
            self.wrapper.commit()
126
            return ret
127
        except:
128
            if serials:
129
                self.quotaholder.reject_commission(
130
                            context     =   {},
131
                            clientkey   =   'pithos',
132
                            serials     =   serials)
133
            self.wrapper.rollback()
134
            raise
135
    return fn
136

    
137

    
138
class ModularBackend(BaseBackend):
139
    """A modular backend.
140

141
    Uses modules for SQL functions and storage.
142
    """
143

    
144
    def __init__(self, db_module=None, db_connection=None,
145
                 block_module=None, block_path=None, block_umask=None,
146
                 queue_module=None, queue_hosts=None, queue_exchange=None,
147
                 quotaholder_url=None, quotaholder_token=None,
148
                 free_versioning=True):
149
        db_module = db_module or DEFAULT_DB_MODULE
150
        db_connection = db_connection or DEFAULT_DB_CONNECTION
151
        block_module = block_module or DEFAULT_BLOCK_MODULE
152
        block_path = block_path or DEFAULT_BLOCK_PATH
153
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
154
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
155
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
156
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
157
        
158
        self.hash_algorithm = 'sha256'
159
        self.block_size = 4 * 1024 * 1024  # 4MB
160
        self.free_versioning = free_versioning
161

    
162
        self.default_policy = {'quota': DEFAULT_QUOTA,
163
                               'versioning': DEFAULT_VERSIONING}
164

    
165
        def load_module(m):
166
            __import__(m)
167
            return sys.modules[m]
168

    
169
        self.db_module = load_module(db_module)
170
        self.wrapper = self.db_module.DBWrapper(db_connection)
171
        params = {'wrapper': self.wrapper}
172
        self.permissions = self.db_module.Permissions(**params)
173
        self.config = self.db_module.Config(**params)
174
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
175
        for x in ['READ', 'WRITE']:
176
            setattr(self, x, getattr(self.db_module, x))
177
        self.node = self.db_module.Node(**params)
178
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
179
            setattr(self, x, getattr(self.db_module, x))
180

    
181
        self.block_module = load_module(block_module)
182
        params = {'path': block_path,
183
                  'block_size': self.block_size,
184
                  'hash_algorithm': self.hash_algorithm,
185
                  'umask': block_umask}
186
        self.store = self.block_module.Store(**params)
187

    
188
        if queue_module and queue_hosts:
189
            self.queue_module = load_module(queue_module)
190
            params = {'hosts': queue_hosts,
191
                              'exchange': queue_exchange,
192
                      'client_id': QUEUE_CLIENT_ID}
193
            self.queue = self.queue_module.Queue(**params)
194
        else:
195
            class NoQueue:
196
                def send(self, *args):
197
                    pass
198

    
199
                def close(self):
200
                    pass
201

    
202
            self.queue = NoQueue()
203

    
204
        self.quotaholder_url = quotaholder_url
205
        self.quotaholder_token = quotaholder_token
206
        self.quotaholder = QuotaholderClient(quotaholder_url, quotaholder_token)
207
        self.serials = []
208
        self.messages = []
209

    
210
    def close(self):
211
        self.wrapper.close()
212
        self.queue.close()
213

    
214
    @property
215
    def using_external_quotaholder(self):
216
        if self.quotaholder_url:
217
            return True
218
        return False
219

    
220
    @backend_method
221
    def list_accounts(self, user, marker=None, limit=10000):
222
        """Return a list of accounts the user can access."""
223

    
224
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
225
        allowed = self._allowed_accounts(user)
226
        start, limit = self._list_limits(allowed, marker, limit)
227
        return allowed[start:start + limit]
228

    
229
    @backend_method
230
    def get_account_meta(
231
            self, user, account, domain, until=None, include_user_defined=True,
232
            external_quota=None):
233
        """Return a dictionary with the account metadata for the domain."""
234

    
235
        logger.debug(
236
            "get_account_meta: %s %s %s %s", user, account, domain, until)
237
        path, node = self._lookup_account(account, user == account)
238
        if user != account:
239
            if until or node is None or account not in self._allowed_accounts(user):
240
                raise NotAllowedError
241
        try:
242
            props = self._get_properties(node, until)
243
            mtime = props[self.MTIME]
244
        except NameError:
245
            props = None
246
            mtime = until
247
        count, bytes, tstamp = self._get_statistics(node, until)
248
        tstamp = max(tstamp, mtime)
249
        if until is None:
250
            modified = tstamp
251
        else:
252
            modified = self._get_statistics(
253
                node)[2]  # Overall last modification.
254
            modified = max(modified, mtime)
255

    
256
        if user != account:
257
            meta = {'name': account}
258
        else:
259
            meta = {}
260
            if props is not None and include_user_defined:
261
                meta.update(
262
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
263
            if until is not None:
264
                meta.update({'until_timestamp': tstamp})
265
            meta.update({'name': account, 'count': count, 'bytes': bytes})
266
            if self.using_external_quotaholder:
267
                external_quota = external_quota or {}
268
                meta['bytes'] = external_quota.get('currValue', 0)
269
        meta.update({'modified': modified})
270
        return meta
271

    
272
    @backend_method
273
    def update_account_meta(self, user, account, domain, meta, replace=False):
274
        """Update the metadata associated with the account for the domain."""
275

    
276
        logger.debug("update_account_meta: %s %s %s %s %s", user,
277
                     account, domain, meta, replace)
278
        if user != account:
279
            raise NotAllowedError
280
        path, node = self._lookup_account(account, True)
281
        self._put_metadata(user, node, domain, meta, replace)
282

    
283
    @backend_method
284
    def get_account_groups(self, user, account):
285
        """Return a dictionary with the user groups defined for this account."""
286

    
287
        logger.debug("get_account_groups: %s %s", user, account)
288
        if user != account:
289
            if account not in self._allowed_accounts(user):
290
                raise NotAllowedError
291
            return {}
292
        self._lookup_account(account, True)
293
        return self.permissions.group_dict(account)
294

    
295
    @backend_method
296
    def update_account_groups(self, user, account, groups, replace=False):
297
        """Update the groups associated with the account."""
298

    
299
        logger.debug("update_account_groups: %s %s %s %s", user,
300
                     account, groups, replace)
301
        if user != account:
302
            raise NotAllowedError
303
        self._lookup_account(account, True)
304
        self._check_groups(groups)
305
        if replace:
306
            self.permissions.group_destroy(account)
307
        for k, v in groups.iteritems():
308
            if not replace:  # If not already deleted.
309
                self.permissions.group_delete(account, k)
310
            if v:
311
                self.permissions.group_addmany(account, k, v)
312

    
313
    @backend_method
314
    def get_account_policy(self, user, account, external_quota=None):
315
        """Return a dictionary with the account policy."""
316

    
317
        logger.debug("get_account_policy: %s %s", user, account)
318
        if user != account:
319
            if account not in self._allowed_accounts(user):
320
                raise NotAllowedError
321
            return {}
322
        path, node = self._lookup_account(account, True)
323
        policy = self._get_policy(node)
324
        if self.using_external_quotaholder:
325
            external_quota = external_quota or {}
326
            policy['quota'] = external_quota.get('maxValue', 0)
327
        return policy
328

    
329
    @backend_method
330
    def update_account_policy(self, user, account, policy, replace=False):
331
        """Update the policy associated with the account."""
332

    
333
        logger.debug("update_account_policy: %s %s %s %s", user,
334
                     account, policy, replace)
335
        if user != account:
336
            raise NotAllowedError
337
        path, node = self._lookup_account(account, True)
338
        self._check_policy(policy)
339
        self._put_policy(node, policy, replace)
340

    
341
    @backend_method
342
    def put_account(self, user, account, policy={}):
343
        """Create a new account with the given name."""
344

    
345
        logger.debug("put_account: %s %s %s", user, account, policy)
346
        if user != account:
347
            raise NotAllowedError
348
        node = self.node.node_lookup(account)
349
        if node is not None:
350
            raise AccountExists('Account already exists')
351
        if policy:
352
            self._check_policy(policy)
353
        node = self._put_path(user, self.ROOTNODE, account)
354
        self._put_policy(node, policy, True)
355

    
356
    @backend_method
357
    def delete_account(self, user, account):
358
        """Delete the account with the given name."""
359

    
360
        logger.debug("delete_account: %s %s", user, account)
361
        if user != account:
362
            raise NotAllowedError
363
        node = self.node.node_lookup(account)
364
        if node is None:
365
            return
366
        if not self.node.node_remove(node):
367
            raise AccountNotEmpty('Account is not empty')
368
        self.permissions.group_destroy(account)
369

    
370
    @backend_method
371
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
372
        """Return a list of containers existing under an account."""
373

    
374
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
375
                     account, marker, limit, shared, until, public)
376
        if user != account:
377
            if until or account not in self._allowed_accounts(user):
378
                raise NotAllowedError
379
            allowed = self._allowed_containers(user, account)
380
            start, limit = self._list_limits(allowed, marker, limit)
381
            return allowed[start:start + limit]
382
        if shared or public:
383
            allowed = set()
384
            if shared:
385
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
386
            if public:
387
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
388
            allowed = sorted(allowed)
389
            start, limit = self._list_limits(allowed, marker, limit)
390
            return allowed[start:start + limit]
391
        node = self.node.node_lookup(account)
392
        containers = [x[0] for x in self._list_object_properties(
393
            node, account, '', '/', marker, limit, False, None, [], until)]
394
        start, limit = self._list_limits(
395
            [x[0] for x in containers], marker, limit)
396
        return containers[start:start + limit]
397

    
398
    @backend_method
399
    def list_container_meta(self, user, account, container, domain, until=None):
400
        """Return a list with all the container's object meta keys for the domain."""
401

    
402
        logger.debug("list_container_meta: %s %s %s %s %s", user,
403
                     account, container, domain, until)
404
        allowed = []
405
        if user != account:
406
            if until:
407
                raise NotAllowedError
408
            allowed = self.permissions.access_list_paths(
409
                user, '/'.join((account, container)))
410
            if not allowed:
411
                raise NotAllowedError
412
        path, node = self._lookup_container(account, container)
413
        before = until if until is not None else inf
414
        allowed = self._get_formatted_paths(allowed)
415
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
416

    
417
    @backend_method
418
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
419
        """Return a dictionary with the container metadata for the domain."""
420

    
421
        logger.debug("get_container_meta: %s %s %s %s %s", user,
422
                     account, container, domain, until)
423
        if user != account:
424
            if until or container not in self._allowed_containers(user, account):
425
                raise NotAllowedError
426
        path, node = self._lookup_container(account, container)
427
        props = self._get_properties(node, until)
428
        mtime = props[self.MTIME]
429
        count, bytes, tstamp = self._get_statistics(node, until)
430
        tstamp = max(tstamp, mtime)
431
        if until is None:
432
            modified = tstamp
433
        else:
434
            modified = self._get_statistics(
435
                node)[2]  # Overall last modification.
436
            modified = max(modified, mtime)
437

    
438
        if user != account:
439
            meta = {'name': container}
440
        else:
441
            meta = {}
442
            if include_user_defined:
443
                meta.update(
444
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
445
            if until is not None:
446
                meta.update({'until_timestamp': tstamp})
447
            meta.update({'name': container, 'count': count, 'bytes': bytes})
448
        meta.update({'modified': modified})
449
        return meta
450

    
451
    @backend_method
452
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
453
        """Update the metadata associated with the container for the domain."""
454

    
455
        logger.debug("update_container_meta: %s %s %s %s %s %s",
456
                     user, account, container, domain, meta, replace)
457
        if user != account:
458
            raise NotAllowedError
459
        path, node = self._lookup_container(account, container)
460
        src_version_id, dest_version_id = self._put_metadata(
461
            user, node, domain, meta, replace)
462
        if src_version_id is not None:
463
            versioning = self._get_policy(node)['versioning']
464
            if versioning != 'auto':
465
                self.node.version_remove(src_version_id)
466

    
467
    @backend_method
468
    def get_container_policy(self, user, account, container):
469
        """Return a dictionary with the container policy."""
470

    
471
        logger.debug(
472
            "get_container_policy: %s %s %s", user, account, container)
473
        if user != account:
474
            if container not in self._allowed_containers(user, account):
475
                raise NotAllowedError
476
            return {}
477
        path, node = self._lookup_container(account, container)
478
        return self._get_policy(node)
479

    
480
    @backend_method
481
    def update_container_policy(self, user, account, container, policy, replace=False):
482
        """Update the policy associated with the container."""
483

    
484
        logger.debug("update_container_policy: %s %s %s %s %s",
485
                     user, account, container, policy, replace)
486
        if user != account:
487
            raise NotAllowedError
488
        path, node = self._lookup_container(account, container)
489
        self._check_policy(policy)
490
        self._put_policy(node, policy, replace)
491

    
492
    @backend_method
493
    def put_container(self, user, account, container, policy={}):
494
        """Create a new container with the given name."""
495

    
496
        logger.debug(
497
            "put_container: %s %s %s %s", user, account, container, policy)
498
        if user != account:
499
            raise NotAllowedError
500
        try:
501
            path, node = self._lookup_container(account, container)
502
        except NameError:
503
            pass
504
        else:
505
            raise ContainerExists('Container already exists')
506
        if policy:
507
            self._check_policy(policy)
508
        path = '/'.join((account, container))
509
        node = self._put_path(
510
            user, self._lookup_account(account, True)[1], path)
511
        self._put_policy(node, policy, True)
512

    
513
    @backend_method
514
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
515
        """Delete/purge the container with the given name."""
516

    
517
        logger.debug("delete_container: %s %s %s %s %s %s", user,
518
                     account, container, until, prefix, delimiter)
519
        if user != account:
520
            raise NotAllowedError
521
        path, node = self._lookup_container(account, container)
522

    
523
        if until is not None:
524
            hashes, size, serials = self.node.node_purge_children(
525
                node, until, CLUSTER_HISTORY)
526
            for h in hashes:
527
                self.store.map_delete(h)
528
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
529
            self._report_size_change(user, account, -size,
530
                                     {'action':'container purge', 'path': path,
531
                                      'versions': ','.join(str(i) for i in serials)})
532
            return
533

    
534
        if not delimiter:
535
            if self._get_statistics(node)[0] > 0:
536
                raise ContainerNotEmpty('Container is not empty')
537
            hashes, size, serials = self.node.node_purge_children(
538
                node, inf, CLUSTER_HISTORY)
539
            for h in hashes:
540
                self.store.map_delete(h)
541
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
542
            self.node.node_remove(node)
543
            self._report_size_change(user, account, -size,
544
                                     {'action': 'container delete',
545
                                      'path': path,
546
                                      'versions': ','.join(str(i) for i in serials)})
547
        else:
548
            # remove only contents
549
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
550
            paths = []
551
            for t in src_names:
552
                path = '/'.join((account, container, t[0]))
553
                node = t[2]
554
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
555
                del_size = self._apply_versioning(
556
                    account, container, src_version_id)
557
                self._report_size_change(
558
                        user, account, -del_size, {
559
                                'action': 'object delete',
560
                                'path': path,
561
                             'versions': ','.join([str(dest_version_id)])
562
                     }
563
                )
564
                self._report_object_change(
565
                    user, account, path, details={'action': 'object delete'})
566
                paths.append(path)
567
            self.permissions.access_clear_bulk(paths)
568

    
569
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
570
        if user != account and until:
571
            raise NotAllowedError
572
        if shared and public:
573
            # get shared first
574
            shared = self._list_object_permissions(
575
                user, account, container, prefix, shared=True, public=False)
576
            objects = set()
577
            if shared:
578
                path, node = self._lookup_container(account, container)
579
                shared = self._get_formatted_paths(shared)
580
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
581

    
582
            # get public
583
            objects |= set(self._list_public_object_properties(
584
                user, account, container, prefix, all_props))
585
            objects = list(objects)
586

    
587
            objects.sort(key=lambda x: x[0])
588
            start, limit = self._list_limits(
589
                [x[0] for x in objects], marker, limit)
590
            return objects[start:start + limit]
591
        elif public:
592
            objects = self._list_public_object_properties(
593
                user, account, container, prefix, all_props)
594
            start, limit = self._list_limits(
595
                [x[0] for x in objects], marker, limit)
596
            return objects[start:start + limit]
597

    
598
        allowed = self._list_object_permissions(
599
            user, account, container, prefix, shared, public)
600
        if shared and not allowed:
601
            return []
602
        path, node = self._lookup_container(account, container)
603
        allowed = self._get_formatted_paths(allowed)
604
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
605
        start, limit = self._list_limits(
606
            [x[0] for x in objects], marker, limit)
607
        return objects[start:start + limit]
608

    
609
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
610
        public = self._list_object_permissions(
611
            user, account, container, prefix, shared=False, public=True)
612
        paths, nodes = self._lookup_objects(public)
613
        path = '/'.join((account, container))
614
        cont_prefix = path + '/'
615
        paths = [x[len(cont_prefix):] for x in paths]
616
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
617
        objects = [(path,) + props for path, props in zip(paths, props)]
618
        return objects
619

    
620
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
621
        objects = []
622
        while True:
623
            marker = objects[-1] if objects else None
624
            limit = 10000
625
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
626
            objects.extend(l)
627
            if not l or len(l) < limit:
628
                break
629
        return objects
630

    
631
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
632
        allowed = []
633
        path = '/'.join((account, container, prefix)).rstrip('/')
634
        if user != account:
635
            allowed = self.permissions.access_list_paths(user, path)
636
            if not allowed:
637
                raise NotAllowedError
638
        else:
639
            allowed = set()
640
            if shared:
641
                allowed.update(self.permissions.access_list_shared(path))
642
            if public:
643
                allowed.update(
644
                    [x[0] for x in self.permissions.public_list(path)])
645
            allowed = sorted(allowed)
646
            if not allowed:
647
                return []
648
        return allowed
649

    
650
    @backend_method
651
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
652
        """Return a list of object (name, version_id) tuples existing under a container."""
653

    
654
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
655
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
656

    
657
    @backend_method
658
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
659
        """Return a list of object metadata dicts existing under a container."""
660

    
661
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
662
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
663
        objects = []
664
        for p in props:
665
            if len(p) == 2:
666
                objects.append({'subdir': p[0]})
667
            else:
668
                objects.append({'name': p[0],
669
                                'bytes': p[self.SIZE + 1],
670
                                'type': p[self.TYPE + 1],
671
                                'hash': p[self.HASH + 1],
672
                                'version': p[self.SERIAL + 1],
673
                                'version_timestamp': p[self.MTIME + 1],
674
                                'modified': p[self.MTIME + 1] if until is None else None,
675
                                'modified_by': p[self.MUSER + 1],
676
                                'uuid': p[self.UUID + 1],
677
                                'checksum': p[self.CHECKSUM + 1]})
678
        return objects
679

    
680
    @backend_method
681
    def list_object_permissions(self, user, account, container, prefix=''):
682
        """Return a list of paths that enforce permissions under a container."""
683

    
684
        logger.debug("list_object_permissions: %s %s %s %s", user,
685
                     account, container, prefix)
686
        return self._list_object_permissions(user, account, container, prefix, True, False)
687

    
688
    @backend_method
689
    def list_object_public(self, user, account, container, prefix=''):
690
        """Return a dict mapping paths to public ids for objects that are public under a container."""
691

    
692
        logger.debug("list_object_public: %s %s %s %s", user,
693
                     account, container, prefix)
694
        public = {}
695
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
696
            public[path] = p + ULTIMATE_ANSWER
697
        return public
698

    
699
    @backend_method
700
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
701
        """Return a dictionary with the object metadata for the domain."""
702

    
703
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
704
                     account, container, name, domain, version)
705
        self._can_read(user, account, container, name)
706
        path, node = self._lookup_object(account, container, name)
707
        props = self._get_version(node, version)
708
        if version is None:
709
            modified = props[self.MTIME]
710
        else:
711
            try:
712
                modified = self._get_version(
713
                    node)[self.MTIME]  # Overall last modification.
714
            except NameError:  # Object may be deleted.
715
                del_props = self.node.version_lookup(
716
                    node, inf, CLUSTER_DELETED)
717
                if del_props is None:
718
                    raise ItemNotExists('Object does not exist')
719
                modified = del_props[self.MTIME]
720

    
721
        meta = {}
722
        if include_user_defined:
723
            meta.update(
724
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
725
        meta.update({'name': name,
726
                     'bytes': props[self.SIZE],
727
                     'type': props[self.TYPE],
728
                     'hash': props[self.HASH],
729
                     'version': props[self.SERIAL],
730
                     'version_timestamp': props[self.MTIME],
731
                     'modified': modified,
732
                     'modified_by': props[self.MUSER],
733
                     'uuid': props[self.UUID],
734
                     'checksum': props[self.CHECKSUM]})
735
        return meta
736

    
737
    @backend_method
738
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
739
        """Update the metadata associated with the object for the domain and return the new version."""
740

    
741
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
742
                     user, account, container, name, domain, meta, replace)
743
        self._can_write(user, account, container, name)
744
        path, node = self._lookup_object(account, container, name)
745
        src_version_id, dest_version_id = self._put_metadata(
746
            user, node, domain, meta, replace)
747
        self._apply_versioning(account, container, src_version_id)
748
        return dest_version_id
749

    
750
    @backend_method
751
    def get_object_permissions(self, user, account, container, name):
752
        """Return the action allowed on the object, the path
753
        from which the object gets its permissions from,
754
        along with a dictionary containing the permissions."""
755

    
756
        logger.debug("get_object_permissions: %s %s %s %s", user,
757
                     account, container, name)
758
        allowed = 'write'
759
        permissions_path = self._get_permissions_path(account, container, name)
760
        if user != account:
761
            if self.permissions.access_check(permissions_path, self.WRITE, user):
762
                allowed = 'write'
763
            elif self.permissions.access_check(permissions_path, self.READ, user):
764
                allowed = 'read'
765
            else:
766
                raise NotAllowedError
767
        self._lookup_object(account, container, name)
768
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
769

    
770
    @backend_method
771
    def update_object_permissions(self, user, account, container, name, permissions):
772
        """Update the permissions associated with the object."""
773

    
774
        logger.debug("update_object_permissions: %s %s %s %s %s",
775
                     user, account, container, name, permissions)
776
        if user != account:
777
            raise NotAllowedError
778
        path = self._lookup_object(account, container, name)[0]
779
        self._check_permissions(path, permissions)
780
        self.permissions.access_set(path, permissions)
781
        self._report_sharing_change(user, account, path, {'members':
782
                                    self.permissions.access_members(path)})
783

    
784
    @backend_method
785
    def get_object_public(self, user, account, container, name):
786
        """Return the public id of the object if applicable."""
787

    
788
        logger.debug(
789
            "get_object_public: %s %s %s %s", user, account, container, name)
790
        self._can_read(user, account, container, name)
791
        path = self._lookup_object(account, container, name)[0]
792
        p = self.permissions.public_get(path)
793
        if p is not None:
794
            p += ULTIMATE_ANSWER
795
        return p
796

    
797
    @backend_method
798
    def update_object_public(self, user, account, container, name, public):
799
        """Update the public status of the object."""
800

    
801
        logger.debug("update_object_public: %s %s %s %s %s", user,
802
                     account, container, name, public)
803
        self._can_write(user, account, container, name)
804
        path = self._lookup_object(account, container, name)[0]
805
        if not public:
806
            self.permissions.public_unset(path)
807
        else:
808
            self.permissions.public_set(path)
809

    
810
    @backend_method
811
    def get_object_hashmap(self, user, account, container, name, version=None):
812
        """Return the object's size and a list with partial hashes."""
813

    
814
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
815
                     account, container, name, version)
816
        self._can_read(user, account, container, name)
817
        path, node = self._lookup_object(account, container, name)
818
        props = self._get_version(node, version)
819
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
820
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
821

    
822
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
823
        if permissions is not None and user != account:
824
            raise NotAllowedError
825
        self._can_write(user, account, container, name)
826
        if permissions is not None:
827
            path = '/'.join((account, container, name))
828
            self._check_permissions(path, permissions)
829

    
830
        account_path, account_node = self._lookup_account(account, True)
831
        container_path, container_node = self._lookup_container(
832
            account, container)
833
        path, node = self._put_object_node(
834
            container_path, container_node, name)
835
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
836

    
837
        # Handle meta.
838
        if src_version_id is None:
839
            src_version_id = pre_version_id
840
        self._put_metadata_duplicate(
841
            src_version_id, dest_version_id, domain, meta, replace_meta)
842

    
843
        del_size = self._apply_versioning(account, container, pre_version_id)
844
        size_delta = size - del_size
845
        if not self.using_external_quotaholder: # Check quota.
846
            if size_delta > 0:
847
                account_quota = long(self._get_policy(account_node)['quota'])
848
                account_usage = self._get_statistics(account_node)[1] + size_delta
849
                container_quota = long(self._get_policy(container_node)['quota'])
850
                container_usage = self._get_statistics(container_node)[1] + size_delta
851
                if (account_quota > 0 and account_usage > account_quota):
852
                    logger.error('account_quota: %s, account_usage: %s' % (
853
                        account_quota, account_usage
854
                    ))
855
                    raise QuotaError
856
                if (container_quota > 0 and container_usage > container_quota):
857
                    # This must be executed in a transaction, so the version is
858
                    # never created if it fails.
859
                    logger.error('container_quota: %s, container_usage: %s' % (
860
                        container_quota, container_usage
861
                    ))
862
                    raise QuotaError
863
        self._report_size_change(user, account, size_delta,
864
                                 {'action': 'object update', 'path': path,
865
                                  'versions': ','.join([str(dest_version_id)])})
866
        if permissions is not None:
867
            self.permissions.access_set(path, permissions)
868
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
869

    
870
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
871
        return dest_version_id
872

    
873
    @backend_method
874
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
875
        """Create/update an object with the specified size and partial hashes."""
876

    
877
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
878
                     account, container, name, size, type, hashmap, checksum)
879
        if size == 0:  # No such thing as an empty hashmap.
880
            hashmap = [self.put_block('')]
881
        map = HashMap(self.block_size, self.hash_algorithm)
882
        map.extend([binascii.unhexlify(x) for x in hashmap])
883
        missing = self.store.block_search(map)
884
        if missing:
885
            ie = IndexError()
886
            ie.data = [binascii.hexlify(x) for x in missing]
887
            raise ie
888

    
889
        hash = map.hash()
890
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
891
        self.store.map_put(hash, map)
892
        return dest_version_id
893

    
894
    @backend_method
895
    def update_object_checksum(self, user, account, container, name, version, checksum):
896
        """Update an object's checksum."""
897

    
898
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
899
                     user, account, container, name, version, checksum)
900
        # Update objects with greater version and same hashmap and size (fix metadata updates).
901
        self._can_write(user, account, container, name)
902
        path, node = self._lookup_object(account, container, name)
903
        props = self._get_version(node, version)
904
        versions = self.node.node_get_versions(node)
905
        for x in versions:
906
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
907
                self.node.version_put_property(
908
                    x[self.SERIAL], 'checksum', checksum)
909

    
910
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
911
        dest_version_ids = []
912
        self._can_read(user, src_account, src_container, src_name)
913
        path, node = self._lookup_object(src_account, src_container, src_name)
914
        # TODO: Will do another fetch of the properties in duplicate version...
915
        props = self._get_version(
916
            node, src_version)  # Check to see if source exists.
917
        src_version_id = props[self.SERIAL]
918
        hash = props[self.HASH]
919
        size = props[self.SIZE]
920
        is_copy = not is_move and (src_account, src_container, src_name) != (
921
            dest_account, dest_container, dest_name)  # New uuid.
922
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
923
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
924
            self._delete_object(user, src_account, src_container, src_name)
925

    
926
        if delimiter:
927
            prefix = src_name + \
928
                delimiter if not src_name.endswith(delimiter) else src_name
929
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
930
            src_names.sort(key=lambda x: x[2])  # order by nodes
931
            paths = [elem[0] for elem in src_names]
932
            nodes = [elem[2] for elem in src_names]
933
            # TODO: Will do another fetch of the properties in duplicate version...
934
            props = self._get_versions(nodes)  # Check to see if source exists.
935

    
936
            for prop, path, node in zip(props, paths, nodes):
937
                src_version_id = prop[self.SERIAL]
938
                hash = prop[self.HASH]
939
                vtype = prop[self.TYPE]
940
                size = prop[self.SIZE]
941
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
942
                    delimiter) else dest_name
943
                vdest_name = path.replace(prefix, dest_prefix, 1)
944
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
945
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
946
                    self._delete_object(user, src_account, src_container, path)
947
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
948

    
949
    @backend_method
950
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
951
        """Copy an object's data and metadata."""
952

    
953
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
954
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
955
        return dest_version_id
956

    
957
    @backend_method
958
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
959
        """Move an object's data and metadata."""
960

    
961
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
962
        if user != src_account:
963
            raise NotAllowedError
964
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
965
        return dest_version_id
966

    
967
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
968
        if user != account:
969
            raise NotAllowedError
970

    
971
        if until is not None:
972
            path = '/'.join((account, container, name))
973
            node = self.node.node_lookup(path)
974
            if node is None:
975
                return
976
            hashes = []
977
            size = 0
978
            serials = []
979
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
980
            hashes += h
981
            size += s
982
            serials += v
983
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
984
            hashes += h
985
            size += s
986
            serials += v
987
            for h in hashes:
988
                self.store.map_delete(h)
989
            self.node.node_purge(node, until, CLUSTER_DELETED)
990
            try:
991
                props = self._get_version(node)
992
            except NameError:
993
                self.permissions.access_clear(path)
994
            self._report_size_change(user, account, -size,
995
                                    {'action': 'object purge', 'path': path,
996
                                     'versions': ','.join(str(i) for i in serials)})
997
            return
998

    
999
        path, node = self._lookup_object(account, container, name)
1000
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1001
        del_size = self._apply_versioning(account, container, src_version_id)
1002
        self._report_size_change(user, account, -del_size,
1003
                                 {'action': 'object delete', 'path': path,
1004
                                  'versions': ','.join([str(dest_version_id)])})
1005
        self._report_object_change(
1006
            user, account, path, details={'action': 'object delete'})
1007
        self.permissions.access_clear(path)
1008

    
1009
        if delimiter:
1010
            prefix = name + delimiter if not name.endswith(delimiter) else name
1011
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1012
            paths = []
1013
            for t in src_names:
1014
                path = '/'.join((account, container, t[0]))
1015
                node = t[2]
1016
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1017
                del_size = self._apply_versioning(
1018
                    account, container, src_version_id)
1019
                self._report_size_change(user, account, -del_size,
1020
                                         {'action': 'object delete',
1021
                                          'path': path,
1022
                                          'versions': ','.join([str(dest_version_id)])})
1023
                self._report_object_change(
1024
                    user, account, path, details={'action': 'object delete'})
1025
                paths.append(path)
1026
            self.permissions.access_clear_bulk(paths)
1027

    
1028
    @backend_method
1029
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1030
        """Delete/purge an object."""
1031

    
1032
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1033
                     account, container, name, until, prefix, delimiter)
1034
        self._delete_object(user, account, container, name, until, delimiter)
1035

    
1036
    @backend_method
1037
    def list_versions(self, user, account, container, name):
1038
        """Return a list of all (version, version_timestamp) tuples for an object."""
1039

    
1040
        logger.debug(
1041
            "list_versions: %s %s %s %s", user, account, container, name)
1042
        self._can_read(user, account, container, name)
1043
        path, node = self._lookup_object(account, container, name)
1044
        versions = self.node.node_get_versions(node)
1045
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1046

    
1047
    @backend_method
1048
    def get_uuid(self, user, uuid):
1049
        """Return the (account, container, name) for the UUID given."""
1050

    
1051
        logger.debug("get_uuid: %s %s", user, uuid)
1052
        info = self.node.latest_uuid(uuid)
1053
        if info is None:
1054
            raise NameError
1055
        path, serial = info
1056
        account, container, name = path.split('/', 2)
1057
        self._can_read(user, account, container, name)
1058
        return (account, container, name)
1059

    
1060
    @backend_method
1061
    def get_public(self, user, public):
1062
        """Return the (account, container, name) for the public id given."""
1063

    
1064
        logger.debug("get_public: %s %s", user, public)
1065
        if public is None or public < ULTIMATE_ANSWER:
1066
            raise NameError
1067
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1068
        if path is None:
1069
            raise NameError
1070
        account, container, name = path.split('/', 2)
1071
        self._can_read(user, account, container, name)
1072
        return (account, container, name)
1073

    
1074
    @backend_method(autocommit=0)
1075
    def get_block(self, hash):
1076
        """Return a block's data."""
1077

    
1078
        logger.debug("get_block: %s", hash)
1079
        block = self.store.block_get(binascii.unhexlify(hash))
1080
        if not block:
1081
            raise ItemNotExists('Block does not exist')
1082
        return block
1083

    
1084
    @backend_method(autocommit=0)
1085
    def put_block(self, data):
1086
        """Store a block and return the hash."""
1087

    
1088
        logger.debug("put_block: %s", len(data))
1089
        return binascii.hexlify(self.store.block_put(data))
1090

    
1091
    @backend_method(autocommit=0)
1092
    def update_block(self, hash, data, offset=0):
1093
        """Update a known block and return the hash."""
1094

    
1095
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1096
        if offset == 0 and len(data) == self.block_size:
1097
            return self.put_block(data)
1098
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1099
        return binascii.hexlify(h)
1100

    
1101
    # Path functions.
1102

    
1103
    def _generate_uuid(self):
1104
        return str(uuidlib.uuid4())
1105

    
1106
    def _put_object_node(self, path, parent, name):
1107
        path = '/'.join((path, name))
1108
        node = self.node.node_lookup(path)
1109
        if node is None:
1110
            node = self.node.node_create(parent, path)
1111
        return path, node
1112

    
1113
    def _put_path(self, user, parent, path):
1114
        node = self.node.node_create(parent, path)
1115
        self.node.version_create(node, None, 0, '', None, user,
1116
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1117
        return node
1118

    
1119
    def _lookup_account(self, account, create=True):
1120
        node = self.node.node_lookup(account)
1121
        if node is None and create:
1122
            node = self._put_path(
1123
                account, self.ROOTNODE, account)  # User is account.
1124
        return account, node
1125

    
1126
    def _lookup_container(self, account, container):
1127
        path = '/'.join((account, container))
1128
        node = self.node.node_lookup(path)
1129
        if node is None:
1130
            raise ItemNotExists('Container does not exist')
1131
        return path, node
1132

    
1133
    def _lookup_object(self, account, container, name):
1134
        path = '/'.join((account, container, name))
1135
        node = self.node.node_lookup(path)
1136
        if node is None:
1137
            raise ItemNotExists('Object does not exist')
1138
        return path, node
1139

    
1140
    def _lookup_objects(self, paths):
1141
        nodes = self.node.node_lookup_bulk(paths)
1142
        return paths, nodes
1143

    
1144
    def _get_properties(self, node, until=None):
1145
        """Return properties until the timestamp given."""
1146

    
1147
        before = until if until is not None else inf
1148
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1149
        if props is None and until is not None:
1150
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1151
        if props is None:
1152
            raise ItemNotExists('Path does not exist')
1153
        return props
1154

    
1155
    def _get_statistics(self, node, until=None):
1156
        """Return count, sum of size and latest timestamp of everything under node."""
1157

    
1158
        if until is None:
1159
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1160
        else:
1161
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1162
        if stats is None:
1163
            stats = (0, 0, 0)
1164
        return stats
1165

    
1166
    def _get_version(self, node, version=None):
1167
        if version is None:
1168
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1169
            if props is None:
1170
                raise ItemNotExists('Object does not exist')
1171
        else:
1172
            try:
1173
                version = int(version)
1174
            except ValueError:
1175
                raise VersionNotExists('Version does not exist')
1176
            props = self.node.version_get_properties(version)
1177
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1178
                raise VersionNotExists('Version does not exist')
1179
        return props
1180

    
1181
    def _get_versions(self, nodes):
1182
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1183

    
1184
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1185
        """Create a new version of the node."""
1186

    
1187
        props = self.node.version_lookup(
1188
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1189
        if props is not None:
1190
            src_version_id = props[self.SERIAL]
1191
            src_hash = props[self.HASH]
1192
            src_size = props[self.SIZE]
1193
            src_type = props[self.TYPE]
1194
            src_checksum = props[self.CHECKSUM]
1195
        else:
1196
            src_version_id = None
1197
            src_hash = None
1198
            src_size = 0
1199
            src_type = ''
1200
            src_checksum = ''
1201
        if size is None:  # Set metadata.
1202
            hash = src_hash  # This way hash can be set to None (account or container).
1203
            size = src_size
1204
        if type is None:
1205
            type = src_type
1206
        if checksum is None:
1207
            checksum = src_checksum
1208
        uuid = self._generate_uuid(
1209
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1210

    
1211
        if src_node is None:
1212
            pre_version_id = src_version_id
1213
        else:
1214
            pre_version_id = None
1215
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1216
            if props is not None:
1217
                pre_version_id = props[self.SERIAL]
1218
        if pre_version_id is not None:
1219
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1220

    
1221
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1222
        return pre_version_id, dest_version_id
1223

    
1224
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1225
        if src_version_id is not None:
1226
            self.node.attribute_copy(src_version_id, dest_version_id)
1227
        if not replace:
1228
            self.node.attribute_del(dest_version_id, domain, (
1229
                k for k, v in meta.iteritems() if v == ''))
1230
            self.node.attribute_set(dest_version_id, domain, (
1231
                (k, v) for k, v in meta.iteritems() if v != ''))
1232
        else:
1233
            self.node.attribute_del(dest_version_id, domain)
1234
            self.node.attribute_set(dest_version_id, domain, ((
1235
                k, v) for k, v in meta.iteritems()))
1236

    
1237
    def _put_metadata(self, user, node, domain, meta, replace=False):
1238
        """Create a new version and store metadata."""
1239

    
1240
        src_version_id, dest_version_id = self._put_version_duplicate(
1241
            user, node)
1242
        self._put_metadata_duplicate(
1243
            src_version_id, dest_version_id, domain, meta, replace)
1244
        return src_version_id, dest_version_id
1245

    
1246
    def _list_limits(self, listing, marker, limit):
1247
        start = 0
1248
        if marker:
1249
            try:
1250
                start = listing.index(marker) + 1
1251
            except ValueError:
1252
                pass
1253
        if not limit or limit > 10000:
1254
            limit = 10000
1255
        return start, limit
1256

    
1257
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1258
        cont_prefix = path + '/'
1259
        prefix = cont_prefix + prefix
1260
        start = cont_prefix + marker if marker else None
1261
        before = until if until is not None else inf
1262
        filterq = keys if domain else []
1263
        sizeq = size_range
1264

    
1265
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1266
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1267
        objects.sort(key=lambda x: x[0])
1268
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1269
        return objects
1270

    
1271
    # Reporting functions.
1272

    
1273
    def _report_size_change(self, user, account, size, details={}):
1274
        account_node = self._lookup_account(account, True)[1]
1275
        total = self._get_statistics(account_node)[1]
1276
        details.update({'user': user, 'total': total})
1277
        logger.debug(
1278
            "_report_size_change: %s %s %s %s", user, account, size, details)
1279
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1280
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1281
                              float(size), details))
1282

    
1283
        if not self.using_external_quotaholder:
1284
            return
1285
        
1286
        serial = self.quotaholder.issue_commission(
1287
                context     =   {},
1288
                target      =   user.uuid,
1289
                key         =   '1',
1290
                clientkey   =   'pithos',
1291
                ownerkey    =   '',
1292
                        name        =   details['path'] if 'path' in details else '',
1293
                provisions  =   (('pithos+', 'pithos+.diskspace', size),)
1294
        )
1295
        self.serials.append(serial)
1296

    
1297
    def _report_object_change(self, user, account, path, details={}):
1298
        details.update({'user': user})
1299
        logger.debug("_report_object_change: %s %s %s %s", user,
1300
                     account, path, details)
1301
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1302
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1303

    
1304
    def _report_sharing_change(self, user, account, path, details={}):
1305
        logger.debug("_report_permissions_change: %s %s %s %s",
1306
                     user, account, path, details)
1307
        details.update({'user': user})
1308
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1309
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1310

    
1311
    # Policy functions.
1312

    
1313
    def _check_policy(self, policy):
1314
        for k in policy.keys():
1315
            if policy[k] == '':
1316
                policy[k] = self.default_policy.get(k)
1317
        for k, v in policy.iteritems():
1318
            if k == 'quota':
1319
                q = int(v)  # May raise ValueError.
1320
                if q < 0:
1321
                    raise ValueError
1322
            elif k == 'versioning':
1323
                if v not in ['auto', 'none']:
1324
                    raise ValueError
1325
            else:
1326
                raise ValueError
1327

    
1328
    def _put_policy(self, node, policy, replace):
1329
        if replace:
1330
            for k, v in self.default_policy.iteritems():
1331
                if k not in policy:
1332
                    policy[k] = v
1333
        self.node.policy_set(node, policy)
1334

    
1335
    def _get_policy(self, node):
1336
        policy = self.default_policy.copy()
1337
        policy.update(self.node.policy_get(node))
1338
        return policy
1339

    
1340
    def _apply_versioning(self, account, container, version_id):
1341
        """Delete the provided version if such is the policy.
1342
           Return size of object removed.
1343
        """
1344

    
1345
        if version_id is None:
1346
            return 0
1347
        path, node = self._lookup_container(account, container)
1348
        versioning = self._get_policy(node)['versioning']
1349
        if versioning != 'auto':
1350
            hash, size = self.node.version_remove(version_id)
1351
            self.store.map_delete(hash)
1352
            return size
1353
        elif self.free_versioning:
1354
            return self.node.version_get_properties(
1355
                version_id, keys=('size',))[0]
1356
        return 0
1357

    
1358
    # Access control functions.
1359

    
1360
    def _check_groups(self, groups):
1361
        # raise ValueError('Bad characters in groups')
1362
        pass
1363

    
1364
    def _check_permissions(self, path, permissions):
1365
        # raise ValueError('Bad characters in permissions')
1366
        pass
1367

    
1368
    def _get_formatted_paths(self, paths):
1369
        formatted = []
1370
        for p in paths:
1371
            node = self.node.node_lookup(p)
1372
            props = None
1373
            if node is not None:
1374
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1375
            if props is not None:
1376
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1377
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1378
                formatted.append((p, self.MATCH_EXACT))
1379
        return formatted
1380

    
1381
    def _get_permissions_path(self, account, container, name):
1382
        path = '/'.join((account, container, name))
1383
        permission_paths = self.permissions.access_inherit(path)
1384
        permission_paths.sort()
1385
        permission_paths.reverse()
1386
        for p in permission_paths:
1387
            if p == path:
1388
                return p
1389
            else:
1390
                if p.count('/') < 2:
1391
                    continue
1392
                node = self.node.node_lookup(p)
1393
                props = None
1394
                if node is not None:
1395
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1396
                if props is not None:
1397
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1398
                        return p
1399
        return None
1400

    
1401
    def _can_read(self, user, account, container, name):
1402
        if user == account:
1403
            return True
1404
        path = '/'.join((account, container, name))
1405
        if self.permissions.public_get(path) is not None:
1406
            return True
1407
        path = self._get_permissions_path(account, container, name)
1408
        if not path:
1409
            raise NotAllowedError
1410
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1411
            raise NotAllowedError
1412

    
1413
    def _can_write(self, user, account, container, name):
1414
        if user == account:
1415
            return True
1416
        path = '/'.join((account, container, name))
1417
        path = self._get_permissions_path(account, container, name)
1418
        if not path:
1419
            raise NotAllowedError
1420
        if not self.permissions.access_check(path, self.WRITE, user):
1421
            raise NotAllowedError
1422

    
1423
    def _allowed_accounts(self, user):
1424
        allow = set()
1425
        for path in self.permissions.access_list_paths(user):
1426
            allow.add(path.split('/', 1)[0])
1427
        return sorted(allow)
1428

    
1429
    def _allowed_containers(self, user, account):
1430
        allow = set()
1431
        for path in self.permissions.access_list_paths(user, account):
1432
            allow.add(path.split('/', 2)[1])
1433
        return sorted(allow)