Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 57310278

History | View | Annotate | Download (83.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from collections import defaultdict
41
from functools import wraps, partial
42
from traceback import format_exc
43
from time import time
44

    
45
from pithos.workers import glue
46
from archipelago.common import Segment, Xseg_ctx
47
from objpool import ObjectPool
48

    
49

    
50
try:
51
    from astakosclient import AstakosClient
52
except ImportError:
53
    AstakosClient = None
54

    
55
from pithos.backends.base import (
56
    DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
57
    DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
58
    BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
59
    ContainerNotEmpty, ItemNotExists, VersionNotExists,
60
    InvalidHash, IllegalOperationError)
61

    
62

    
63
class DisabledAstakosClient(object):
    """Inert stand-in used when no real AstakosClient is configured.

    Construction always succeeds (the arguments are merely recorded),
    but any attribute access raises AssertionError so that accidental
    use of the disabled client fails loudly.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        # Reached only for attributes not set in __init__.
        raise AssertionError("AstakosClient has been disabled, "
                             "yet an attempt to access it was made")
72

    
73

    
74
# Stripped-down version of the HashMap class found in tools.
75

    
76
class HashMap(list):
    """List of block hashes with Merkle-style top-hash computation.

    Stripped-down version of the HashMap class found in tools: it only
    stores the configured block size / hash algorithm and computes the
    combined hash of its contents.
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        # Digest a single value with the configured algorithm.
        hasher = hashlib.new(self.blockhash)
        hasher.update(v)
        return hasher.digest()

    def hash(self):
        """Return the top hash over all contained block hashes."""
        if not self:
            return self._hash_raw('')
        if len(self) == 1:
            return self[0]

        # Pad with zero blocks up to the next power of two, then fold
        # adjacent pairs until a single digest remains.
        level = list(self)
        width = 2
        while width < len(level):
            width *= 2
        level.extend([('\x00' * len(level[0]))] * (width - len(level)))
        while len(level) > 1:
            level = [self._hash_raw(level[i] + level[i + 1])
                     for i in range(0, len(level), 2)]
        return level[0]
102

    
103
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
# Alphabet and length (in characters) used for generated public URLs.
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16
DEFAULT_ARCHIPELAGO_CONF_FILE = '/etc/archipelago/archipelago.conf'

# Identifiers used when publishing messages on the queue.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version clusters: current data, superseded history, deleted markers.
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

inf = float('inf')

ULTIMATE_ANSWER = 42

# Default quotaholder source and resource names for disk-space quota.
DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'

DEFAULT_MAP_CHECK_INTERVAL = 5  # set to 5 secs

logger = logging.getLogger(__name__)
137

    
138

    
139
def backend_method(func):
    """Wrap a backend method in transaction management.

    If the backend is already inside a transaction the call is simply
    forwarded.  Otherwise a transaction is opened via pre_exec() and
    always finished via post_exec(), which receives True when the call
    completed normally and False when it raised (the exception is
    propagated to the caller).
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        # Already inside a transaction: just run the method.
        if self.in_transaction:
            return func(self, *args, **kw)

        succeeded = False
        try:
            self.pre_exec()
            result = func(self, *args, **kw)
            succeeded = True
            return result
        finally:
            # Commits on success, rolls back on failure.
            self.post_exec(succeeded)
    return wrapper
159

    
160

    
161
def debug_method(func):
    """Log every call of the wrapped method at DEBUG level.

    Logs the function name, its positional and keyword arguments, and
    either the returned value or — if an exception was raised — the
    formatted traceback (the exception is re-raised unchanged).
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            # Log the traceback in place of the result, then propagate.
            result = format_exc()
            raise
        finally:
            all_args = [repr(a) for a in args]
            # Fix: the original used map() purely for its side effect on
            # all_args, which is unidiomatic and a silent no-op under
            # Python 3's lazy map(); extend() is explicit and equivalent.
            all_args.extend('%s=%s' % (k, v) for k, v in kw.items())
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper
176

    
177

    
178
def check_allowed_paths(action):
    """Decorator for backend methods checking path access granted to user.

    The 1st argument of the decorated method is expected to be a
    ModularBackend instance, the 2nd the user performing the request and
    the path join of the rest arguments is supposed to be the requested path.

    The decorator checks whether the requested path is among the user's
    allowed cached paths.  If so, it returns immediately to reduce the
    interactions with the database.  Otherwise it proceeds with the
    execution of the decorated method and, if the method returns
    successfully (no exceptions are raised), the requested path is added
    to the user's cached allowed paths.

    :param action: (int) 0 for reads / 1 for writes
    :raises NotAllowedError: the user does not have access to the path
    """
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args):
            user = args[0]
            cache = (self.read_allowed_paths if action == self.READ
                     else self.write_allowed_paths)
            path = '/'.join(args[1:])
            if path in cache.get(user, []):
                return  # already granted; skip the database check
            func(self, *args)  # raises NotAllowedError on denial
            cache[user].add(path)  # remember the granted path
        return wrapper
    return decorator
212

    
213

    
214
def list_method(func):
    """Apply marker/limit pagination to a listing method's full result."""
    @wraps(func)
    def wrapper(self, *args, **kw):
        full = func(self, *args, **kw)
        # _list_limits translates marker/limit into a concrete slice.
        start, limit = self._list_limits(
            full, kw.get('marker'), kw.get('limit'))
        return full[start:start + limit]
    return wrapper
223

    
224

    
225
class ModularBackend(BaseBackend):
    """A modular backend.

    Uses modules for SQL functions and storage.  The database layer,
    block store and (optional) message queue are loaded by dotted module
    name at construction time, so implementations can be swapped without
    changing this class.
    """
230

    
231
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None,
                 archipelago_conf_file=None,
                 xseg_pool_size=8,
                 map_check_interval=None):
        """Set up the backend, wiring together its pluggable modules.

        All arguments are optional; any missing value falls back to the
        corresponding module-level DEFAULT_* constant.  Database, block
        and queue modules are imported by dotted name.
        """
        # Fall back to module defaults for anything not supplied.
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING
        archipelago_conf_file = archipelago_conf_file \
            or DEFAULT_ARCHIPELAGO_CONF_FILE
        map_check_interval = map_check_interval \
            or DEFAULT_MAP_CHECK_INTERVAL

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning
        self.map_check_interval = map_check_interval

        def load_module(m):
            # Import by dotted name and return the module object.
            __import__(m)
            return sys.modules[m]

        # Database layer: wrapper plus the table-level helper objects.
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        # Re-export the db module's constants as instance attributes
        # (e.g. self.READ, self.SERIAL) for convenient access.
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT',
                  'AVAILABLE', 'MAP_CHECK_TIMESTAMP']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        # Storage layer: archipelago-backed block store.
        glue.WorkerGlue.setupXsegPool(ObjectPool, Segment, Xseg_ctx,
                                      cfile=archipelago_conf_file,
                                      pool_size=xseg_pool_size)
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        # Message queue, or a no-op stand-in when not configured.
        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        # Quota accounting via astakos; the disabled stub raises on use.
        if not astakos_auth_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        # Quota commission serials and queue messages accumulated during
        # the current transaction (flushed by post_exec()).
        self.serials = []
        self.messages = []

        # Moving is implemented as copying with source removal.
        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False

        self._reset_allowed_paths()
356

    
357
    def pre_exec(self, lock_container_path=False):
        """Begin a database transaction and reset per-request state.

        :param lock_container_path: whether container paths should be
            locked for the duration of the transaction
        """
        self.lock_container_path = lock_container_path
        self.wrapper.execute()  # open the database transaction
        self.serials = []  # no quota commissions issued yet
        self._reset_allowed_paths()  # drop cached permission lookups
        self.in_transaction = True
363

    
364
    def post_exec(self, success_status=True):
        """Finish the transaction opened by pre_exec().

        On success: send the queued messages, persist and resolve any
        quota commission serials with astakos, and commit.  On failure:
        reject the pending commissions and roll back.

        :param success_status: True if the wrapped operation completed
            without raising
        """
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                # Accept the commissions and forget the ones astakos
                # acknowledged.
                r = self.astakosclient.resolve_commissions(
                    accept_serials=self.serials,
                    reject_serials=[])
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            # Failure path: reject issued commissions, then roll back.
            if self.serials:
                r = self.astakosclient.resolve_commissions(
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(
                    r['rejected'])
            self.wrapper.rollback()
        self.in_transaction = False
398

    
399
    def close(self):
        """Release backend resources: the database wrapper and the queue."""
        self.wrapper.close()
        self.queue.close()
402

    
403
    @property
    def using_external_quotaholder(self):
        """True when a real AstakosClient manages quota for this backend."""
        if isinstance(self.astakosclient, DisabledAstakosClient):
            return False
        return True
406

    
407
    @debug_method
    @backend_method
    @list_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access.

        Pagination via marker/limit is applied by @list_method.
        """
        accounts = self._allowed_accounts(user)
        return accounts
414

    
415
    def _get_account_quotas(self, account):
        """Get account usage from astakos.

        Returns the quota dict (e.g. with 'usage' and 'limit' keys) for
        the default source/resource, or {} when absent.

        Note: this method appeared twice, byte-identical, in the
        original file; the redundant duplicate definition is removed.
        """
        quotas = self.astakosclient.service_get_quotas(account)[account]
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
                                                  {})
428

    
429
    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain."""

        self._can_read_account(user, account)
        path, node = self._lookup_account(account, user == account)
        if user != account:
            # Foreign accounts: no history access, and it must exist.
            if until or (node is None):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # No version exists at the requested point in time.
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Only the account name is exposed to other users.
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                # Prefer the quotaholder's usage figure over local stats.
                external_quota = self._get_account_quotas(account)
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta
470

    
471
    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        self._can_write_account(user, account)
        node = self._lookup_account(account, True)[1]
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)
480

    
481
    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        self._can_read_account(user, account)
        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        # Groups are only visible to the account owner.
        return {}
491

    
492
    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        self._can_write_account(user, account)
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            # Wipe every existing group in one go.
            self.permissions.group_destroy(account)
        for name, members in groups.iteritems():
            if not replace:  # Not wiped above: drop this group first.
                self.permissions.group_delete(account, name)
            if members:
                self.permissions.group_addmany(account, name, members)
507

    
508
    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy."""

        self._can_read_account(user, account)
        if user != account:
            return {}  # policy is only visible to the owner
        node = self._lookup_account(account, True)[1]
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            # The quotaholder's limit overrides the locally stored quota.
            policy['quota'] = self._get_account_quotas(account).get('limit', 0)
        return policy
522

    
523
    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Validate and store the policy associated with the account."""

        self._can_write_account(user, account)
        node = self._lookup_account(account, True)[1]
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)
532

    
533
    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        policy = policy or {}
        self._can_write_account(user, account)
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        new_node = self._put_path(user, self.ROOTNODE, account,
                                  update_statistics_ancestors_depth=-1)
        self._put_policy(new_node, policy, True, is_account_policy=True)
548

    
549
    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        self._can_write_account(user, account)
        node = self.node.node_lookup(account)
        if node is None:
            return  # nothing to delete
        removed = self.node.node_remove(
            node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

        # Invalidate the whole allowed-paths cache: cheaper than
        # evicting the individual paths of the deleted account.
        self._reset_allowed_paths()
566

    
567
    @debug_method
    @backend_method
    @list_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account."""

        self._can_read_account(user, account)
        if user != account:
            # Foreign users: no history; only containers shared with them.
            if until:
                raise NotAllowedError
            return self._allowed_containers(user, account)
        if shared or public:
            # Container names extracted from shared/public object paths.
            names = set()
            if shared:
                names.update(p.split('/', 2)[1] for p in
                             self.permissions.access_list_shared(account))
            if public:
                names.update(p[0].split('/', 2)[1] for p in
                             self.permissions.public_list(account))
            return sorted(names)
        node = self.node.node_lookup(account)
        props = self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)
        return [p[0] for p in props]
591

    
592
    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""

        self._can_read_container(user, account, container)
        if user != account and until:
            raise NotAllowedError
        node = self._lookup_container(account, container)[1]
        before = inf if until is None else until
        # NOTE(review): the original always formats an empty path list
        # here; kept as-is to preserve behavior.
        allowed = self._get_formatted_paths([])
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)
608

    
609
    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain."""

        self._can_read_container(user, account, container)
        if user != account:
            # Foreign users have no access to container history.
            if until:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            # Only the container name is exposed to other users.
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta
643

    
644
    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        self._can_write_container(user, account, container)
        node = self._lookup_container(account, container)[1]
        src_version_id, _ = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is None:
            return
        policy = self._get_policy(node, is_account_policy=False)
        if policy['versioning'] != 'auto':
            # Without automatic versioning, the superseded version is
            # dropped immediately.
            self.node.version_remove(src_version_id,
                                     update_statistics_ancestors_depth=0)
661

    
662
    @debug_method
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        self._can_read_container(user, account, container)
        if user != account:
            return {}  # policy is only visible to the owner
        node = self._lookup_container(account, container)[1]
        return self._get_policy(node, is_account_policy=False)
672

    
673
    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Validate and store the policy associated with the container."""

        self._can_write_container(user, account, container)
        node = self._lookup_container(account, container)[1]
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)
683

    
684
    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        policy = policy or {}
        self._can_write_container(user, account, container)
        # A successful lookup means the container is already there;
        # NameError signals a missing container and lets us proceed.
        try:
            self._lookup_container(account, container)
        except NameError:
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        account_node = self._lookup_account(account, True)[1]
        node = self._put_path(
            user, account_node, '/'.join((account, container)),
            update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)
704

    
705
    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name.

        Three modes:
        - until is not None: purge historic/deleted versions up to that
          timestamp, keeping the container;
        - no delimiter: delete the (empty) container itself after purging
          all history;
        - delimiter given: delete only the container's contents.
        """

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)

        if until is not None:
            # Purge only: drop historic versions up to 'until' and free
            # their blocks.
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                # Versions count against quota: report the freed space.
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            # Delete the container itself; it must contain no objects.
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                # Mark the object deleted by writing a new zero-size
                # version in the DELETED cluster.
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
783

    
784
    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        """List object properties under a container.

        Depending on the flags, combines the shared and/or public listings
        and applies marker/limit pagination to the result.

        :raises NotAllowedError: for a historical ('until') listing of a
            foreign account
        """
        if user != account and until:
            raise NotAllowedError

        objects = []
        if shared and public:
            # Merge shared and public listings; a set deduplicates objects
            # that are both shared and public.
            # BUGFIX: start from an empty set (not the list above) so the
            # union below is valid even when there are no shared paths.
            objects = set()
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects = set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            objects.sort(key=lambda x: x[0])
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
        else:
            allowed = self._list_object_permissions(
                user, account, container, prefix, shared, public=False)
            if shared and not allowed:
                return []
            path, node = self._lookup_container(account, container)
            allowed = self._get_formatted_paths(allowed)
            objects = self._list_object_properties(
                node, path, prefix, delimiter, marker, limit, virtual, domain,
                keys, until, size_range, allowed, all_props)

        # apply limits
        start, limit = self._list_limits(objects, marker, limit)
        return objects[start:start + limit]
    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        """Return (name, props...) tuples for public objects under the
        container matching the prefix."""
        public_paths = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        full_paths, nodes = self._lookup_objects(public_paths)
        # Strip the leading 'account/container/' to get bare object names.
        strip = len('/'.join((account, container))) + 1
        names = [fp[strip:] for fp in full_paths]
        props_bulk = self.node.version_lookup_bulk(
            nodes, all_props=all_props, order_by_path=True)
        return [(name,) + props for name, props in zip(names, props_bulk)]
    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        """Collect every matching object by paging through _list_objects."""
        page_size = 10000
        collected = []
        while True:
            # Resume after the last element fetched so far.
            last = collected[-1] if collected else None
            page = self._list_objects(
                user, account, container, prefix, delimiter, last, page_size,
                virtual, domain, keys, shared, until, size_range, all_props,
                public)
            collected.extend(page)
            # A short (or empty) page means we have reached the end.
            if len(page) < page_size:
                break
        return collected
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        """Return the paths under the prefix the user may list.

        For a foreign account: the paths the user was granted access to
        (raises NotAllowedError if there are none). For the owner: the
        sorted union of shared and/or public paths (possibly empty).
        """
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            granted = self.permissions.access_list_paths(user, path)
            if not granted:
                raise NotAllowedError
            return granted
        candidates = set()
        if shared:
            candidates.update(self.permissions.access_list_shared(path))
        if public:
            candidates.update(
                entry[0] for entry in self.permissions.public_list(path))
        if not candidates:
            return []
        return sorted(candidates)
    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        # all_props=False: return only the (name, version_id) pairs.
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, False,
            public)
    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range, True,
            public)
        result = []
        for entry in props:
            # A 2-tuple denotes a virtual directory marker, not an object.
            if len(entry) == 2:
                result.append({'subdir': entry[0]})
                continue
            mtime = entry[self.MTIME + 1]
            result.append({
                'name': entry[0],
                'bytes': entry[self.SIZE + 1],
                'type': entry[self.TYPE + 1],
                'hash': entry[self.HASH + 1],
                'version': entry[self.SERIAL + 1],
                'version_timestamp': mtime,
                # Historical listings carry no meaningful 'modified' value.
                'modified': mtime if until is None else None,
                'modified_by': entry[self.MUSER + 1],
                'uuid': entry[self.UUID + 1],
                'checksum': entry[self.CHECKSUM + 1]})
        return result
    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths enforce permissions under a container."""

        # Only explicitly shared paths are of interest here, not public ones.
        return self._list_object_permissions(
            user, account, container, prefix, shared=True, public=False)
    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        lookup = '/'.join((account, container, prefix))
        # public_list yields (path, public_id) pairs.
        return dict(self.permissions.public_list(lookup))
    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            # Latest version requested: opportunistically re-check an
            # unavailable map, but never let the check fail the request.
            if not props[self.AVAILABLE]:
                try:
                    self._update_available(props)
                except (NotAllowedError, IllegalOperationError):
                    pass  # just update the database
                finally:
                    # get updated properties
                    props = self._get_version(node, version)
            modified = props[self.MTIME]
        else:
            # A specific version was requested; 'modified' still reports the
            # object's overall last modification time.
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            # User-defined attributes first, so system keys below win on
            # any name collision.
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM],
                     'available': props[self.AVAILABLE],
                     'map_check_timestamp': props[self.MAP_CHECK_TIMESTAMP]})
        return meta
    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write_object(user, account, container, name)

        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        old_version, new_version = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        # Apply the container versioning policy to the superseded version.
        self._apply_versioning(account, container, old_version,
                               update_statistics_ancestors_depth=1)
        return new_version
    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions.

        :raises NotAllowedError: if a foreign user lacks access to any path
        """

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        nobject_permissions = {}
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            name = path[cpath_idx:]
            # A foreign user must have been granted access to every path.
            # (The previous version stored the lookup result in a dead
            # local that was immediately overwritten below.)
            if user != account and path not in access_objects:
                raise NotAllowedError
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        # Verifies the objects actually exist.
        self._lookup_objects(permissions_path)
        return nobject_permissions
    @debug_method
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        permissions_path = self._get_permissions_path(account, container, name)
        if user == account:
            # The owner always has write access.
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.WRITE,
                                           user):
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.READ,
                                           user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # Also verifies that the object actually exists.
        self._lookup_object(account, container, name)
        return (allowed,
                permissions_path,
                self.permissions.access_get(permissions_path))
    @debug_method
    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object.

        Only the account owner may change permissions.

        :raises NotAllowedError: if the user is not the account owner
        :raises ValueError: if the permissions could not be set
        """

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        try:
            self.permissions.access_set(path, permissions)
        except Exception:
            # Keep the historical contract of surfacing any failure as
            # ValueError, but stop trapping SystemExit/KeyboardInterrupt
            # the way the previous bare `except:` did.
            raise ValueError
        else:
            self._report_sharing_change(user, account, path, {'members':
                                        self.permissions.access_members(path)})

        # remove all the cached allowed paths
        # filtering out only those affected could be more expensive
        self._reset_allowed_paths()
    @debug_method
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read_object(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        return self.permissions.public_get(path)
    @debug_method
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write_object(user, account, container, name)
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        if public:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet)
        else:
            self.permissions.public_unset(path)
    def _update_available(self, props):
        """Checks if the object map exists and updates the database.

        :returns: the hashmap, when the map exists
        :raises NotAllowedError: if a map check ran too recently
        :raises IllegalOperationError: if the map cannot be retrieved
        """

        if not props[self.AVAILABLE]:
            # Rate-limit consecutive map checks for the same version.
            if props[self.MAP_CHECK_TIMESTAMP]:
                elapsed_time = time() - float(props[self.MAP_CHECK_TIMESTAMP])
                if elapsed_time < self.map_check_interval:
                    raise NotAllowedError(
                        'Consequent map checks are limited: retry later.')
        try:
            hashmap = self.store.map_get_archipelago(props[self.HASH],
                                                     props[self.SIZE])
        except Exception:  # map does not exist
            # (was a bare `except:`; narrowed so SystemExit/KeyboardInterrupt
            # are not swallowed)
            # Raising an exception results in db transaction rollback
            # However we have to force the update of the database
            self.wrapper.rollback()  # rollback existing transaction
            self.wrapper.execute()  # start new transaction
            self.node.version_put_property(props[self.SERIAL],
                                           'map_check_timestamp', time())
            self.wrapper.commit()  # commit transaction
            self.wrapper.execute()  # start new transaction
            raise IllegalOperationError(
                'Unable to retrieve Archipelago Volume hashmap.')
        else:  # map exists
            self.node.version_put_property(props[self.SERIAL],
                                           'available', True)
            self.node.version_put_property(props[self.SERIAL],
                                           'map_check_timestamp', time())
            return hashmap
    @debug_method
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        obj_hash = props[self.HASH]
        if obj_hash is None:
            return 0, ()
        if obj_hash.startswith('archip:'):
            # Archipelago-backed object: verify/refresh availability first.
            hashmap = self._update_available(props)
            return props[self.SIZE], list(hashmap)
        hashmap = self.store.map_get(self._unhexlify_hash(obj_hash))
        return props[self.SIZE], [binascii.hexlify(h) for h in hashmap]
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True,
                            available=True):
        # Create a new version of the object pointing at `hash`, apply the
        # versioning policy, enforce quotas and issue notifications.
        # Returns the new version id. Raises NotAllowedError / QuotaError.
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write_object(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1,
            available=available, keep_available=False)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        # del_size is the space reclaimed from the superseded version (if
        # the versioning policy discards it).
        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Quotas are only checked when usage grows.
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id
    @debug_method
    @backend_method
    def register_object_map(self, user, account, container, name, size, type,
                            mapfile, checksum='', domain='pithos', meta=None,
                            replace_meta=False, permissions=None):
        """Register an object mapfile without providing any data.

        Lock the container path, create a node pointing to the object path,
        create a version pointing to the mapfile
        and issue the size change in the quotaholder.

        :param user: the user account which performs the action

        :param account: the account under which the object resides

        :param container: the container under which the object resides

        :param name: the object name

        :param size: the object size

        :param type: the object mimetype

        :param mapfile: the mapfile pointing to the object data

        :param checksum: the md5 checksum (optional)

        :param domain: the object domain

        :param meta: a dict with custom object metadata

        :param replace_meta: replace existing metadata or not

        :param permissions: a dict with the read and write object permissions

        :returns: the new object uuid

        :raises: ItemNotExists, NotAllowedError, QuotaError
        """

        meta = meta or {}
        try:
            # Create the container if missing, holding the container path
            # lock for the duration of the registration.
            self.lock_container_path = True
            self.put_container(user, account, container, policy=None)
        except ContainerExists:
            pass
        finally:
            self.lock_container_path = False
        # available=False: the data behind the mapfile is not yet verified.
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, mapfile, checksum,
            domain, meta, replace_meta, permissions, available=False)
        return self.node.version_get_properties(dest_version_id,
                                                keys=('uuid',))[0]
    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version.

        :raises IllegalOperationError: for Archipelago-backed hashes
        :raises IndexError: with a ``data`` attribute listing the hex hashes
            of blocks that are not yet stored
        """

        for h in hashmap:
            if h.startswith('archip_'):
                raise IllegalOperationError(
                    'Cannot update Archipelago Volume hashmap.')
        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        # Renamed from `map`/`hash` to avoid shadowing the builtins.
        hashmap_obj = HashMap(self.block_size, self.hash_algorithm)
        hashmap_obj.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(hashmap_obj)
        if missing:
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        map_hash = hashmap_obj.hash()
        hexlified = binascii.hexlify(map_hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        self.store.map_put(map_hash, hashmap_obj)
        return dest_version_id, hexlified
    @debug_method
    @backend_method
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum."""

        # Update objects with greater version and same hashmap
        # and size (fix metadata updates).
        self._can_write_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        props = self._get_version(node, version)
        ref_hash = props[self.HASH]
        ref_size = props[self.SIZE]
        min_serial = int(version)
        for v in self.node.node_get_versions(node):
            same_content = (v[self.HASH] == ref_hash and
                            v[self.SIZE] == ref_size)
            if v[self.SERIAL] >= min_serial and same_content:
                self.node.version_put_property(
                    v[self.SERIAL], 'checksum', checksum)
    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):
        # Copy (or, with is_move=True, move) an object; when a delimiter is
        # given, also copy/move everything under the pseudo-hierarchy prefix.
        # Returns a single version id, or a list when multiple objects moved.

        # Moves do not change total usage, so skip quota reporting for them.
        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read_object(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        # A copy onto a different path gets a new uuid; a move keeps it.
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            # Recurse into the pseudo-directory: copy/move every object
            # whose name starts with `src_name + delimiter`.
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                # Rewrite only the leading source prefix to the destination.
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)
    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        # Delegate to the shared copy/move helper with is_copy semantics
        # (the False argument selects "copy", not "move").
        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {},
            replace_meta, permissions, src_version, False, delimiter)
1419

    
1420
    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        # Only the owner of the source account may move objects out of it.
        if user != src_account:
            raise NotAllowedError
        return self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {},
            replace_meta, permissions, None, delimiter=delimiter)
1436

    
1437
    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        """Delete (or purge, if ``until`` is given) an object.

        With ``until`` set, physically purge all versions up to that
        timestamp.  Otherwise mark the object deleted by recording a new
        version in CLUSTER_DELETED.  If ``delimiter`` is given, also
        delete every object under the implied prefix (folder semantics).
        Raises NotAllowedError for non-owners and ItemNotExists if the
        object is already deleted.
        """
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            # Purge mode: physically remove versions up to the timestamp.
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            # Historical versions count against quota only when
            # versioning is not free.
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except NameError:
                # NOTE(review): _get_version raises ItemNotExists, not
                # NameError — this handler looks unreachable; confirm
                # whether permissions should be cleared on full purge.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        # Soft delete: record a new zero-size version in the DELETED
        # cluster so the deletion itself is versioned.
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            # Folder semantics: delete every object under the prefix too.
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
1531

    
1532
    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object.

        NOTE(review): the ``prefix`` argument is accepted but never used
        here — confirm whether callers rely on it.
        """

        self._delete_object(user, account, container, name, until, delimiter)
1539

    
1540
    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        history = []
        for props in self.node.node_get_versions(node):
            # Deleted versions are not part of the visible history.
            if props[self.CLUSTER] == CLUSTER_DELETED:
                continue
            history.append([props[self.SERIAL], props[self.MTIME]])
        return history
1550

    
1551
    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        # info is (path, serial); only the path is needed here.
        account, container, name = info[0].split('/', 2)
        if check_permissions:
            self._can_read_object(user, account, container, name)
        return account, container, name
1564

    
1565
    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        # The caller must still have read access to the object itself.
        self._can_read_object(user, account, container, name)
        return account, container, name
1576

    
1577
    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        # Archipelago blocks are addressed by their prefixed name; all
        # other blocks by the binary form of their hex hash.
        if hash.startswith('archip_'):
            block = self.store.block_get_archipelago(hash)
        else:
            block = self.store.block_get(self._unhexlify_hash(hash))
        if block:
            return block
        raise ItemNotExists('Block does not exist')
1588

    
1589
    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        stored_hash = self.store.block_put(data)
        return binascii.hexlify(stored_hash)
1594

    
1595
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if hash.startswith('archip_'):
            raise IllegalOperationError(
                'Cannot update an Archipelago Volume block.')
        # A full-block write at offset 0 is simply a fresh block store.
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        updated = self.store.block_update(self._unhexlify_hash(hash), offset,
                                          data)
        return binascii.hexlify(updated)
1606

    
1607
    # Path functions.
1608

    
1609
    def _generate_uuid(self):
1610
        return str(uuidlib.uuid4())
1611

    
1612
    def _put_object_node(self, path, parent, name):
1613
        path = '/'.join((path, name))
1614
        node = self.node.node_lookup(path)
1615
        if node is None:
1616
            node = self.node.node_create(parent, path)
1617
        return path, node
1618

    
1619
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        """Create a node at path and seed it with an initial empty version.

        The version is zero-size, unnamed-type, owned by user, and placed
        in the normal cluster.  Returns the new node id.
        """
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node
1626

    
1627
    def _lookup_account(self, account, create=True):
1628
        node = self.node.node_lookup(account)
1629
        if node is None and create:
1630
            node = self._put_path(
1631
                account, self.ROOTNODE, account,
1632
                update_statistics_ancestors_depth=-1)  # User is account.
1633
        return account, node
1634

    
1635
    def _lookup_container(self, account, container):
1636
        for_update = True if self.lock_container_path else False
1637
        path = '/'.join((account, container))
1638
        node = self.node.node_lookup(path, for_update)
1639
        if node is None:
1640
            raise ItemNotExists('Container does not exist')
1641
        return path, node
1642

    
1643
    def _lookup_object(self, account, container, name, lock_container=False):
1644
        if lock_container:
1645
            self._lookup_container(account, container)
1646

    
1647
        path = '/'.join((account, container, name))
1648
        node = self.node.node_lookup(path)
1649
        if node is None:
1650
            raise ItemNotExists('Object does not exist')
1651
        return path, node
1652

    
1653
    def _lookup_objects(self, paths):
1654
        nodes = self.node.node_lookup_bulk(paths)
1655
        return paths, nodes
1656

    
1657
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = inf if until is None else until
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        # For historical queries, fall back to the history cluster.
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props
1667

    
1668
    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            # Historical statistics up to the given timestamp.
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            # Recompute from live data instead of the cached counters.
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        return stats if stats is not None else (0, 0, 0)
1681

    
1682
    def _get_version(self, node, version=None):
        """Return version properties; the latest live one if version is None.

        Raises ItemNotExists when no live version exists and
        VersionNotExists for an unknown or deleted explicit version.
        """
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
            return props
        try:
            version = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(version, node=node)
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props
1696

    
1697
    def _get_versions(self, nodes):
        """Bulk-fetch the latest live version for each of the nodes."""
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1699

    
1700
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None,
                               available=True, keep_available=True):
        """Create a new version of the node.

        The new version inherits unspecified properties (size, hash,
        type, checksum) from the latest live version of ``src_node``
        (or of ``node`` itself when src_node is None).  The previous
        live version of ``node``, if any, is demoted to the history
        cluster.  Returns (pre_version_id, dest_version_id).
        """

        # Template version: the source node's latest live version.
        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
            if keep_available:
                # Carry the availability / map-check state forward.
                src_available = props[self.AVAILABLE]
                src_map_check_timestamp = props[self.MAP_CHECK_TIMESTAMP]
            else:
                src_available = available
                src_map_check_timestamp = None
        else:
            # No prior version: start from empty defaults.
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
            src_available = available
            src_map_check_timestamp = None
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        # Copies get a fresh UUID; in-place updates keep the object's UUID.
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            # Cross-node copy: the version to demote is the destination
            # node's own latest live version, looked up separately.
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            # Demote the previous live version to the history cluster.
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth,
            available=src_available,
            map_check_timestamp=src_map_check_timestamp)

        # Only the new version may carry the is-latest flag.
        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
1760

    
1761
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        """Copy attributes to the new version and apply meta changes.

        With replace=False, an empty-string value deletes that key and
        other values are upserted; with replace=True the whole domain is
        overwritten by meta.
        """
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            # Empty values mean "delete this key"; others are upserts.
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))
1774

    
1775
    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata.

        A metadata change is itself a new version (data properties are
        inherited from the previous one).  Returns
        (src_version_id, dest_version_id).
        """

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=
            update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id
1786

    
1787
    def _list_limits(self, listing, marker, limit):
1788
        start = 0
1789
        if marker:
1790
            try:
1791
                start = listing.index(marker) + 1
1792
            except ValueError:
1793
                pass
1794
        if not limit or limit > 10000:
1795
            limit = 10000
1796
        return start, limit
1797

    
1798
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """List object properties under a container node."""
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        # Paths in the node layer are absolute, so scope the requested
        # prefix and marker to this container.
        lookup_prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        # Attribute filtering only applies within a metadata domain.
        filterq = keys if domain else []

        objects, prefixes = self.node.latest_version_list(
            parent, lookup_prefix, delimiter, start, limit, before,
            CLUSTER_DELETED, allowed, domain, filterq, size_range, all_props)
        if virtual:
            # Virtual entries stand in for shared prefixes (folders).
            objects.extend((p, None) for p in prefixes)
        objects.sort(key=lambda entry: entry[0])
        # Strip the container prefix so names are container-relative.
        return [(entry[0][len(cont_prefix):],) + entry[1:]
                for entry in objects]
1819

    
1820
    # Reporting functions.
1821

    
1822
    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        """Queue a diskspace-change message and commission quota usage.

        No-op when size is 0.  Raises QuotaError if the external
        quotaholder rejects the commission.
        """
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        # Recompute the account total so the message carries fresh usage.
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            # Track issued serials so the commission can be resolved later.
            self.serials.append(serial)
1851

    
1852
    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        """Queue an 'object changed' notification message."""
        details = details or {}
        # Annotate the (possibly caller-owned) details dict in place.
        details['user'] = user
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))
1860

    
1861
    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a 'sharing changed' notification message."""
        details = details or {}
        # Annotate the (possibly caller-owned) details dict in place.
        details['user'] = user
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))
1869

    
1870
    # Policy functions.
1871

    
1872
    def _check_policy(self, policy, is_account_policy=True):
        """Validate policy values in place; raise ValueError on bad input.

        Empty-string values are first replaced by the corresponding
        defaults.  Only 'quota' (non-negative integer) and 'versioning'
        ('auto' or 'none') are accepted keys.
        """
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError
1888

    
1889
    def _put_policy(self, node, policy, replace, is_account_policy=True):
        """Store the policy for node.

        When replace is true, missing keys are first filled in from the
        defaults so the stored policy is complete.
        """
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)
1897

    
1898
    def _get_policy(self, node, is_account_policy=True):
1899
        default_policy = self.default_account_policy \
1900
            if is_account_policy else self.default_container_policy
1901
        policy = default_policy.copy()
1902
        policy.update(self.node.policy_get(node))
1903
        return policy
1904

    
1905
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            # No history is kept: physically drop the version and its map.
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        if self.free_versioning:
            # Versions are kept but do not count against the quota.
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
1925

    
1926
    # Access control functions.
1927

    
1928
    def _check_groups(self, groups):
1929
        # raise ValueError('Bad characters in groups')
1930
        pass
1931

    
1932
    def _check_permissions(self, path, permissions):
1933
        # raise ValueError('Bad characters in permissions')
1934
        pass
1935

    
1936
    def _get_formatted_paths(self, paths):
1937
        formatted = []
1938
        if len(paths) == 0:
1939
            return formatted
1940
        props = self.node.get_props(paths)
1941
        if props:
1942
            for prop in props:
1943
                if prop[1].split(';', 1)[0].strip() in (
1944
                        'application/directory', 'application/folder'):
1945
                    formatted.append((prop[0].rstrip('/') + '/',
1946
                                      self.MATCH_PREFIX))
1947
                formatted.append((prop[0], self.MATCH_EXACT))
1948
        return formatted
1949

    
1950
    def _get_permissions_path(self, account, container, name):
1951
        path = '/'.join((account, container, name))
1952
        permission_paths = self.permissions.access_inherit(path)
1953
        permission_paths.sort()
1954
        permission_paths.reverse()
1955
        for p in permission_paths:
1956
            if p == path:
1957
                return p
1958
            else:
1959
                if p.count('/') < 2:
1960
                    continue
1961
                node = self.node.node_lookup(p)
1962
                props = None
1963
                if node is not None:
1964
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1965
                if props is not None:
1966
                    if props[self.TYPE].split(';', 1)[0].strip() in (
1967
                            'application/directory', 'application/folder'):
1968
                        return p
1969
        return None
1970

    
1971
    def _get_permissions_path_bulk(self, account, container, names):
1972
        formatted_paths = []
1973
        for name in names:
1974
            path = '/'.join((account, container, name))
1975
            formatted_paths.append(path)
1976
        permission_paths = self.permissions.access_inherit_bulk(
1977
            formatted_paths)
1978
        permission_paths.sort()
1979
        permission_paths.reverse()
1980
        permission_paths_list = []
1981
        lookup_list = []
1982
        for p in permission_paths:
1983
            if p in formatted_paths:
1984
                permission_paths_list.append(p)
1985
            else:
1986
                if p.count('/') < 2:
1987
                    continue
1988
                lookup_list.append(p)
1989

    
1990
        if len(lookup_list) > 0:
1991
            props = self.node.get_props(lookup_list)
1992
            if props:
1993
                for prop in props:
1994
                    if prop[1].split(';', 1)[0].strip() in (
1995
                            'application/directory', 'application/folder'):
1996
                        permission_paths_list.append(prop[0])
1997

    
1998
        if len(permission_paths_list) > 0:
1999
            return permission_paths_list
2000

    
2001
        return None
2002

    
2003
    def _reset_allowed_paths(self):
2004
        self.read_allowed_paths = defaultdict(set)
2005
        self.write_allowed_paths = defaultdict(set)
2006

    
2007
    @check_allowed_paths(action=0)
    def _can_read_account(self, user, account):
        """Raise NotAllowedError unless user may read the account."""
        # Owners always may; others need the account in their shared set.
        if user != account and account not in self._allowed_accounts(user):
            raise NotAllowedError
2012

    
2013
    @check_allowed_paths(action=1)
    def _can_write_account(self, user, account):
        """Only the account owner may write to it."""
        if user != account:
            raise NotAllowedError
2017

    
2018
    @check_allowed_paths(action=0)
    def _can_read_container(self, user, account, container):
        """Raise NotAllowedError unless user may read the container."""
        # Owners always may; others need the container in their shared set.
        if user != account and container not in self._allowed_containers(
                user, account):
            raise NotAllowedError
2023

    
2024
    @check_allowed_paths(action=1)
    def _can_write_container(self, user, account, container):
        """Only the account owner may write into its containers."""
        if user != account:
            raise NotAllowedError
2028

    
2029
    @check_allowed_paths(action=0)
    def _can_read_object(self, user, account, container, name):
        """Raise NotAllowedError unless user may read the object."""
        if user == account:
            return True
        path = '/'.join((account, container, name))
        # Public objects are readable by anyone.
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        # Either a read or a write grant allows reading (short-circuits
        # so the write check only runs when the read check fails).
        if not (self.permissions.access_check(path, self.READ, user) or
                self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError
2042

    
2043
    @check_allowed_paths(action=1)
    def _can_write_object(self, user, account, container, name):
        """Raise NotAllowedError unless user may write the object.

        Owners always can; others need an explicit write grant on the
        object's governing permission path.  Unlike reads, public status
        does not grant write access.
        """
        if user == account:
            return True
        # The joined path was previously computed and immediately
        # overwritten (dead store, removed); only the permissions path
        # matters for write checks.
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError
2053

    
2054
    def _allowed_accounts(self, user):
2055
        allow = set()
2056
        for path in self.permissions.access_list_paths(user):
2057
            p = path.split('/', 1)[0]
2058
            allow.add(p)
2059
        self.read_allowed_paths[user] |= allow
2060
        return sorted(allow)
2061

    
2062
    def _allowed_containers(self, user, account):
2063
        allow = set()
2064
        for path in self.permissions.access_list_paths(user, account):
2065
            p = path.split('/', 2)[1]
2066
            allow.add(p)
2067
        self.read_allowed_paths[user] |= allow
2068
        return sorted(allow)
2069

    
2070
    # Domain functions
2071

    
2072
    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        """Return (path, metadata, permissions) for objects in a domain."""
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        result = []
        for path, props, user_defined_meta in obj_list:
            result.append((path,
                           self._build_metadata(props, user_defined_meta),
                           self.permissions.access_get(path)))
        return result
2085

    
2086
    # util functions
2087

    
2088
    def _build_metadata(self, props, user_defined=None,
2089
                        include_user_defined=True):
2090
        meta = {'bytes': props[self.SIZE],
2091
                'type': props[self.TYPE],
2092
                'hash': props[self.HASH],
2093
                'version': props[self.SERIAL],
2094
                'version_timestamp': props[self.MTIME],
2095
                'modified_by': props[self.MUSER],
2096
                'uuid': props[self.UUID],
2097
                'checksum': props[self.CHECKSUM]}
2098
        if include_user_defined and user_defined is not None:
2099
            meta.update(user_defined)
2100
        return meta
2101

    
2102
    def _exists(self, node):
2103
        try:
2104
            self._get_version(node)
2105
        except ItemNotExists:
2106
            return False
2107
        else:
2108
            return True
2109

    
2110
    def _unhexlify_hash(self, hash):
2111
        try:
2112
            return binascii.unhexlify(hash)
2113
        except TypeError:
2114
            raise InvalidHash(hash)