Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ ecd01afd

History | View | Annotate | Download (60.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from commissioning.clients.quotaholder import QuotaholderHTTP
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
85
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
86

    
87
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
88
QUEUE_CLIENT_ID = 'pithos'
89
QUEUE_INSTANCE_ID = '1'
90

    
91
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
92

    
93
inf = float('inf')
94

    
95
ULTIMATE_ANSWER = 42
96

    
97

    
98
logger = logging.getLogger(__name__)
99

    
100

    
101
def backend_method(func=None, autocommit=1):
102
    if func is None:
103
        def fn(func):
104
            return backend_method(func, autocommit)
105
        return fn
106

    
107
    if not autocommit:
108
        return func
109

    
110
    def fn(self, *args, **kw):
111
        self.wrapper.execute()
112
        serials = []
113
        self.serials = serials
114
        self.messages = []
115
        try:
116
            ret = func(self, *args, **kw)
117
            for m in self.messages:
118
                self.queue.send(*m)
119
            if serials:
120
                self.quotaholder.accept_commission(
121
                            context     =   {},
122
                            clientkey   =   'pithos',
123
                            serials     =   serials)
124
            self.wrapper.commit()
125
            return ret
126
        except:
127
            self.quotaholder.reject_commission(
128
                        context     =   {},
129
                        clientkey   =   'pithos',
130
                        serials     =   serials)
131
            self.wrapper.rollback()
132
            raise
133
    return fn
134

    
135

    
136
class ModularBackend(BaseBackend):
137
    """A modular backend.
138

139
    Uses modules for SQL functions and storage.
140
    """
141

    
142
    def __init__(self, db_module=None, db_connection=None,
143
                 block_module=None, block_path=None, block_umask=None,
144
                 queue_module=None, queue_hosts=None,
145
                 queue_exchange=None, quotaholder_url=None):
146
        db_module = db_module or DEFAULT_DB_MODULE
147
        db_connection = db_connection or DEFAULT_DB_CONNECTION
148
        block_module = block_module or DEFAULT_BLOCK_MODULE
149
        block_path = block_path or DEFAULT_BLOCK_PATH
150
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
151
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
152
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
153
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
154
                
155
        self.hash_algorithm = 'sha256'
156
        self.block_size = 4 * 1024 * 1024  # 4MB
157

    
158
        self.default_policy = {'quota': DEFAULT_QUOTA,
159
                               'versioning': DEFAULT_VERSIONING}
160

    
161
        def load_module(m):
162
            __import__(m)
163
            return sys.modules[m]
164

    
165
        self.db_module = load_module(db_module)
166
        self.wrapper = self.db_module.DBWrapper(db_connection)
167
        params = {'wrapper': self.wrapper}
168
        self.permissions = self.db_module.Permissions(**params)
169
        self.config = self.db_module.Config(**params)
170
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
171
        for x in ['READ', 'WRITE']:
172
            setattr(self, x, getattr(self.db_module, x))
173
        self.node = self.db_module.Node(**params)
174
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
175
            setattr(self, x, getattr(self.db_module, x))
176

    
177
        self.block_module = load_module(block_module)
178
        params = {'path': block_path,
179
                  'block_size': self.block_size,
180
                  'hash_algorithm': self.hash_algorithm,
181
                  'umask': block_umask}
182
        self.store = self.block_module.Store(**params)
183

    
184
        if queue_module and queue_hosts:
185
            self.queue_module = load_module(queue_module)
186
            params = {'hosts': queue_hosts,
187
                          'exchange': queue_exchange,
188
                      'client_id': QUEUE_CLIENT_ID}
189
            self.queue = self.queue_module.Queue(**params)
190
        else:
191
            class NoQueue:
192
                def send(self, *args):
193
                    pass
194

    
195
                def close(self):
196
                    pass
197

    
198
            self.queue = NoQueue()
199

    
200
        self.quotaholder_url = quotaholder_url
201
        self.quotaholder = QuotaholderHTTP(quotaholder_url)
202
        self.serials = []
203

    
204
    def close(self):
205
        self.wrapper.close()
206
        self.queue.close()
207

    
208
    @backend_method
209
    def list_accounts(self, user, marker=None, limit=10000):
210
        """Return a list of accounts the user can access."""
211

    
212
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
213
        allowed = self._allowed_accounts(user)
214
        start, limit = self._list_limits(allowed, marker, limit)
215
        return allowed[start:start + limit]
216

    
217
    @backend_method
218
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
219
        """Return a dictionary with the account metadata for the domain."""
220

    
221
        logger.debug(
222
            "get_account_meta: %s %s %s %s", user, account, domain, until)
223
        path, node = self._lookup_account(account, user == account)
224
        if user != account:
225
            if until or node is None or account not in self._allowed_accounts(user):
226
                raise NotAllowedError
227
        try:
228
            props = self._get_properties(node, until)
229
            mtime = props[self.MTIME]
230
        except NameError:
231
            props = None
232
            mtime = until
233
        count, bytes, tstamp = self._get_statistics(node, until)
234
        tstamp = max(tstamp, mtime)
235
        if until is None:
236
            modified = tstamp
237
        else:
238
            modified = self._get_statistics(
239
                node)[2]  # Overall last modification.
240
            modified = max(modified, mtime)
241

    
242
        if user != account:
243
            meta = {'name': account}
244
        else:
245
            meta = {}
246
            if props is not None and include_user_defined:
247
                meta.update(
248
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
249
            if until is not None:
250
                meta.update({'until_timestamp': tstamp})
251
            meta.update({'name': account, 'count': count, 'bytes': bytes})
252
        meta.update({'modified': modified})
253
        return meta
254

    
255
    @backend_method
256
    def update_account_meta(self, user, account, domain, meta, replace=False):
257
        """Update the metadata associated with the account for the domain."""
258

    
259
        logger.debug("update_account_meta: %s %s %s %s %s", user,
260
                     account, domain, meta, replace)
261
        if user != account:
262
            raise NotAllowedError
263
        path, node = self._lookup_account(account, True)
264
        self._put_metadata(user, node, domain, meta, replace)
265

    
266
    @backend_method
267
    def get_account_groups(self, user, account):
268
        """Return a dictionary with the user groups defined for this account."""
269

    
270
        logger.debug("get_account_groups: %s %s", user, account)
271
        if user != account:
272
            if account not in self._allowed_accounts(user):
273
                raise NotAllowedError
274
            return {}
275
        self._lookup_account(account, True)
276
        return self.permissions.group_dict(account)
277

    
278
    @backend_method
279
    def update_account_groups(self, user, account, groups, replace=False):
280
        """Update the groups associated with the account."""
281

    
282
        logger.debug("update_account_groups: %s %s %s %s", user,
283
                     account, groups, replace)
284
        if user != account:
285
            raise NotAllowedError
286
        self._lookup_account(account, True)
287
        self._check_groups(groups)
288
        if replace:
289
            self.permissions.group_destroy(account)
290
        for k, v in groups.iteritems():
291
            if not replace:  # If not already deleted.
292
                self.permissions.group_delete(account, k)
293
            if v:
294
                self.permissions.group_addmany(account, k, v)
295

    
296
    @backend_method
297
    def get_account_policy(self, user, account):
298
        """Return a dictionary with the account policy."""
299

    
300
        logger.debug("get_account_policy: %s %s", user, account)
301
        if user != account:
302
            if account not in self._allowed_accounts(user):
303
                raise NotAllowedError
304
            return {}
305
        path, node = self._lookup_account(account, True)
306
        return self._get_policy(node)
307

    
308
    @backend_method
309
    def update_account_policy(self, user, account, policy, replace=False):
310
        """Update the policy associated with the account."""
311

    
312
        logger.debug("update_account_policy: %s %s %s %s", user,
313
                     account, policy, replace)
314
        if user != account:
315
            raise NotAllowedError
316
        path, node = self._lookup_account(account, True)
317
        self._check_policy(policy)
318
        self._put_policy(node, policy, replace)
319

    
320
    @backend_method
321
    def put_account(self, user, account, policy={}):
322
        """Create a new account with the given name."""
323

    
324
        logger.debug("put_account: %s %s %s", user, account, policy)
325
        if user != account:
326
            raise NotAllowedError
327
        node = self.node.node_lookup(account)
328
        if node is not None:
329
            raise AccountExists('Account already exists')
330
        if policy:
331
            self._check_policy(policy)
332
        node = self._put_path(user, self.ROOTNODE, account)
333
        self._put_policy(node, policy, True)
334

    
335
    @backend_method
336
    def delete_account(self, user, account):
337
        """Delete the account with the given name."""
338

    
339
        logger.debug("delete_account: %s %s", user, account)
340
        if user != account:
341
            raise NotAllowedError
342
        node = self.node.node_lookup(account)
343
        if node is None:
344
            return
345
        if not self.node.node_remove(node):
346
            raise AccountNotEmpty('Account is not empty')
347
        self.permissions.group_destroy(account)
348

    
349
    @backend_method
350
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
351
        """Return a list of containers existing under an account."""
352

    
353
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
354
                     account, marker, limit, shared, until, public)
355
        if user != account:
356
            if until or account not in self._allowed_accounts(user):
357
                raise NotAllowedError
358
            allowed = self._allowed_containers(user, account)
359
            start, limit = self._list_limits(allowed, marker, limit)
360
            return allowed[start:start + limit]
361
        if shared or public:
362
            allowed = set()
363
            if shared:
364
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
365
            if public:
366
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
367
            allowed = sorted(allowed)
368
            start, limit = self._list_limits(allowed, marker, limit)
369
            return allowed[start:start + limit]
370
        node = self.node.node_lookup(account)
371
        containers = [x[0] for x in self._list_object_properties(
372
            node, account, '', '/', marker, limit, False, None, [], until)]
373
        start, limit = self._list_limits(
374
            [x[0] for x in containers], marker, limit)
375
        return containers[start:start + limit]
376

    
377
    @backend_method
378
    def list_container_meta(self, user, account, container, domain, until=None):
379
        """Return a list with all the container's object meta keys for the domain."""
380

    
381
        logger.debug("list_container_meta: %s %s %s %s %s", user,
382
                     account, container, domain, until)
383
        allowed = []
384
        if user != account:
385
            if until:
386
                raise NotAllowedError
387
            allowed = self.permissions.access_list_paths(
388
                user, '/'.join((account, container)))
389
            if not allowed:
390
                raise NotAllowedError
391
        path, node = self._lookup_container(account, container)
392
        before = until if until is not None else inf
393
        allowed = self._get_formatted_paths(allowed)
394
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
395

    
396
    @backend_method
397
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
398
        """Return a dictionary with the container metadata for the domain."""
399

    
400
        logger.debug("get_container_meta: %s %s %s %s %s", user,
401
                     account, container, domain, until)
402
        if user != account:
403
            if until or container not in self._allowed_containers(user, account):
404
                raise NotAllowedError
405
        path, node = self._lookup_container(account, container)
406
        props = self._get_properties(node, until)
407
        mtime = props[self.MTIME]
408
        count, bytes, tstamp = self._get_statistics(node, until)
409
        tstamp = max(tstamp, mtime)
410
        if until is None:
411
            modified = tstamp
412
        else:
413
            modified = self._get_statistics(
414
                node)[2]  # Overall last modification.
415
            modified = max(modified, mtime)
416

    
417
        if user != account:
418
            meta = {'name': container}
419
        else:
420
            meta = {}
421
            if include_user_defined:
422
                meta.update(
423
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
424
            if until is not None:
425
                meta.update({'until_timestamp': tstamp})
426
            meta.update({'name': container, 'count': count, 'bytes': bytes})
427
        meta.update({'modified': modified})
428
        return meta
429

    
430
    @backend_method
431
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
432
        """Update the metadata associated with the container for the domain."""
433

    
434
        logger.debug("update_container_meta: %s %s %s %s %s %s",
435
                     user, account, container, domain, meta, replace)
436
        if user != account:
437
            raise NotAllowedError
438
        path, node = self._lookup_container(account, container)
439
        src_version_id, dest_version_id = self._put_metadata(
440
            user, node, domain, meta, replace)
441
        if src_version_id is not None:
442
            versioning = self._get_policy(node)['versioning']
443
            if versioning != 'auto':
444
                self.node.version_remove(src_version_id)
445

    
446
    @backend_method
447
    def get_container_policy(self, user, account, container):
448
        """Return a dictionary with the container policy."""
449

    
450
        logger.debug(
451
            "get_container_policy: %s %s %s", user, account, container)
452
        if user != account:
453
            if container not in self._allowed_containers(user, account):
454
                raise NotAllowedError
455
            return {}
456
        path, node = self._lookup_container(account, container)
457
        return self._get_policy(node)
458

    
459
    @backend_method
460
    def update_container_policy(self, user, account, container, policy, replace=False):
461
        """Update the policy associated with the container."""
462

    
463
        logger.debug("update_container_policy: %s %s %s %s %s",
464
                     user, account, container, policy, replace)
465
        if user != account:
466
            raise NotAllowedError
467
        path, node = self._lookup_container(account, container)
468
        self._check_policy(policy)
469
        self._put_policy(node, policy, replace)
470

    
471
    @backend_method
472
    def put_container(self, user, account, container, policy={}):
473
        """Create a new container with the given name."""
474

    
475
        logger.debug(
476
            "put_container: %s %s %s %s", user, account, container, policy)
477
        if user != account:
478
            raise NotAllowedError
479
        try:
480
            path, node = self._lookup_container(account, container)
481
        except NameError:
482
            pass
483
        else:
484
            raise ContainerExists('Container already exists')
485
        if policy:
486
            self._check_policy(policy)
487
        path = '/'.join((account, container))
488
        node = self._put_path(
489
            user, self._lookup_account(account, True)[1], path)
490
        self._put_policy(node, policy, True)
491

    
492
    @backend_method
493
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
494
        """Delete/purge the container with the given name."""
495

    
496
        logger.debug("delete_container: %s %s %s %s %s %s", user,
497
                     account, container, until, prefix, delimiter)
498
        if user != account:
499
            raise NotAllowedError
500
        path, node = self._lookup_container(account, container)
501

    
502
        if until is not None:
503
            hashes, size, serials = self.node.node_purge_children(
504
                node, until, CLUSTER_HISTORY)
505
            for h in hashes:
506
                self.store.map_delete(h)
507
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
508
            self._report_size_change(user, account, -size,
509
                                     {'action':'container purge', 'path': path,
510
                                      'versions': serials})
511
            return
512

    
513
        if not delimiter:
514
            if self._get_statistics(node)[0] > 0:
515
                raise ContainerNotEmpty('Container is not empty')
516
            hashes, size, serials = self.node.node_purge_children(
517
                node, inf, CLUSTER_HISTORY)
518
            for h in hashes:
519
                self.store.map_delete(h)
520
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
521
            self.node.node_remove(node)
522
            self._report_size_change(user, account, -size,
523
                                     {'action': 'container delete',
524
                                      'path': path,
525
                                      'versions': serials})
526
        else:
527
                # remove only contents
528
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
529
            paths = []
530
            for t in src_names:
531
                path = '/'.join((account, container, t[0]))
532
                node = t[2]
533
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
534
                del_size = self._apply_versioning(
535
                    account, container, src_version_id)
536
                if del_size:
537
                    self._report_size_change(user, account, -del_size,
538
                                             {'action': 'object delete',
539
                                              'path': path, 'versions': [dest_version_id]})
540
                self._report_object_change(
541
                    user, account, path, details={'action': 'object delete'})
542
                paths.append(path)
543
            self.permissions.access_clear_bulk(paths)
544

    
545
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
546
        if user != account and until:
547
            raise NotAllowedError
548
        if shared and public:
549
            # get shared first
550
            shared = self._list_object_permissions(
551
                user, account, container, prefix, shared=True, public=False)
552
            objects = set()
553
            if shared:
554
                path, node = self._lookup_container(account, container)
555
                shared = self._get_formatted_paths(shared)
556
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
557

    
558
            # get public
559
            objects |= set(self._list_public_object_properties(
560
                user, account, container, prefix, all_props))
561
            objects = list(objects)
562

    
563
            objects.sort(key=lambda x: x[0])
564
            start, limit = self._list_limits(
565
                [x[0] for x in objects], marker, limit)
566
            return objects[start:start + limit]
567
        elif public:
568
            objects = self._list_public_object_properties(
569
                user, account, container, prefix, all_props)
570
            start, limit = self._list_limits(
571
                [x[0] for x in objects], marker, limit)
572
            return objects[start:start + limit]
573

    
574
        allowed = self._list_object_permissions(
575
            user, account, container, prefix, shared, public)
576
        if shared and not allowed:
577
            return []
578
        path, node = self._lookup_container(account, container)
579
        allowed = self._get_formatted_paths(allowed)
580
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
581
        start, limit = self._list_limits(
582
            [x[0] for x in objects], marker, limit)
583
        return objects[start:start + limit]
584

    
585
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
586
        public = self._list_object_permissions(
587
            user, account, container, prefix, shared=False, public=True)
588
        paths, nodes = self._lookup_objects(public)
589
        path = '/'.join((account, container))
590
        cont_prefix = path + '/'
591
        paths = [x[len(cont_prefix):] for x in paths]
592
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
593
        objects = [(path,) + props for path, props in zip(paths, props)]
594
        return objects
595

    
596
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
597
        objects = []
598
        while True:
599
            marker = objects[-1] if objects else None
600
            limit = 10000
601
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
602
            objects.extend(l)
603
            if not l or len(l) < limit:
604
                break
605
        return objects
606

    
607
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
608
        allowed = []
609
        path = '/'.join((account, container, prefix)).rstrip('/')
610
        if user != account:
611
            allowed = self.permissions.access_list_paths(user, path)
612
            if not allowed:
613
                raise NotAllowedError
614
        else:
615
            allowed = set()
616
            if shared:
617
                allowed.update(self.permissions.access_list_shared(path))
618
            if public:
619
                allowed.update(
620
                    [x[0] for x in self.permissions.public_list(path)])
621
            allowed = sorted(allowed)
622
            if not allowed:
623
                return []
624
        return allowed
625

    
626
    @backend_method
627
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
628
        """Return a list of object (name, version_id) tuples existing under a container."""
629

    
630
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
631
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
632

    
633
    @backend_method
634
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
635
        """Return a list of object metadata dicts existing under a container."""
636

    
637
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
638
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
639
        objects = []
640
        for p in props:
641
            if len(p) == 2:
642
                objects.append({'subdir': p[0]})
643
            else:
644
                objects.append({'name': p[0],
645
                                'bytes': p[self.SIZE + 1],
646
                                'type': p[self.TYPE + 1],
647
                                'hash': p[self.HASH + 1],
648
                                'version': p[self.SERIAL + 1],
649
                                'version_timestamp': p[self.MTIME + 1],
650
                                'modified': p[self.MTIME + 1] if until is None else None,
651
                                'modified_by': p[self.MUSER + 1],
652
                                'uuid': p[self.UUID + 1],
653
                                'checksum': p[self.CHECKSUM + 1]})
654
        return objects
655

    
656
    @backend_method
657
    def list_object_permissions(self, user, account, container, prefix=''):
658
        """Return a list of paths that enforce permissions under a container."""
659

    
660
        logger.debug("list_object_permissions: %s %s %s %s", user,
661
                     account, container, prefix)
662
        return self._list_object_permissions(user, account, container, prefix, True, False)
663

    
664
    @backend_method
665
    def list_object_public(self, user, account, container, prefix=''):
666
        """Return a dict mapping paths to public ids for objects that are public under a container."""
667

    
668
        logger.debug("list_object_public: %s %s %s %s", user,
669
                     account, container, prefix)
670
        public = {}
671
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
672
            public[path] = p + ULTIMATE_ANSWER
673
        return public
674

    
675
    @backend_method
676
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
677
        """Return a dictionary with the object metadata for the domain."""
678

    
679
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
680
                     account, container, name, domain, version)
681
        self._can_read(user, account, container, name)
682
        path, node = self._lookup_object(account, container, name)
683
        props = self._get_version(node, version)
684
        if version is None:
685
            modified = props[self.MTIME]
686
        else:
687
            try:
688
                modified = self._get_version(
689
                    node)[self.MTIME]  # Overall last modification.
690
            except NameError:  # Object may be deleted.
691
                del_props = self.node.version_lookup(
692
                    node, inf, CLUSTER_DELETED)
693
                if del_props is None:
694
                    raise ItemNotExists('Object does not exist')
695
                modified = del_props[self.MTIME]
696

    
697
        meta = {}
698
        if include_user_defined:
699
            meta.update(
700
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
701
        meta.update({'name': name,
702
                     'bytes': props[self.SIZE],
703
                     'type': props[self.TYPE],
704
                     'hash': props[self.HASH],
705
                     'version': props[self.SERIAL],
706
                     'version_timestamp': props[self.MTIME],
707
                     'modified': modified,
708
                     'modified_by': props[self.MUSER],
709
                     'uuid': props[self.UUID],
710
                     'checksum': props[self.CHECKSUM]})
711
        return meta
712

    
713
    @backend_method
714
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
715
        """Update the metadata associated with the object for the domain and return the new version."""
716

    
717
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
718
                     user, account, container, name, domain, meta, replace)
719
        self._can_write(user, account, container, name)
720
        path, node = self._lookup_object(account, container, name)
721
        src_version_id, dest_version_id = self._put_metadata(
722
            user, node, domain, meta, replace)
723
        self._apply_versioning(account, container, src_version_id)
724
        return dest_version_id
725

    
726
    @backend_method
727
    def get_object_permissions(self, user, account, container, name):
728
        """Return the action allowed on the object, the path
729
        from which the object gets its permissions from,
730
        along with a dictionary containing the permissions."""
731

    
732
        logger.debug("get_object_permissions: %s %s %s %s", user,
733
                     account, container, name)
734
        allowed = 'write'
735
        permissions_path = self._get_permissions_path(account, container, name)
736
        if user != account:
737
            if self.permissions.access_check(permissions_path, self.WRITE, user):
738
                allowed = 'write'
739
            elif self.permissions.access_check(permissions_path, self.READ, user):
740
                allowed = 'read'
741
            else:
742
                raise NotAllowedError
743
        self._lookup_object(account, container, name)
744
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
745

    
746
    @backend_method
747
    def update_object_permissions(self, user, account, container, name, permissions):
748
        """Update the permissions associated with the object."""
749

    
750
        logger.debug("update_object_permissions: %s %s %s %s %s",
751
                     user, account, container, name, permissions)
752
        if user != account:
753
            raise NotAllowedError
754
        path = self._lookup_object(account, container, name)[0]
755
        self._check_permissions(path, permissions)
756
        self.permissions.access_set(path, permissions)
757
        self._report_sharing_change(user, account, path, {'members':
758
                                    self.permissions.access_members(path)})
759

    
760
    @backend_method
761
    def get_object_public(self, user, account, container, name):
762
        """Return the public id of the object if applicable."""
763

    
764
        logger.debug(
765
            "get_object_public: %s %s %s %s", user, account, container, name)
766
        self._can_read(user, account, container, name)
767
        path = self._lookup_object(account, container, name)[0]
768
        p = self.permissions.public_get(path)
769
        if p is not None:
770
            p += ULTIMATE_ANSWER
771
        return p
772

    
773
    @backend_method
774
    def update_object_public(self, user, account, container, name, public):
775
        """Update the public status of the object."""
776

    
777
        logger.debug("update_object_public: %s %s %s %s %s", user,
778
                     account, container, name, public)
779
        self._can_write(user, account, container, name)
780
        path = self._lookup_object(account, container, name)[0]
781
        if not public:
782
            self.permissions.public_unset(path)
783
        else:
784
            self.permissions.public_set(path)
785

    
786
    @backend_method
787
    def get_object_hashmap(self, user, account, container, name, version=None):
788
        """Return the object's size and a list with partial hashes."""
789

    
790
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
791
                     account, container, name, version)
792
        self._can_read(user, account, container, name)
793
        path, node = self._lookup_object(account, container, name)
794
        props = self._get_version(node, version)
795
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
796
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
797

    
798
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
799
        if permissions is not None and user != account:
800
            raise NotAllowedError
801
        self._can_write(user, account, container, name)
802
        if permissions is not None:
803
            path = '/'.join((account, container, name))
804
            self._check_permissions(path, permissions)
805

    
806
        account_path, account_node = self._lookup_account(account, True)
807
        container_path, container_node = self._lookup_container(
808
            account, container)
809
        path, node = self._put_object_node(
810
            container_path, container_node, name)
811
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
812

    
813
        # Handle meta.
814
        if src_version_id is None:
815
            src_version_id = pre_version_id
816
        self._put_metadata_duplicate(
817
            src_version_id, dest_version_id, domain, meta, replace_meta)
818

    
819
        # Check quota.
820
        del_size = self._apply_versioning(account, container, pre_version_id)
821
        size_delta = size - del_size
822
        if size_delta > 0:
823
            account_quota = long(self._get_policy(account_node)['quota'])
824
            container_quota = long(self._get_policy(container_node)['quota'])
825
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
826
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
827
                # This must be executed in a transaction, so the version is never created if it fails.
828
                raise QuotaError
829
        self._report_size_change(user, account, size_delta,
830
                                 {'action': 'object update', 'path': path,
831
                                  'versions': [dest_version_id]})
832

    
833
        if permissions is not None:
834
            self.permissions.access_set(path, permissions)
835
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
836

    
837
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
838
        return dest_version_id
839

    
840
    @backend_method
841
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
842
        """Create/update an object with the specified size and partial hashes."""
843

    
844
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
845
                     account, container, name, size, type, hashmap, checksum)
846
        if size == 0:  # No such thing as an empty hashmap.
847
            hashmap = [self.put_block('')]
848
        map = HashMap(self.block_size, self.hash_algorithm)
849
        map.extend([binascii.unhexlify(x) for x in hashmap])
850
        missing = self.store.block_search(map)
851
        if missing:
852
            ie = IndexError()
853
            ie.data = [binascii.hexlify(x) for x in missing]
854
            raise ie
855

    
856
        hash = map.hash()
857
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
858
        self.store.map_put(hash, map)
859
        return dest_version_id
860

    
861
    @backend_method
862
    def update_object_checksum(self, user, account, container, name, version, checksum):
863
        """Update an object's checksum."""
864

    
865
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
866
                     user, account, container, name, version, checksum)
867
        # Update objects with greater version and same hashmap and size (fix metadata updates).
868
        self._can_write(user, account, container, name)
869
        path, node = self._lookup_object(account, container, name)
870
        props = self._get_version(node, version)
871
        versions = self.node.node_get_versions(node)
872
        for x in versions:
873
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
874
                self.node.version_put_property(
875
                    x[self.SERIAL], 'checksum', checksum)
876

    
877
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
878
        dest_version_ids = []
879
        self._can_read(user, src_account, src_container, src_name)
880
        path, node = self._lookup_object(src_account, src_container, src_name)
881
        # TODO: Will do another fetch of the properties in duplicate version...
882
        props = self._get_version(
883
            node, src_version)  # Check to see if source exists.
884
        src_version_id = props[self.SERIAL]
885
        hash = props[self.HASH]
886
        size = props[self.SIZE]
887
        is_copy = not is_move and (src_account, src_container, src_name) != (
888
            dest_account, dest_container, dest_name)  # New uuid.
889
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
890
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
891
            self._delete_object(user, src_account, src_container, src_name)
892

    
893
        if delimiter:
894
            prefix = src_name + \
895
                delimiter if not src_name.endswith(delimiter) else src_name
896
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
897
            src_names.sort(key=lambda x: x[2])  # order by nodes
898
            paths = [elem[0] for elem in src_names]
899
            nodes = [elem[2] for elem in src_names]
900
            # TODO: Will do another fetch of the properties in duplicate version...
901
            props = self._get_versions(nodes)  # Check to see if source exists.
902

    
903
            for prop, path, node in zip(props, paths, nodes):
904
                src_version_id = prop[self.SERIAL]
905
                hash = prop[self.HASH]
906
                vtype = prop[self.TYPE]
907
                size = prop[self.SIZE]
908
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
909
                    delimiter) else dest_name
910
                vdest_name = path.replace(prefix, dest_prefix, 1)
911
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
912
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
913
                    self._delete_object(user, src_account, src_container, path)
914
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
915

    
916
    @backend_method
917
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
918
        """Copy an object's data and metadata."""
919

    
920
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
921
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
922
        return dest_version_id
923

    
924
    @backend_method
925
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
926
        """Move an object's data and metadata."""
927

    
928
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
929
        if user != src_account:
930
            raise NotAllowedError
931
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
932
        return dest_version_id
933

    
934
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
935
        if user != account:
936
            raise NotAllowedError
937

    
938
        if until is not None:
939
            path = '/'.join((account, container, name))
940
            node = self.node.node_lookup(path)
941
            if node is None:
942
                return
943
            hashes = []
944
            size = 0
945
            serials = []
946
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
947
            hashes += h
948
            size += s
949
            serials += v
950
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
951
            hashes += h
952
            size += s
953
            serials += v
954
            for h in hashes:
955
                self.store.map_delete(h)
956
            self.node.node_purge(node, until, CLUSTER_DELETED)
957
            try:
958
                props = self._get_version(node)
959
            except NameError:
960
                self.permissions.access_clear(path)
961
            self._report_size_change(user, account, -size,
962
                                    {'action': 'object purge', 'path': path,
963
                                     'versions': serials})
964
            return
965

    
966
        path, node = self._lookup_object(account, container, name)
967
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
968
        del_size = self._apply_versioning(account, container, src_version_id)
969
        if del_size:
970
            self._report_size_change(user, account, -del_size,
971
                                     {'action': 'object delete', 'path': path,
972
                                      'versions': [dest_version_id]})
973
        self._report_object_change(
974
            user, account, path, details={'action': 'object delete'})
975
        self.permissions.access_clear(path)
976

    
977
        if delimiter:
978
            prefix = name + delimiter if not name.endswith(delimiter) else name
979
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
980
            paths = []
981
            for t in src_names:
982
                path = '/'.join((account, container, t[0]))
983
                node = t[2]
984
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
985
                del_size = self._apply_versioning(
986
                    account, container, src_version_id)
987
                if del_size:
988
                    self._report_size_change(user, account, -del_size,
989
                                             {'action': 'object delete',
990
                                              'path': path,
991
                                              'versions': [dest_version_id]})
992
                self._report_object_change(
993
                    user, account, path, details={'action': 'object delete'})
994
                paths.append(path)
995
            self.permissions.access_clear_bulk(paths)
996

    
997
    @backend_method
998
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
999
        """Delete/purge an object."""
1000

    
1001
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1002
                     account, container, name, until, prefix, delimiter)
1003
        self._delete_object(user, account, container, name, until, delimiter)
1004

    
1005
    @backend_method
1006
    def list_versions(self, user, account, container, name):
1007
        """Return a list of all (version, version_timestamp) tuples for an object."""
1008

    
1009
        logger.debug(
1010
            "list_versions: %s %s %s %s", user, account, container, name)
1011
        self._can_read(user, account, container, name)
1012
        path, node = self._lookup_object(account, container, name)
1013
        versions = self.node.node_get_versions(node)
1014
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1015

    
1016
    @backend_method
1017
    def get_uuid(self, user, uuid):
1018
        """Return the (account, container, name) for the UUID given."""
1019

    
1020
        logger.debug("get_uuid: %s %s", user, uuid)
1021
        info = self.node.latest_uuid(uuid)
1022
        if info is None:
1023
            raise NameError
1024
        path, serial = info
1025
        account, container, name = path.split('/', 2)
1026
        self._can_read(user, account, container, name)
1027
        return (account, container, name)
1028

    
1029
    @backend_method
1030
    def get_public(self, user, public):
1031
        """Return the (account, container, name) for the public id given."""
1032

    
1033
        logger.debug("get_public: %s %s", user, public)
1034
        if public is None or public < ULTIMATE_ANSWER:
1035
            raise NameError
1036
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1037
        if path is None:
1038
            raise NameError
1039
        account, container, name = path.split('/', 2)
1040
        self._can_read(user, account, container, name)
1041
        return (account, container, name)
1042

    
1043
    @backend_method(autocommit=0)
1044
    def get_block(self, hash):
1045
        """Return a block's data."""
1046

    
1047
        logger.debug("get_block: %s", hash)
1048
        block = self.store.block_get(binascii.unhexlify(hash))
1049
        if not block:
1050
            raise ItemNotExists('Block does not exist')
1051
        return block
1052

    
1053
    @backend_method(autocommit=0)
1054
    def put_block(self, data):
1055
        """Store a block and return the hash."""
1056

    
1057
        logger.debug("put_block: %s", len(data))
1058
        return binascii.hexlify(self.store.block_put(data))
1059

    
1060
    @backend_method(autocommit=0)
1061
    def update_block(self, hash, data, offset=0):
1062
        """Update a known block and return the hash."""
1063

    
1064
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1065
        if offset == 0 and len(data) == self.block_size:
1066
            return self.put_block(data)
1067
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1068
        return binascii.hexlify(h)
1069

    
1070
    # Path functions.
1071

    
1072
    def _generate_uuid(self):
1073
        return str(uuidlib.uuid4())
1074

    
1075
    def _put_object_node(self, path, parent, name):
1076
        path = '/'.join((path, name))
1077
        node = self.node.node_lookup(path)
1078
        if node is None:
1079
            node = self.node.node_create(parent, path)
1080
        return path, node
1081

    
1082
    def _put_path(self, user, parent, path):
1083
        node = self.node.node_create(parent, path)
1084
        self.node.version_create(node, None, 0, '', None, user,
1085
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1086
        return node
1087

    
1088
    def _lookup_account(self, account, create=True):
1089
        node = self.node.node_lookup(account)
1090
        if node is None and create:
1091
            node = self._put_path(
1092
                account, self.ROOTNODE, account)  # User is account.
1093
        return account, node
1094

    
1095
    def _lookup_container(self, account, container):
1096
        path = '/'.join((account, container))
1097
        node = self.node.node_lookup(path)
1098
        if node is None:
1099
            raise ItemNotExists('Container does not exist')
1100
        return path, node
1101

    
1102
    def _lookup_object(self, account, container, name):
1103
        path = '/'.join((account, container, name))
1104
        node = self.node.node_lookup(path)
1105
        if node is None:
1106
            raise ItemNotExists('Object does not exist')
1107
        return path, node
1108

    
1109
    def _lookup_objects(self, paths):
1110
        nodes = self.node.node_lookup_bulk(paths)
1111
        return paths, nodes
1112

    
1113
    def _get_properties(self, node, until=None):
1114
        """Return properties until the timestamp given."""
1115

    
1116
        before = until if until is not None else inf
1117
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1118
        if props is None and until is not None:
1119
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1120
        if props is None:
1121
            raise ItemNotExists('Path does not exist')
1122
        return props
1123

    
1124
    def _get_statistics(self, node, until=None):
1125
        """Return count, sum of size and latest timestamp of everything under node."""
1126

    
1127
        if until is None:
1128
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1129
        else:
1130
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1131
        if stats is None:
1132
            stats = (0, 0, 0)
1133
        return stats
1134

    
1135
    def _get_version(self, node, version=None):
1136
        if version is None:
1137
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1138
            if props is None:
1139
                raise ItemNotExists('Object does not exist')
1140
        else:
1141
            try:
1142
                version = int(version)
1143
            except ValueError:
1144
                raise VersionNotExists('Version does not exist')
1145
            props = self.node.version_get_properties(version)
1146
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1147
                raise VersionNotExists('Version does not exist')
1148
        return props
1149

    
1150
    def _get_versions(self, nodes):
1151
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1152

    
1153
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1154
        """Create a new version of the node."""
1155

    
1156
        props = self.node.version_lookup(
1157
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1158
        if props is not None:
1159
            src_version_id = props[self.SERIAL]
1160
            src_hash = props[self.HASH]
1161
            src_size = props[self.SIZE]
1162
            src_type = props[self.TYPE]
1163
            src_checksum = props[self.CHECKSUM]
1164
        else:
1165
            src_version_id = None
1166
            src_hash = None
1167
            src_size = 0
1168
            src_type = ''
1169
            src_checksum = ''
1170
        if size is None:  # Set metadata.
1171
            hash = src_hash  # This way hash can be set to None (account or container).
1172
            size = src_size
1173
        if type is None:
1174
            type = src_type
1175
        if checksum is None:
1176
            checksum = src_checksum
1177
        uuid = self._generate_uuid(
1178
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1179

    
1180
        if src_node is None:
1181
            pre_version_id = src_version_id
1182
        else:
1183
            pre_version_id = None
1184
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1185
            if props is not None:
1186
                pre_version_id = props[self.SERIAL]
1187
        if pre_version_id is not None:
1188
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1189

    
1190
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1191
        return pre_version_id, dest_version_id
1192

    
1193
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1194
        if src_version_id is not None:
1195
            self.node.attribute_copy(src_version_id, dest_version_id)
1196
        if not replace:
1197
            self.node.attribute_del(dest_version_id, domain, (
1198
                k for k, v in meta.iteritems() if v == ''))
1199
            self.node.attribute_set(dest_version_id, domain, (
1200
                (k, v) for k, v in meta.iteritems() if v != ''))
1201
        else:
1202
            self.node.attribute_del(dest_version_id, domain)
1203
            self.node.attribute_set(dest_version_id, domain, ((
1204
                k, v) for k, v in meta.iteritems()))
1205

    
1206
    def _put_metadata(self, user, node, domain, meta, replace=False):
1207
        """Create a new version and store metadata."""
1208

    
1209
        src_version_id, dest_version_id = self._put_version_duplicate(
1210
            user, node)
1211
        self._put_metadata_duplicate(
1212
            src_version_id, dest_version_id, domain, meta, replace)
1213
        return src_version_id, dest_version_id
1214

    
1215
    def _list_limits(self, listing, marker, limit):
1216
        start = 0
1217
        if marker:
1218
            try:
1219
                start = listing.index(marker) + 1
1220
            except ValueError:
1221
                pass
1222
        if not limit or limit > 10000:
1223
            limit = 10000
1224
        return start, limit
1225

    
1226
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1227
        cont_prefix = path + '/'
1228
        prefix = cont_prefix + prefix
1229
        start = cont_prefix + marker if marker else None
1230
        before = until if until is not None else inf
1231
        filterq = keys if domain else []
1232
        sizeq = size_range
1233

    
1234
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1235
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1236
        objects.sort(key=lambda x: x[0])
1237
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1238
        return objects
1239

    
1240
    # Reporting functions.
1241

    
1242
    def _report_size_change(self, user, account, size, details={}):
1243
        account_node = self._lookup_account(account, True)[1]
1244
        total = self._get_statistics(account_node)[1]
1245
        details.update({'user': user, 'total': total})
1246
        logger.debug(
1247
            "_report_size_change: %s %s %s %s", user, account, size, details)
1248
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1249
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1250
                              float(size), details))
1251

    
1252
        serial = self.quotaholder.issue_commission(
1253
                context     =   {},
1254
                target      =   account,
1255
                key         =   '1',
1256
                clientkey   =   'pithos',
1257
                ownerkey    =   '',
1258
                provisions  =   (('pithos+', 'pithos+ : diskspace', size),)
1259
        )
1260
        self.serials.append(serial)
1261

    
1262
    def _report_object_change(self, user, account, path, details={}):
1263
        details.update({'user': user})
1264
        logger.debug("_report_object_change: %s %s %s %s", user,
1265
                     account, path, details)
1266
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1267
                                                  account, QUEUE_INSTANCE_ID, 'object', path, details))
1268

    
1269
    def _report_sharing_change(self, user, account, path, details={}):
1270
        logger.debug("_report_permissions_change: %s %s %s %s",
1271
                     user, account, path, details)
1272
        details.update({'user': user})
1273
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1274
                                                  account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1275

    
1276
    # Policy functions.
1277

    
1278
    def _check_policy(self, policy):
1279
        for k in policy.keys():
1280
            if policy[k] == '':
1281
                policy[k] = self.default_policy.get(k)
1282
        for k, v in policy.iteritems():
1283
            if k == 'quota':
1284
                q = int(v)  # May raise ValueError.
1285
                if q < 0:
1286
                    raise ValueError
1287
            elif k == 'versioning':
1288
                if v not in ['auto', 'none']:
1289
                    raise ValueError
1290
            else:
1291
                raise ValueError
1292

    
1293
    def _put_policy(self, node, policy, replace):
1294
        if replace:
1295
            for k, v in self.default_policy.iteritems():
1296
                if k not in policy:
1297
                    policy[k] = v
1298
        self.node.policy_set(node, policy)
1299

    
1300
    def _get_policy(self, node):
1301
        policy = self.default_policy.copy()
1302
        policy.update(self.node.policy_get(node))
1303
        return policy
1304

    
1305
    def _apply_versioning(self, account, container, version_id):
1306
        """Delete the provided version if such is the policy.
1307
           Return size of object removed.
1308
        """
1309

    
1310
        if version_id is None:
1311
            return 0
1312
        path, node = self._lookup_container(account, container)
1313
        versioning = self._get_policy(node)['versioning']
1314
        if versioning != 'auto':
1315
            hash, size = self.node.version_remove(version_id)
1316
            self.store.map_delete(hash)
1317
            return size
1318
        return 0
1319

    
1320
    # Access control functions.
1321

    
1322
    def _check_groups(self, groups):
1323
        # raise ValueError('Bad characters in groups')
1324
        pass
1325

    
1326
    def _check_permissions(self, path, permissions):
1327
        # raise ValueError('Bad characters in permissions')
1328
        pass
1329

    
1330
    def _get_formatted_paths(self, paths):
1331
        formatted = []
1332
        for p in paths:
1333
            node = self.node.node_lookup(p)
1334
            if node is not None:
1335
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1336
            if props is not None:
1337
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1338
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1339
                formatted.append((p, self.MATCH_EXACT))
1340
        return formatted
1341

    
1342
    def _get_permissions_path(self, account, container, name):
1343
        path = '/'.join((account, container, name))
1344
        permission_paths = self.permissions.access_inherit(path)
1345
        permission_paths.sort()
1346
        permission_paths.reverse()
1347
        for p in permission_paths:
1348
            if p == path:
1349
                return p
1350
            else:
1351
                if p.count('/') < 2:
1352
                    continue
1353
                node = self.node.node_lookup(p)
1354
                if node is not None:
1355
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1356
                if props is not None:
1357
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1358
                        return p
1359
        return None
1360

    
1361
    def _can_read(self, user, account, container, name):
1362
        if user == account:
1363
            return True
1364
        path = '/'.join((account, container, name))
1365
        if self.permissions.public_get(path) is not None:
1366
            return True
1367
        path = self._get_permissions_path(account, container, name)
1368
        if not path:
1369
            raise NotAllowedError
1370
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1371
            raise NotAllowedError
1372

    
1373
    def _can_write(self, user, account, container, name):
1374
        if user == account:
1375
            return True
1376
        path = '/'.join((account, container, name))
1377
        path = self._get_permissions_path(account, container, name)
1378
        if not path:
1379
            raise NotAllowedError
1380
        if not self.permissions.access_check(path, self.WRITE, user):
1381
            raise NotAllowedError
1382

    
1383
    def _allowed_accounts(self, user):
1384
        allow = set()
1385
        for path in self.permissions.access_list_paths(user):
1386
            allow.add(path.split('/', 1)[0])
1387
        return sorted(allow)
1388

    
1389
    def _allowed_containers(self, user, account):
1390
        allow = set()
1391
        for path in self.permissions.access_list_paths(user, account):
1392
            allow.add(path.split('/', 2)[1])
1393
        return sorted(allow)