Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 6e3e5c84

History | View | Annotate | Download (83.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from collections import defaultdict
41
from functools import wraps, partial
42
from traceback import format_exc
43
from time import time
44

    
45
from pithos.workers import glue
46
from archipelago.common import Segment, Xseg_ctx
47
from objpool import ObjectPool
48

    
49

    
50
try:
51
    from astakosclient import AstakosClient
52
except ImportError:
53
    AstakosClient = None
54

    
55
from pithos.backends.base import (
56
    DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
57
    DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
58
    BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
59
    ContainerNotEmpty, ItemNotExists, VersionNotExists,
60
    InvalidHash, IllegalOperationError)
61

    
62

    
63
class DisabledAstakosClient(object):
    """Stand-in for AstakosClient when no auth URL or client is available.

    Remembers the constructor arguments and fails loudly on any attribute
    access, so accidental use of the disabled quotaholder is caught early.
    """
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        # Only reached for attributes not set in __init__.
        raise AssertionError("AstakosClient has been disabled, "
                             "yet an attempt to access it was made")
72

    
73

    
74
# Stripped-down version of the HashMap class found in tools.
75

    
76
class HashMap(list):
    """Stripped-down hash-list: items are block hashes; hash() folds them
    into a single top hash by pairwise hashing over a power-of-two-padded
    list (zero blocks as padding)."""

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        # One round of the configured digest over raw data.
        hasher = hashlib.new(self.blockhash)
        hasher.update(v)
        return hasher.digest()

    def hash(self):
        if not self:
            return self._hash_raw('')
        if len(self) == 1:
            return self[0]

        hashes = list(self)
        # Pad up to the next power of two with zero-filled blocks.
        size = 2
        while size < len(hashes):
            size *= 2
        hashes.extend([('\x00' * len(hashes[0]))] * (size - len(hashes)))
        # Reduce pairwise until a single top hash remains.
        while len(hashes) > 1:
            hashes = [self._hash_raw(hashes[i] + hashes[i + 1])
                      for i in range(0, len(hashes), 2)]
        return hashes[0]
102

    
103
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
# Defaults for generated public-object URLs.  NOTE(review): SECURITY is
# presumably the random-token length in alphabet characters -- confirm
# against the URL generation code.
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16
DEFAULT_ARCHIPELAGO_CONF_FILE = '/etc/archipelago/archipelago.conf'

# Message-queue identification.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version cluster identifiers (normal / history / deleted).
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

# Sentinel meaning "no upper bound" for 'until'-style queries.
inf = float('inf')

ULTIMATE_ANSWER = 42

# Default source/resource names used when talking to the quotaholder.
DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'

# Interval used for map checks (seconds).
DEFAULT_MAP_CHECK_INTERVAL = 5  # set to 5 secs

logger = logging.getLogger(__name__)
137

    
138

    
139
def backend_method(func):
    """Decorator wrapping a backend call in a database transaction.

    When the backend is already inside a transaction the call runs
    as-is; otherwise a transaction is opened via pre_exec() and closed
    via post_exec(), which commits on success and rolls back on failure.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        if self.in_transaction:
            # Nested call: the outermost wrapper owns the transaction.
            return func(self, *args, **kw)

        succeeded = False
        try:
            self.pre_exec()
            result = func(self, *args, **kw)
            succeeded = True
            return result
        finally:
            # Any exception (including BaseException) leaves succeeded
            # False, so post_exec rolls back; otherwise it commits.
            self.post_exec(succeeded)
    return wrapper
159

    
160

    
161
def debug_method(func):
    """Decorator logging each call's arguments and its result.

    If the call raises, the formatted traceback is logged in place of
    the result and the exception is re-raised unchanged.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            result = format_exc()
            raise
        finally:
            rendered = [repr(a) for a in args]
            rendered.extend('%s=%s' % (k, v) for k, v in kw.iteritems())
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(rendered).rstrip(', '), result))
    return wrapper
176

    
177

    
178
def check_allowed_paths(action):
    """Decorator factory for backend methods checking path access.

    The decorated method's 1st argument is expected to be a
    ModularBackend instance, the 2nd the requesting user; the '/'-join
    of the remaining arguments is the requested path.

    If the path is already in the user's cached allowed paths the
    database check is skipped entirely.  Otherwise the wrapped access
    check runs and, when it returns without raising, the path is added
    to the user's cache.

    :param action: (int) 0 for reads / 1 for writes
    :raises NotAllowedError: the user does not have access to the path
    """
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args):
            user = args[0]
            cache = (self.read_allowed_paths if action == self.READ
                     else self.write_allowed_paths)
            path = '/'.join(args[1:])
            if path in cache.get(user, []):
                return  # access is already checked
            func(self, *args)   # proceed with access check; may raise
            cache[user].add(path)  # remember the granted path
        return wrapper
    return decorator
212

    
213

    
214
def list_method(func):
    """Decorator slicing a listing result by the marker/limit kwargs.

    The start offset and effective limit are computed by the instance's
    _list_limits() helper.
    """
    @wraps(func)
    def wrapper(self, *args, **kw):
        result = func(self, *args, **kw)
        start, limit = self._list_limits(
            result, kw.get('marker'), kw.get('limit'))
        return result[start:start + limit]
    return wrapper
223

    
224

    
225
class ModularBackend(BaseBackend):
    """A modular backend.

    Uses pluggable modules, loaded by dotted name in __init__, for the
    SQL layer (nodes, permissions, config, quota serials) and for block
    storage; an optional queue module publishes backend events.
    """
230

    
231
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None,
                 archipelago_conf_file=None,
                 xseg_pool_size=8,
                 map_check_interval=None):
        """Initialize the backend: resolve settings, load the DB and block
        modules by dotted name, set up the xseg pool, the (optional)
        message queue and the Astakos client.

        Every argument defaults to the corresponding module-level
        DEFAULT_* constant when None/falsy.
        """
        # Fall back to module-level defaults for anything not provided.
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING
        archipelago_conf_file = archipelago_conf_file \
            or DEFAULT_ARCHIPELAGO_CONF_FILE
        map_check_interval = map_check_interval \
            or DEFAULT_MAP_CHECK_INTERVAL

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning
        self.map_check_interval = map_check_interval

        # Import a module by dotted name and return the module object.
        def load_module(m):
            __import__(m)
            return sys.modules[m]

        # --- database layer -------------------------------------------
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        # Mirror the DB module's access-mode constants on the backend.
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        # Mirror the DB module's node/version property indexes.
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT',
                  'AVAILABLE', 'MAP_CHECK_TIMESTAMP']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        # --- block storage layer --------------------------------------
        glue.WorkerGlue.setupXsegPool(ObjectPool, Segment, Xseg_ctx,
                                      cfile=archipelago_conf_file,
                                      pool_size=xseg_pool_size)
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        # --- message queue (no-op stub when not configured) -----------
        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        # --- quotaholder client ---------------------------------------
        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        # Without an auth URL (or with astakosclient not importable) use
        # the stub that raises on any access.
        if not astakos_auth_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        # Per-transaction state: pending commission serials and queue
        # messages, flushed/resolved in post_exec().
        self.serials = []
        self.messages = []

        # Move is implemented as a copy flagged as a move.
        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False

        self._reset_allowed_paths()
356

    
357
    def pre_exec(self, lock_container_path=False):
        """Begin a backend transaction.

        Starts a new DB transaction via the wrapper, clears the pending
        commission serials and the cached allowed paths, and marks the
        backend as being inside a transaction (see post_exec for the
        matching commit/rollback).

        :param lock_container_path: presumably makes container lookups
            take a lock for the transaction -- confirm against
            _lookup_container
        """
        self.lock_container_path = lock_container_path
        self.wrapper.execute()
        self.serials = []
        self._reset_allowed_paths()
        self.in_transaction = True
363

    
364
    def post_exec(self, success_status=True):
        """Finish the transaction begun by pre_exec().

        On success: flush queued messages, persist and resolve pending
        quota commission serials, then commit.  On failure: reject the
        serials with the quotaholder and roll back.  Either way the
        backend leaves the in-transaction state.
        """
        if not success_status:
            if self.serials:
                r = self.astakosclient.resolve_commissions(
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(r['rejected'])
            self.wrapper.rollback()
            self.in_transaction = False
            return

        # Send the messages produced during the transaction.
        for message in self.messages:
            self.queue.send(*message)

        # Register and resolve commission serials.
        if self.serials:
            self.commission_serials.insert_many(self.serials)

            # Commit so the serials stay registered even if resolving
            # the commissions fails below.
            self.wrapper.commit()

            # Start a new transaction for the resolution bookkeeping.
            self.wrapper.execute()

            r = self.astakosclient.resolve_commissions(
                accept_serials=self.serials,
                reject_serials=[])
            self.commission_serials.delete_many(r['accepted'])

        self.wrapper.commit()
        self.in_transaction = False
398

    
399
    def close(self):
        """Release backend resources: the DB wrapper, then the queue."""
        for resource in (self.wrapper, self.queue):
            resource.close()
402

    
403
    @property
    def using_external_quotaholder(self):
        """True when a real AstakosClient (not the disabled stub) is used."""
        disabled = isinstance(self.astakosclient, DisabledAstakosClient)
        return not disabled
406

    
407
    @debug_method
    @backend_method
    @list_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access.

        Slicing by the marker/limit keyword arguments is applied by the
        list_method decorator.
        """
        accounts = self._allowed_accounts(user)
        return accounts
414

    
415
    def _get_account_quotas(self, account):
        """Return the account's diskspace quota dict from the quotaholder.

        Queries Astakos for the account's quotas and returns the entry
        for the default source/resource pair, or {} when missing.
        """
        # NOTE(review): this method used to be defined twice, identically,
        # back to back; the second definition silently shadowed the first.
        # Kept a single definition.
        quotas = self.astakosclient.service_get_quotas(account)[account]
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
                                                  {})
428

    
429
    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain.

        Foreign users get only the account name (and 'until' queries are
        refused); the owner also gets object count, byte usage,
        timestamps and the domain's user-defined attributes.
        """

        self._can_read_account(user, account)
        # Create the account node on demand only for the owner.
        path, node = self._lookup_account(account, user == account)
        if user != account:
            if until or (node is None):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # NOTE(review): _get_properties appears to raise NameError when
            # no matching version exists -- fall back to 'until' as mtime.
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                # The quotaholder's usage figure overrides the local bytes.
                external_quota = self._get_account_quotas(account)
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta
470

    
471
    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        self._can_write_account(user, account)
        node = self._lookup_account(account, True)[1]
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)
480

    
481
    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        self._can_read_account(user, account)
        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        # Other users never see the account's group definitions.
        return {}
491

    
492
    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        self._can_write_account(user, account)
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            # Drop every existing group before re-adding.
            self.permissions.group_destroy(account)
        for name, members in groups.iteritems():
            if not replace:  # group_destroy above already removed it
                self.permissions.group_delete(account, name)
            if members:
                self.permissions.group_addmany(account, name, members)
507

    
508
    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy."""

        self._can_read_account(user, account)
        if user != account:
            return {}
        node = self._lookup_account(account, True)[1]
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            # The external quotaholder is authoritative for the limit.
            external = self._get_account_quotas(account)
            policy['quota'] = external.get('limit', 0)
        return policy
522

    
523
    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        self._can_write_account(user, account)
        node = self._lookup_account(account, True)[1]
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)
532

    
533
    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        policy = policy or {}
        self._can_write_account(user, account)
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        new_node = self._put_path(user, self.ROOTNODE, account,
                                  update_statistics_ancestors_depth=-1)
        self._put_policy(new_node, policy, True, is_account_policy=True)
548

    
549
    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        self._can_write_account(user, account)
        node = self.node.node_lookup(account)
        if node is None:
            return
        removed = self.node.node_remove(
            node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

        # Drop the whole cached allowed-paths map; evicting only this
        # account's entries would be more expensive.
        self._reset_allowed_paths()
566

    
567
    @debug_method
    @backend_method
    @list_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account."""

        self._can_read_account(user, account)
        if user != account:
            if until:
                raise NotAllowedError
            return self._allowed_containers(user, account)
        if shared or public:
            # Container names are the second component of 'account/container/...'.
            names = set()
            if shared:
                names.update(p.split('/', 2)[1] for p in
                             self.permissions.access_list_shared(account))
            if public:
                names.update(p[0].split('/', 2)[1] for p in
                             self.permissions.public_list(account))
            return sorted(names)
        node = self.node.node_lookup(account)
        props = self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)
        return [p[0] for p in props]
591

    
592
    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""

        self._can_read_container(user, account, container)
        if user != account and until:
            raise NotAllowedError
        node = self._lookup_container(account, container)[1]
        before = inf if until is None else until
        formatted = self._get_formatted_paths([])
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, formatted)
608

    
609
    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        Foreign users get only the container name (and 'until' queries
        are refused); the owner also gets count, bytes, timestamps and
        the domain's user-defined attributes.
        """

        self._can_read_container(user, account, container)
        if user != account and until:
            raise NotAllowedError
        node = self._lookup_container(account, container)[1]
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, used_bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            # Overall last modification, not the snapshot's.
            modified = max(self._get_statistics(node)[2], mtime)

        if user != account:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta['until_timestamp'] = tstamp
            meta.update({'name': container, 'count': count,
                         'bytes': used_bytes})
        meta['modified'] = modified
        return meta
643

    
644
    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        self._can_write_container(user, account, container)
        node = self._lookup_container(account, container)[1]
        old_version, _ = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if old_version is None:
            return
        # Unless versioning is automatic, the superseded version is dropped.
        policy = self._get_policy(node, is_account_policy=False)
        if policy['versioning'] != 'auto':
            self.node.version_remove(old_version,
                                     update_statistics_ancestors_depth=0)
661

    
662
    @debug_method
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        self._can_read_container(user, account, container)
        if user == account:
            node = self._lookup_container(account, container)[1]
            return self._get_policy(node, is_account_policy=False)
        # Other users never see the container policy.
        return {}
672

    
673
    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""

        self._can_write_container(user, account, container)
        node = self._lookup_container(account, container)[1]
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)
683

    
684
    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        policy = policy or {}
        self._can_write_container(user, account, container)
        try:
            self._lookup_container(account, container)
        except NameError:
            pass  # lookup failed: the name is free
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        account_node = self._lookup_account(account, True)[1]
        node = self._put_path(user, account_node, path,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)
704

    
705
    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name.

        Three modes:
        - until is not None: purge historical/deleted versions up to
          'until', keeping the container itself;
        - no delimiter: remove the (empty) container entirely;
        - delimiter given: delete only the container's contents.
        """

        self._can_write_container(user, account, container)
        path, node = self._lookup_container(account, container)

        if until is not None:
            # Purge old versions only; the container node survives.
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                # Versions are billed: report the freed space.
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            # Whole-container removal requires an empty container.
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                # Versions are billed: report the freed space.
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                # Mark the object deleted by writing a new empty version
                # in the deleted cluster.
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
783

    
784
    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        """Return the (paginated) object listing for a container.

        Dispatches on the shared/public flags: both set merges the shared
        and public listings (de-duplicated, sorted by name); public alone
        lists only public objects; otherwise a permission-filtered listing
        is produced.

        :raises NotAllowedError: non-owners may not use `until`
        """
        if user != account and until:
            raise NotAllowedError

        objects = []
        if shared and public:
            # Merge shared and public listings, de-duplicated via a set.
            # Start from an empty set so the union below is well defined
            # even when there are no shared paths (previously `objects`
            # remained a list in that case and `|=` raised TypeError).
            objects = set()
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects = set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            objects.sort(key=lambda x: x[0])
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
        else:
            allowed = self._list_object_permissions(
                user, account, container, prefix, shared, public=False)
            if shared and not allowed:
                return []
            path, node = self._lookup_container(account, container)
            allowed = self._get_formatted_paths(allowed)
            objects = self._list_object_properties(
                node, path, prefix, delimiter, marker, limit, virtual, domain,
                keys, until, size_range, allowed, all_props)

        # apply limits
        start, limit = self._list_limits(objects, marker, limit)
        return objects[start:start + limit]
    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        """Return (name,) + properties tuples for public objects.

        Object names are relative to the container (the account/container
        prefix is stripped); results are ordered by path.
        """
        public_paths = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        full_paths, nodes = self._lookup_objects(public_paths)
        container_prefix = '/'.join((account, container)) + '/'
        # strip the leading "account/container/" from each path
        names = [full[len(container_prefix):] for full in full_paths]
        props_list = self.node.version_lookup_bulk(
            nodes, all_props=all_props, order_by_path=True)
        return [(name,) + props
                for name, props in zip(names, props_list)]
    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        """Collect all matching objects by paging through _list_objects."""
        page_size = 10000
        collected = []
        while True:
            # resume after the last entry fetched so far
            last = collected[-1] if collected else None
            page = self._list_objects(
                user, account, container, prefix, delimiter, last, page_size,
                virtual, domain, keys, shared, until, size_range, all_props,
                public)
            collected.extend(page)
            # a short (or empty) page means the listing is exhausted
            if len(page) < page_size:
                break
        return collected
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        """Return the paths the user may list under the given prefix.

        For a non-owner, these are the paths the user was granted access
        to (NotAllowedError if none). For the owner, the shared and/or
        public paths are collected, depending on the flags.
        """
        lookup_path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            granted = self.permissions.access_list_paths(user, lookup_path)
            if not granted:
                raise NotAllowedError
            return granted
        collected = set()
        if shared:
            collected.update(self.permissions.access_list_shared(lookup_path))
        if public:
            collected.update(
                entry[0] for entry in self.permissions.public_list(
                    lookup_path))
        return sorted(collected)
    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        # normalize a missing/empty keys argument to a fresh list
        keys = [] if not keys else keys
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, False, public)
    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        keys = keys or []
        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, True, public)
        listing = []
        for entry in props:
            if len(entry) == 2:
                # two-element entries denote virtual directories
                listing.append({'subdir': entry[0]})
                continue
            # entry is (name,) + version properties; shift indices by one
            listing.append({
                'name': entry[0],
                'bytes': entry[self.SIZE + 1],
                'type': entry[self.TYPE + 1],
                'hash': entry[self.HASH + 1],
                'version': entry[self.SERIAL + 1],
                'version_timestamp': entry[self.MTIME + 1],
                'modified': entry[self.MTIME + 1] if until is None else None,
                'modified_by': entry[self.MUSER + 1],
                'uuid': entry[self.UUID + 1],
                'checksum': entry[self.CHECKSUM + 1],
                'available': entry[self.AVAILABLE + 1],
                'map_check_timestamp': entry[self.MAP_CHECK_TIMESTAMP + 1]})
        return listing
    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths that enforce permissions under a
        container."""

        return self._list_object_permissions(user, account, container, prefix,
                                             shared=True, public=False)
    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a
        container."""

        lookup = '/'.join((account, container, prefix))
        # public_list yields (path, public_id) pairs
        return dict(self.permissions.public_list(lookup))
    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        :param user: the requesting user (read access is checked)
        :param account: the account the object belongs to
        :param container: the container the object resides in
        :param name: the object name
        :param domain: the domain for user-defined attributes
        :param version: a specific version to inspect; None for the latest
        :param include_user_defined: also merge the domain's custom
            attributes into the result (system keys take precedence)
        """

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            if not props[self.AVAILABLE]:
                # Latest version whose map is not marked available: trigger
                # a map check, but still serve the request if it fails.
                try:
                    self._update_available(props)
                except (NotAllowedError, IllegalOperationError):
                    pass  # just update the database
                finally:
                    # get updated properties
                    props = self._get_version(node, version)
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                # Fall back to the deletion timestamp, if any.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        # System attributes override any same-named user-defined ones.
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM],
                     'available': props[self.AVAILABLE],
                     'map_check_timestamp': props[self.MAP_CHECK_TIMESTAMP]})
        return meta
    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write_object(user, account, container, name)

        # lock the container so the metadata update is serialized
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        old_version, new_version = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        # the superseded version may be reclaimed per the versioning policy
        self._apply_versioning(account, container, old_version,
                               update_statistics_ancestors_depth=1)
        return new_version
    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions.

        The result maps each object name (relative to the container) to an
        (allowed action, permissions path, permissions dict) tuple.
        """

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        nobject_permissions = {}
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            allowed = 1
            # object name relative to the container
            name = path[cpath_idx:]
            if user != account:
                # NOTE(review): this only enforces that the path is present
                # in the bulk access result; the `allowed` value read here
                # is overwritten by access_get_for_bulk() below either way.
                try:
                    allowed = access_objects[path]
                except KeyError:
                    raise NotAllowedError
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        # presumably validates that all objects exist -- TODO confirm
        self._lookup_objects(permissions_path)
        return nobject_permissions
    @debug_method
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        permissions_path = self._get_permissions_path(account, container, name)
        if user == account:
            # the owner always has write access
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.WRITE,
                                           user):
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.READ,
                                           user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # raises if the object does not exist
        self._lookup_object(account, container, name)
        return (allowed,
                permissions_path,
                self.permissions.access_get(permissions_path))
    @debug_method
    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object.

        Only the account owner may change permissions.

        :raises NotAllowedError: the requesting user is not the owner
        :raises ValueError: the permissions could not be stored
        """

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        try:
            self.permissions.access_set(path, permissions)
        except Exception:
            # was a bare `except:`, which also swallowed SystemExit and
            # KeyboardInterrupt; only genuine errors become ValueError
            raise ValueError
        else:
            self._report_sharing_change(user, account, path, {'members':
                                        self.permissions.access_members(path)})

        # remove all the cached allowed paths
        # filtering out only those affected could be more expensive
        self._reset_allowed_paths()
    @debug_method
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read_object(user, account, container, name)
        object_path = self._lookup_object(account, container, name)[0]
        return self.permissions.public_get(object_path)
    @debug_method
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write_object(user, account, container, name)
        object_path = self._lookup_object(account, container, name,
                                          lock_container=True)[0]
        if public:
            self.permissions.public_set(
                object_path, self.public_url_security,
                self.public_url_alphabet)
        else:
            self.permissions.public_unset(object_path)
    def _update_available(self, props):
        """Checks if the object map exists and updates the database.

        Returns the hashmap when the map exists. On failure the
        map_check_timestamp is still persisted (via a separate
        transaction) and IllegalOperationError is raised.

        :raises NotAllowedError: a map check was attempted too recently
        :raises IllegalOperationError: the Archipelago map is unavailable
        """

        if not props[self.AVAILABLE]:
            # Rate-limit repeated map checks for still-unavailable objects.
            if props[self.MAP_CHECK_TIMESTAMP]:
                elapsed_time = time() - float(props[self.MAP_CHECK_TIMESTAMP])
                if elapsed_time < self.map_check_interval:
                    raise NotAllowedError(
                        'Consequent map checks are limited: retry later.')
        try:
            hashmap = self.store.map_get_archipelago(props[self.HASH],
                                                     props[self.SIZE])
        except:  # map does not exist
            # NOTE(review): deliberately broad -- any failure from the store
            # layer is treated as "map missing"; narrowing would need the
            # store's exception types, which are not visible here.
            # Raising an exception results in db transaction rollback
            # However we have to force the update of the database
            self.wrapper.rollback()  # rollback existing transaction
            self.wrapper.execute()  # start new transaction
            self.node.version_put_property(props[self.SERIAL],
                                           'map_check_timestamp', time())
            self.wrapper.commit()  # commit transaction
            self.wrapper.execute()  # start new transaction
            raise IllegalOperationError(
                'Unable to retrieve Archipelago Volume hashmap.')
        else:  # map exists
            # Record both availability and the time of the check.
            self.node.version_put_property(props[self.SERIAL],
                                           'available', True)
            self.node.version_put_property(props[self.SERIAL],
                                           'map_check_timestamp', time())
            return hashmap
    @debug_method
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        obj_hash = props[self.HASH]
        if obj_hash is None:
            return 0, ()
        if obj_hash.startswith('archip:'):
            # Archipelago-backed object: fetch the map from the volume layer
            hashmap = self._update_available(props)
            return props[self.SIZE], list(hashmap)
        hashmap = self.store.map_get(self._unhexlify_hash(obj_hash))
        return props[self.SIZE], [binascii.hexlify(h) for h in hashmap]
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True,
                            available=True):
        """Create a new object version pointing to the given hash.

        Creates/locks the object node under the container, duplicates the
        version, applies metadata and the versioning policy, enforces
        account and container quotas and emits the size/sharing/object
        change notifications. Returns the new version id.

        :raises NotAllowedError: permissions supplied by a non-owner, or
            the user may not write the object
        :raises QuotaError: the account or container quota is exceeded
        """
        # Only the account owner may attach permissions to an object.
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write_object(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        # Creates the node if needed; locks the destination path.
        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1,
            available=available, keep_available=False)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        # del_size is the space reclaimed from the superseded version.
        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        # Only growth can violate quotas.
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id
    @debug_method
    @backend_method
    def register_object_map(self, user, account, container, name, size, type,
                            mapfile, checksum='', domain='pithos', meta=None,
                            replace_meta=False, permissions=None):
        """Register an object mapfile without providing any data.

        Lock the container path, create a node pointing to the object path,
        create a version pointing to the mapfile
        and issue the size change in the quotaholder.

        :param user: the user account which performs the action

        :param account: the account under which the object resides

        :param container: the container under which the object resides

        :param name: the object name

        :param size: the object size

        :param type: the object mimetype

        :param mapfile: the mapfile pointing to the object data

        :param checksum: the md5 checksum (optional)

        :param domain: the object domain

        :param meta: a dict with custom object metadata

        :param replace_meta: replace existing metadata or not

        :param permissions: a dict with the read and write object permissions

        :returns: the new object uuid

        :raises: ItemNotExists, NotAllowedError, QuotaError
        """

        meta = meta or {}
        # Create the container if it does not exist yet; lock its path
        # while doing so (the flag is always restored afterwards).
        try:
            self.lock_container_path = True
            self.put_container(user, account, container, policy=None)
        except ContainerExists:
            pass
        finally:
            self.lock_container_path = False
        # available=False: the version is registered before the map data
        # has been verified to exist.
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, mapfile, checksum,
            domain, meta, replace_meta, permissions, available=False)
        return self.node.version_get_properties(dest_version_id,
                                                keys=('uuid',))[0]
    # NOTE(review): unlike sibling methods this is not wrapped in
    # @backend_method -- presumably so store.map_put() happens relative to
    # the version update's transaction boundary; confirm before changing.
    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version.

        Returns (new version id, hexlified map hash).

        :raises IllegalOperationError: an Archipelago hash was supplied
        :raises IndexError: blocks are missing from the store; the missing
            hex hashes are attached as the exception's ``data`` attribute
        """

        for h in hashmap:
            # NOTE(review): other code paths test for the 'archip:' prefix;
            # confirm 'archip_' is the intended marker here.
            if h.startswith('archip_'):
                raise IllegalOperationError(
                    'Cannot update Archipelago Volume hashmap.')
        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(map)
        if missing:
            # Report every missing block so the client can upload them.
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = map.hash()
        hexlified = binascii.hexlify(hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        self.store.map_put(hash, map)
        return dest_version_id, hexlified
    @debug_method
    @backend_method
    def update_object_checksum(self, user, account, container, name, version,
                               checksum):
        """Update an object's checksum."""

        # Propagate the checksum to every later version that still carries
        # the same hashmap and size (i.e. metadata-only updates).
        self._can_write_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        props = self._get_version(node, version)
        min_serial = int(version)
        target_hash = props[self.HASH]
        target_size = props[self.SIZE]
        for candidate in self.node.node_get_versions(node):
            same_data = (candidate[self.HASH] == target_hash and
                         candidate[self.SIZE] == target_size)
            if candidate[self.SERIAL] >= min_serial and same_data:
                self.node.version_put_property(
                    candidate[self.SERIAL], 'checksum', checksum)
    def _copy_object(self, user, src_account, src_container, src_name,
                     dest_account, dest_container, dest_name, type,
                     dest_domain=None, dest_meta=None, replace_meta=False,
                     permissions=None, src_version=None, is_move=False,
                     delimiter=None):
        """Copy (or move, when is_move) an object to a new destination.

        With a delimiter, every object under the source prefix is copied
        or moved as well. Returns the new version id, or a list of version
        ids when a delimiter produced multiple copies.
        """

        # Moves are size-neutral for the quotaholder.
        report_size_change = not is_move
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read_object(user, src_account, src_container, src_name)

        src_container_path = '/'.join((src_account, src_container))
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        # A cross-path copy gets a fresh uuid; a move (or self-copy) keeps it.
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            # Recursively copy/move everything under the source prefix.
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                # Map the source name onto the destination prefix.
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)
    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        # Delegate to the shared copy/move implementation with is_move=False.
        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {},
            replace_meta, permissions, src_version, False, delimiter)
1421

    
1422
    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        # Only the owner of the source account may move objects out of it.
        if user != src_account:
            raise NotAllowedError
        return self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {},
            replace_meta, permissions, None, delimiter=delimiter)
1438

    
1439
    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        """Delete (or, with ``until``, purge history of) an object.

        Only the account owner may delete.  With ``until`` set, versions up
        to that timestamp are physically purged from all clusters and the
        freed size is reported; otherwise a tombstone version is written in
        CLUSTER_DELETED.  With ``delimiter``, every object under
        ``name + delimiter`` is deleted as well.
        """
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            # Purge mode: physically remove old versions and their maps.
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            # History versions count against quota only when versioning
            # is not free.
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except NameError:
                # No live version remains; drop any sharing entries too.
                # (ItemNotExists derives from NameError in this codebase —
                # TODO confirm against the exceptions module.)
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        # Normal delete: write a zero-size tombstone version in
        # CLUSTER_DELETED and recluster/remove the previous one according
        # to the container's versioning policy.
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            # Recursive delete of everything under the "folder" prefix.
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

        # remove all the cached allowed paths
        # removing the specific path could be more expensive
        self._reset_allowed_paths()
1533

    
1534
    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object.

        Note: ``prefix`` is accepted for interface compatibility but is not
        used here; recursive deletion is driven by ``delimiter``.
        """

        self._delete_object(user, account, container, name, until=until,
                            delimiter=delimiter)
1541

    
1542
    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read_object(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        result = []
        for props in self.node.node_get_versions(node):
            # Tombstoned versions are not listed.
            if props[self.CLUSTER] == CLUSTER_DELETED:
                continue
            result.append([props[self.SERIAL], props[self.MTIME]])
        return result
1552

    
1553
    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        # info is a (path, serial) pair; only the path is needed here.
        path = info[0]
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read_object(user, account, container, name)
        return account, container, name
1566

    
1567
    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        parts = path.split('/', 2)
        self._can_read_object(user, *parts)
        return tuple(parts)
1578

    
1579
    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        # Archipelago blocks are addressed by their prefixed name; everything
        # else by the raw binary digest.
        if hash.startswith('archip_'):
            data = self.store.block_get_archipelago(hash)
        else:
            data = self.store.block_get(self._unhexlify_hash(hash))
        if not data:
            raise ItemNotExists('Block does not exist')
        return data
1590

    
1591
    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        digest = self.store.block_put(data)
        # Callers work with hex strings, the store with raw digests.
        return binascii.hexlify(digest)
1596

    
1597
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if hash.startswith('archip_'):
            raise IllegalOperationError(
                'Cannot update an Archipelago Volume block.')
        # Overwriting a whole block from offset 0 is just a fresh put.
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        updated = self.store.block_update(self._unhexlify_hash(hash), offset,
                                          data)
        return binascii.hexlify(updated)
1608

    
1609
    # Path functions.
1610

    
1611
    def _generate_uuid(self):
1612
        return str(uuidlib.uuid4())
1613

    
1614
    def _put_object_node(self, path, parent, name):
        # Resolve (or lazily create) the node for path/name under parent.
        full_path = '/'.join((path, name))
        node = self.node.node_lookup(full_path)
        if node is None:
            node = self.node.node_create(parent, full_path)
        return full_path, node
1620

    
1621
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        # Create a node for `path` under `parent` and give it an initial
        # empty (size 0, no hash) version in CLUSTER_NORMAL owned by `user`.
        # Returns the new node id.
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node
1628

    
1629
    def _lookup_account(self, account, create=True):
        node = self.node.node_lookup(account)
        if create and node is None:
            # Accounts are created lazily on first access.
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node
1636

    
1637
    def _lookup_container(self, account, container):
        # Optionally take a row lock on the container path (SELECT FOR
        # UPDATE semantics are decided by the backend configuration).
        for_update = bool(self.lock_container_path)
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node
1644

    
1645
    def _lookup_object(self, account, container, name, lock_container=False):
        if lock_container:
            # Locking the container serializes concurrent object updates.
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node
1654

    
1655
    def _lookup_objects(self, paths):
        # Bulk variant of _lookup_object: resolve many paths in one query.
        return paths, self.node.node_lookup_bulk(paths)
1658

    
1659
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = inf if until is None else until
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            # Older snapshots may only survive in the history cluster.
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props
1669

    
1670
    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            # Historical view: latest statistics at the given timestamp.
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            # Recompute across all non-deleted clusters.
            stats = self.node.statistics_latest(
                node, except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        return (0, 0, 0) if stats is None else stats
1683

    
1684
    def _get_version(self, node, version=None):
        if version is None:
            # No explicit version: take the latest normal-cluster version.
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
            return props

        try:
            version = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(version, node=node)
        # A tombstoned version is reported as missing.
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props
1698

    
1699
    def _get_versions(self, nodes):
        # Latest normal-cluster version for each node, fetched in bulk.
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1701

    
1702
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None,
                               available=True, keep_available=True):
        """Create a new version of the node.

        Properties not supplied (size/type/checksum as None) are inherited
        from the latest version of ``src_node`` (or ``node`` itself when
        ``src_node`` is None).  The previous latest version of ``node`` is
        moved to CLUSTER_HISTORY.  Returns (pre_version_id,
        dest_version_id) where pre_version_id is the superseded version of
        ``node`` (or None).
        """

        # Source of the inherited properties: node itself, or an explicit
        # source node when copying/moving.
        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
            if keep_available:
                # Preserve the availability flags of the source version.
                src_available = props[self.AVAILABLE]
                src_map_check_timestamp = props[self.MAP_CHECK_TIMESTAMP]
            else:
                src_available = available
                src_map_check_timestamp = None
        else:
            # No previous version: start from empty defaults.
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
            src_available = available
            src_map_check_timestamp = None
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        # Copies get a fresh UUID; plain new versions keep the object UUID.
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        # Determine which version of *node* (not src_node) is superseded.
        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            # Demote the superseded version to the history cluster.
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth,
            available=src_available,
            map_check_timestamp=src_map_check_timestamp)

        # Only the new version may carry the is_latest flag.
        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
1762

    
1763
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        # Carry over the attributes of the source version, then apply meta.
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if replace:
            # Drop every attribute in the domain and set the new ones.
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node,
                                    ((k, v) for k, v in meta.iteritems()))
        else:
            # Empty values request deletion; the rest are upserts.
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
1776

    
1777
    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        depth = update_statistics_ancestors_depth
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, update_statistics_ancestors_depth=depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id
1788

    
1789
    def _list_limits(self, listing, marker, limit):
1790
        start = 0
1791
        if marker:
1792
            try:
1793
                start = listing.index(marker) + 1
1794
            except ValueError:
1795
                pass
1796
        if not limit or limit > 10000:
1797
            limit = 10000
1798
        return start, limit
1799

    
1800
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """List object properties under `path`, container-relative.

        Translates the caller-supplied listing parameters into a single
        latest_version_list() query and strips the container prefix from
        the returned paths.  With ``virtual``, delimiter-collapsed prefixes
        are merged into the result as (prefix, None) entries.
        """
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        # All stored paths are absolute; qualify prefix and marker.
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        # Attribute filtering only makes sense within a metadata domain.
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        # Strip the 'account/container/' prefix from each returned path.
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects
1821

    
1822
    # Reporting functions.
1823

    
1824
    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        """Report a disk-space delta for `account`.

        Queues a 'resource.diskspace' message and, when an external
        quotaholder is configured, issues a quota commission for the delta.
        Raises QuotaError if the commission is rejected.
        """
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            # Any failure talking to the quotaholder surfaces as QuotaError.
            raise QuotaError(e)
        else:
            # Remember the commission so it can be accepted/rejected later.
            self.serials.append(serial)
1853

    
1854
    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        # Queue an 'object' event; the caller's details dict is annotated
        # with the acting user, matching the other report helpers.
        details = details or {}
        details['user'] = user
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))
1862

    
1863
    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        # Queue a 'sharing' event for permission changes on path.
        details = details or {}
        details['user'] = user
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))
1871

    
1872
    # Policy functions.
1873

    
1874
    def _check_policy(self, policy, is_account_policy=True):
        """Validate policy in place; raise ValueError on bad keys/values."""
        defaults = (self.default_account_policy if is_account_policy
                    else self.default_container_policy)
        # Blank values mean "reset this key to its default".
        for key in policy.keys():
            if policy[key] == '':
                policy[key] = defaults.get(key)
        for key, value in policy.iteritems():
            if key == 'quota':
                quota = int(value)  # May raise ValueError.
                if quota < 0:
                    raise ValueError
            elif key == 'versioning':
                if value not in ('auto', 'none'):
                    raise ValueError
            else:
                # Unknown policy keys are rejected outright.
                raise ValueError
1890

    
1891
    def _put_policy(self, node, policy, replace, is_account_policy=True):
        defaults = (self.default_account_policy if is_account_policy
                    else self.default_container_policy)
        if replace:
            # A full replace must carry defaults for any omitted keys.
            for key, value in defaults.iteritems():
                policy.setdefault(key, value)
        self.node.policy_set(node, policy)
1899

    
1900
    def _get_policy(self, node, is_account_policy=True):
        defaults = (self.default_account_policy if is_account_policy
                    else self.default_container_policy)
        # Stored values override the defaults.
        policy = dict(defaults)
        policy.update(self.node.policy_get(node))
        return policy
1906

    
1907
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning == 'auto':
            if self.free_versioning:
                # The version is kept but does not count against quota,
                # so report its size as freed.
                return self.node.version_get_properties(
                    version_id, keys=('size',))[0]
            return 0
        # Versioning disabled: physically drop the superseded version and
        # its block map.
        hash, size = self.node.version_remove(
            version_id, update_statistics_ancestors_depth)
        self.store.map_delete(hash)
        return size
1927

    
1928
    # Access control functions.
1929

    
1930
    def _check_groups(self, groups):
1931
        # raise ValueError('Bad characters in groups')
1932
        pass
1933

    
1934
    def _check_permissions(self, path, permissions):
1935
        # raise ValueError('Bad characters in permissions')
1936
        pass
1937

    
1938
    def _get_formatted_paths(self, paths):
        formatted = []
        if not paths:
            return formatted
        props = self.node.get_props(paths)
        if props:
            for prop in props:
                obj_path, content_type = prop[0], prop[1]
                if content_type.split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    # Folder-typed objects also match everything below them.
                    formatted.append((obj_path.rstrip('/') + '/',
                                      self.MATCH_PREFIX))
                formatted.append((obj_path, self.MATCH_EXACT))
        return formatted
1951

    
1952
    def _get_permissions_path(self, account, container, name):
        path = '/'.join((account, container, name))
        # Candidate paths from which permissions may be inherited, deepest
        # (lexicographically greatest) first.
        candidates = sorted(self.permissions.access_inherit(path),
                            reverse=True)
        for candidate in candidates:
            if candidate == path:
                return candidate
            if candidate.count('/') < 2:
                # Account- or container-level entries cannot grant access.
                continue
            node = self.node.node_lookup(candidate)
            if node is None:
                continue
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                continue
            # Only folder-typed ancestors propagate their permissions.
            if props[self.TYPE].split(';', 1)[0].strip() in (
                    'application/directory', 'application/folder'):
                return candidate
        return None
1972

    
1973
    def _get_permissions_path_bulk(self, account, container, names):
        """Bulk counterpart of _get_permissions_path for many object names."""
        formatted_paths = ['/'.join((account, container, name))
                           for name in names]
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        permission_paths.sort()
        permission_paths.reverse()
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                # Exact grants apply directly.
                permission_paths_list.append(p)
            elif p.count('/') >= 2:
                # Possible parent folders; verify their type below.
                lookup_list.append(p)

        if lookup_list:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    # Only folder-typed ancestors propagate permissions.
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        # Callers expect None rather than an empty list.
        return permission_paths_list if permission_paths_list else None
2004

    
2005
    def _reset_allowed_paths(self):
        # Drop both permission caches wholesale; they are repopulated lazily
        # by the _can_* / _allowed_* helpers.
        self.read_allowed_paths = defaultdict(set)
        self.write_allowed_paths = defaultdict(set)
2008

    
2009
    @check_allowed_paths(action=0)
    def _can_read_account(self, user, account):
        # The owner can always read; others need a shared path in account.
        if user == account:
            return
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
2014

    
2015
    @check_allowed_paths(action=1)
    def _can_write_account(self, user, account):
        # Only the account owner may write account-level data.
        if user != account:
            raise NotAllowedError
2019

    
2020
    @check_allowed_paths(action=0)
    def _can_read_container(self, user, account, container):
        # The owner can always read; others need the container shared.
        if user == account:
            return
        if container not in self._allowed_containers(user, account):
            raise NotAllowedError
2025

    
2026
    @check_allowed_paths(action=1)
    def _can_write_container(self, user, account, container):
        # Only the account owner may write container-level data.
        if user != account:
            raise NotAllowedError
2030

    
2031
    @check_allowed_paths(action=0)
    def _can_read_object(self, user, account, container, name):
        # Owners always read their own objects.
        if user == account:
            return True
        path = '/'.join((account, container, name))
        # Public objects are readable by anyone.
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        # Either a READ or a WRITE grant suffices for reading; WRITE is
        # only checked when READ is absent (short-circuit preserved).
        if self.permissions.access_check(path, self.READ, user):
            return
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError
2044

    
2045
    @check_allowed_paths(action=1)
    def _can_write_object(self, user, account, container, name):
        """Raise NotAllowedError unless user may write the object.

        The owner can always write; anyone else needs an explicit WRITE
        grant on the object's effective permissions path.
        """
        if user == account:
            return True
        # Fix: removed a dead assignment ('/'.join of the components) that
        # was immediately overwritten by the permissions-path lookup below.
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError
2055

    
2056
    def _allowed_accounts(self, user):
        # Accounts owning at least one path shared with the user.
        allow = set(path.split('/', 1)[0]
                    for path in self.permissions.access_list_paths(user))
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
2063

    
2064
    def _allowed_containers(self, user, account):
        # Containers of `account` with at least one path shared with user.
        allow = set(
            path.split('/', 2)[1]
            for path in self.permissions.access_list_paths(user, account))
        self.read_allowed_paths[user] |= allow
        return sorted(allow)
2071

    
2072
    # Domain functions
2073

    
2074
    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        """Return (path, metadata, permissions) for objects in domain."""
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        result = []
        for path, props, user_defined_meta in obj_list:
            result.append((path,
                           self._build_metadata(props, user_defined_meta),
                           self.permissions.access_get(path)))
        return result
2087

    
2088
    # util functions
2089

    
2090
    def _build_metadata(self, props, user_defined=None,
2091
                        include_user_defined=True):
2092
        meta = {'bytes': props[self.SIZE],
2093
                'type': props[self.TYPE],
2094
                'hash': props[self.HASH],
2095
                'version': props[self.SERIAL],
2096
                'version_timestamp': props[self.MTIME],
2097
                'modified_by': props[self.MUSER],
2098
                'uuid': props[self.UUID],
2099
                'checksum': props[self.CHECKSUM]}
2100
        if include_user_defined and user_defined is not None:
2101
            meta.update(user_defined)
2102
        return meta
2103

    
2104
    def _exists(self, node):
        # A node "exists" when it still has a live (non-deleted) version.
        try:
            self._get_version(node)
        except ItemNotExists:
            return False
        return True
2111

    
2112
    def _unhexlify_hash(self, hash):
2113
        try:
2114
            return binascii.unhexlify(hash)
2115
        except TypeError:
2116
            raise InvalidHash(hash)