root / snf-pithos-backend / pithos / backends / modular.py @ fac5f6be
# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import sys
import uuid as uuidlib
import logging
import hashlib
import binascii

from functools import wraps, partial
from traceback import format_exc

try:
    from astakosclient import AstakosClient
except ImportError:
    AstakosClient = None

from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
                  InvalidHash, IllegalOperationError)


class DisabledAstakosClient(object):
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        m = ("AstakosClient has been disabled, "
             "yet an attempt to access it was made")
        raise AssertionError(m)

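# Note: any attribute access on DisabledAstakosClient raises an
# AssertionError, so a backend configured without an astakos_auth_url fails
# loudly if quotaholder calls are ever reached; ModularBackend checks for
# this class (see using_external_quotaholder below) to skip such calls.
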
# Stripped-down version of the HashMap class found in tools.

class HashMap(list):

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        if len(self) == 0:
            return self._hash_raw('')
        if len(self) == 1:
            return self.__getitem__(0)

        h = list(self)
        s = 2
        while s < len(h):
            s = s * 2
        h += [('\x00' * len(h[0]))] * (s - len(h))
        while len(h) > 1:
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
        return h[0]

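# A minimal usage sketch (not part of this module) of how HashMap folds
# per-block hashes into a single object hash, assuming 'sha256' blocks:
#
#     hm = HashMap(blocksize=4 * 1024 * 1024, blockhash='sha256')
#     hm.append(hashlib.sha256('first block').digest())
#     hm.append(hashlib.sha256('second block').digest())
#     object_hash = hm.hash()
#
# With two or more entries the list is padded with zero hashes up to the
# next power of two and adjacent pairs are re-hashed repeatedly (a
# Merkle-style reduction) until a single digest remains.
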
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16

QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

inf = float('inf')

ULTIMATE_ANSWER = 42

DEFAULT_SOURCE = 'system'
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'

logger = logging.getLogger(__name__)


def backend_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        # if we are inside a database transaction
        # just proceed with the method execution
        # otherwise manage a new transaction
        if self.in_transaction:
            return func(self, *args, **kw)

        try:
            self.pre_exec()
            result = func(self, *args, **kw)
            success_status = True
            return result
        except:
            success_status = False
            raise
        finally:
            self.post_exec(success_status)
    return wrapper


def debug_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            result = format_exc()
            raise
        finally:
            all_args = map(repr, args)
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper


def list_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        marker = kw.get('marker')
        limit = kw.get('limit')
        result = func(self, *args, **kw)
        start, limit = self._list_limits(result, marker, limit)
        return result[start:start + limit]
    return wrapper

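# How the decorators above are meant to stack on a backend method
# (illustrative sketch; the real methods are defined further below):
#
#     @debug_method     # logs ">>> method(args) <<< result" at DEBUG level
#     @backend_method   # wraps the call in a pre_exec()/post_exec() transaction
#     @list_method      # applies marker/limit windowing to the returned list
#     def list_accounts(self, user, marker=None, limit=10000):
#         ...
#
# backend_method commits on success, rolls back on any exception, and is a
# no-op when self.in_transaction is already set (nested calls reuse the
# outer transaction).
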
class ModularBackend(BaseBackend):
    """A modular backend.

    Uses modules for SQL functions and storage.
    """

    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_auth_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
                                    DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
                                    DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

        def load_module(m):
            __import__(m)
            return sys.modules[m]

        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                  'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.ALLOWED = ['read', 'write']

        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_auth_url = astakos_auth_url
        self.service_token = service_token

        if not astakos_auth_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                service_token, astakos_auth_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        self.serials = []
        self.messages = []

        self._move_object = partial(self._copy_object, is_move=True)

        self.lock_container_path = False

        self.in_transaction = False

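    # Minimal construction sketch (assuming the default SQLAlchemy and
    # hashfiler modules are importable; the block path below is only an
    # example). Unset arguments fall back to the DEFAULT_* values above:
    #
    #     backend = ModularBackend(db_connection='sqlite:///backend.db',
    #                              block_path='/srv/pithos/data',
    #                              astakos_auth_url=None)  # disabled astakos
    #     try:
    #         ...  # call backend methods here
    #     finally:
    #         backend.close()
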
    def pre_exec(self, lock_container_path=False):
        self.lock_container_path = lock_container_path
        self.wrapper.execute()
        self.serials = []
        self.in_transaction = True

    def post_exec(self, success_status=True):
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    accept_serials=self.serials,
                    reject_serials=[])
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            if self.serials:
                r = self.astakosclient.resolve_commissions(
                    accept_serials=[],
                    reject_serials=self.serials)
                self.commission_serials.delete_many(
                    r['rejected'])
            self.wrapper.rollback()
        self.in_transaction = False

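    # Commission lifecycle implemented by post_exec(): on success the pending
    # serials are persisted and committed first, so that a crash during
    # resolve_commissions() still leaves a record to reconcile later, and the
    # accepted serials are then deleted; on failure the serials are rejected
    # at the quotaholder and the local transaction is rolled back.
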
    def close(self):
        self.wrapper.close()
        self.queue.close()

    @property
    def using_external_quotaholder(self):
        return not isinstance(self.astakosclient, DisabledAstakosClient)

    @debug_method
    @backend_method
    @list_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        return self._allowed_accounts(user)

    def _get_account_quotas(self, account):
        """Get account usage from astakos."""

        quotas = self.astakosclient.service_get_quotas(account)[account]
        return quotas.get(DEFAULT_SOURCE, {}).get(DEFAULT_DISKSPACE_RESOURCE,
                                                  {})

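    # service_get_quotas() is expected to return a dict keyed by account,
    # source and resource; the helper above narrows it down to the
    # 'pithos.diskspace' entry, roughly of the form (values illustrative):
    #
    #     {'limit': 5368709120, 'usage': 1073741824}
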
    @debug_method
    @backend_method
    def get_account_meta(self, user, account, domain, until=None,
                         include_user_defined=True):
        """Return a dictionary with the account metadata for the domain."""

        path, node = self._lookup_account(account, user == account)
        if user != account:
            if until or (node is None) or (account not
                                           in self._allowed_accounts(user)):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                external_quota = self._get_account_quotas(account)
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta

    @debug_method
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_account(account, True)
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)

    @debug_method
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for the account."""

        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            return {}
        self._lookup_account(account, True)
        return self.permissions.group_dict(account)

    @debug_method
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        for k, v in groups.iteritems():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, k)
            if v:
                self.permissions.group_addmany(account, k, v)

    @debug_method
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy."""

        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            return {}
        path, node = self._lookup_account(account, True)
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            external_quota = self._get_account_quotas(account)
            policy['quota'] = external_quota.get('limit', 0)
        return policy

    @debug_method
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_account(account, True)
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)

    @debug_method
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        policy = policy or {}
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True)

    @debug_method
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is None:
            return
        if not self.node.node_remove(node,
                                     update_statistics_ancestors_depth=-1):
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

    @debug_method
    @backend_method
    @list_method
    def list_containers(self, user, account, marker=None, limit=10000,
                        shared=False, until=None, public=False):
        """Return a list of containers existing under an account."""

        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            return self._allowed_containers(user, account)
        if shared or public:
            allowed = set()
            if shared:
                allowed.update([x.split('/', 2)[1] for x in
                               self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in
                               self.permissions.public_list(account)])
            return sorted(allowed)
        node = self.node.node_lookup(account)
        return [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]

    @debug_method
    @backend_method
    def list_container_meta(self, user, account, container, domain,
                            until=None):
        """Return a list of the container's object meta keys for a domain."""

        allowed = []
        if user != account:
            if until:
                raise NotAllowedError
            allowed = self.permissions.access_list_paths(
                user, '/'.join((account, container)))
            if not allowed:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = until if until is not None else inf
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before,
                                               CLUSTER_DELETED, allowed)

    @debug_method
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None,
                           include_user_defined=True):
        """Return a dictionary with the container metadata for the domain."""

        if user != account:
            if until or container not in self._allowed_containers(user,
                                                                  account):
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta

    @debug_method
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta,
                              replace=False):
        """Update the metadata associated with the container for the domain."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is not None:
            versioning = self._get_policy(
                node, is_account_policy=False)['versioning']
            if versioning != 'auto':
                self.node.version_remove(src_version_id,
                                         update_statistics_ancestors_depth=0)

    @debug_method
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        if user != account:
            if container not in self._allowed_containers(user, account):
                raise NotAllowedError
            return {}
        path, node = self._lookup_container(account, container)
        return self._get_policy(node, is_account_policy=False)

    @debug_method
    @backend_method
    def update_container_policy(self, user, account, container, policy,
                                replace=False):
        """Update the policy associated with the container."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)

    @debug_method
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        policy = policy or {}
        if user != account:
            raise NotAllowedError
        try:
            path, node = self._lookup_container(account, container)
        except NameError:
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        node = self._put_path(
            user, self._lookup_account(account, True)[1], path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)

    @debug_method
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='',
                         delimiter=None):
        """Delete/purge the container with the given name."""

        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        if until is not None:
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(
                user, account, container, prefix='', delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

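    # delete_container() above handles three cases: with 'until' only the
    # container history up to that time is purged; without a delimiter the
    # container must be empty and is removed together with its history; with
    # a delimiter the contained objects are deleted but the container itself
    # is kept.
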
    def _list_objects(self, user, account, container, prefix, delimiter,
                      marker, limit, virtual, domain, keys, shared, until,
                      size_range, all_props, public):
        if user != account and until:
            raise NotAllowedError

        objects = []
        if shared and public:
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects = set(self._list_object_properties(
                    node, path, prefix, delimiter, marker, limit, virtual,
                    domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            objects.sort(key=lambda x: x[0])
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
        else:
            allowed = self._list_object_permissions(
                user, account, container, prefix, shared, public=False)
            if shared and not allowed:
                return []
            path, node = self._lookup_container(account, container)
            allowed = self._get_formatted_paths(allowed)
            objects = self._list_object_properties(
                node, path, prefix, delimiter, marker, limit, virtual, domain,
                keys, until, size_range, allowed, all_props)

        # apply limits
        start, limit = self._list_limits(objects, marker, limit)
        return objects[start:start + limit]

    def _list_public_object_properties(self, user, account, container, prefix,
                                       all_props):
        public = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        paths, nodes = self._lookup_objects(public)
        path = '/'.join((account, container))
        cont_prefix = path + '/'
        paths = [x[len(cont_prefix):] for x in paths]
        objects = [(p,) + props for p, props in
                   zip(paths, self.node.version_lookup_bulk(
                       nodes, all_props=all_props, order_by_path=True))]
        return objects

    def _list_objects_no_limit(self, user, account, container, prefix,
                               delimiter, virtual, domain, keys, shared, until,
                               size_range, all_props, public):
        objects = []
        while True:
            marker = objects[-1] if objects else None
            limit = 10000
            l = self._list_objects(
                user, account, container, prefix, delimiter, marker, limit,
                virtual, domain, keys, shared, until, size_range, all_props,
                public)
            objects.extend(l)
            if not l or len(l) < limit:
                break
        return objects

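    # _list_objects_no_limit() above pages through _list_objects() in windows
    # of 10000 entries, passing the last entry of the previous window as the
    # marker, until a short (or empty) page signals the end of the listing.
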
    def _list_object_permissions(self, user, account, container, prefix,
                                 shared, public):
        allowed = []
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            allowed = self.permissions.access_list_paths(user, path)
            if not allowed:
                raise NotAllowedError
        else:
            allowed = set()
            if shared:
                allowed.update(self.permissions.access_list_shared(path))
            if public:
                allowed.update(
                    [x[0] for x in self.permissions.public_list(path)])
            allowed = sorted(allowed)
            if not allowed:
                return []
        return allowed

    @debug_method
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None,
                     marker=None, limit=10000, virtual=True, domain=None,
                     keys=None, shared=False, until=None, size_range=None,
                     public=False):
        """List (object name, object version_id) under a container."""

        keys = keys or []
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, False, public)

    @debug_method
    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of metadata dicts of objects under a container."""

        keys = keys or []
        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, True, public)
        objects = []
        for p in props:
            if len(p) == 2:
                objects.append({'subdir': p[0]})
            else:
                objects.append({
                    'name': p[0],
                    'bytes': p[self.SIZE + 1],
                    'type': p[self.TYPE + 1],
                    'hash': p[self.HASH + 1],
                    'version': p[self.SERIAL + 1],
                    'version_timestamp': p[self.MTIME + 1],
                    'modified': p[self.MTIME + 1] if until is None else None,
                    'modified_by': p[self.MUSER + 1],
                    'uuid': p[self.UUID + 1],
                    'checksum': p[self.CHECKSUM + 1]})
        return objects

    @debug_method
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths with permissions set under a container."""

        return self._list_object_permissions(user, account, container, prefix,
                                             True, False)

    @debug_method
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a mapping of object paths to public ids under a container."""

        public = {}
        for path, p in self.permissions.public_list('/'.join((account,
                                                              container,
                                                              prefix))):
            public[path] = p
        return public

    @debug_method
    @backend_method
    def get_object_meta(self, user, account, container, name, domain,
                        version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta

    @debug_method
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta,
                           replace=False):
        """Update object metadata for a domain and return the new version."""

        self._can_write(user, account, container, name)

        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        self._apply_versioning(account, container, src_version_id,
                               update_statistics_ancestors_depth=1)
        return dest_version_id

    @debug_method
    @backend_method
    def get_object_permissions_bulk(self, user, account, container, names):
        """Return the action allowed on each object, the path from which
        each object gets its permissions, and a dictionary containing the
        permissions."""

        permissions_path = self._get_permissions_path_bulk(account, container,
                                                           names)
        access_objects = self.permissions.access_check_bulk(permissions_path,
                                                            user)
        #group_parents = access_objects['group_parents']
        nobject_permissions = {}
        cpath = '/'.join((account, container, ''))
        cpath_idx = len(cpath)
        for path in permissions_path:
            allowed = 1
            name = path[cpath_idx:]
            if user != account:
                try:
                    allowed = access_objects[path]
                except KeyError:
                    raise NotAllowedError
            access_dict, allowed = \
                self.permissions.access_get_for_bulk(access_objects[path])
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
                                         access_dict)
        self._lookup_objects(permissions_path)
        return nobject_permissions

    @debug_method
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path from which
        the object gets its permissions, and a dictionary containing the
        permissions."""

        allowed = 'write'
        permissions_path = self._get_permissions_path(account, container, name)
        if user != account:
            if self.permissions.access_check(permissions_path, self.WRITE,
                                             user):
                allowed = 'write'
            elif self.permissions.access_check(permissions_path, self.READ,
                                               user):
                allowed = 'read'
            else:
                raise NotAllowedError
        self._lookup_object(account, container, name)
        return (allowed,
                permissions_path,
                self.permissions.access_get(permissions_path))

    @debug_method
    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object."""

        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        try:
            self.permissions.access_set(path, permissions)
        except:
            raise ValueError
        else:
            self._report_sharing_change(user, account, path, {'members':
                                        self.permissions.access_members(path)})

    @debug_method
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        self._can_read(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        p = self.permissions.public_get(path)
        return p

    @debug_method
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        self._can_write(user, account, container, name)
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        if not public:
            self.permissions.public_unset(path)
        else:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet)

    @debug_method
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if props[self.HASH] is None:
            return 0, ()
        if props[self.HASH].startswith('archip:'):
            hashmap = self.store.map_get_archipelago(props[self.HASH],
                                                     props[self.SIZE])
            return props[self.SIZE], [x for x in hashmap]
        else:
            hashmap = self.store.map_get(self._unhexlify_hash(
                props[self.HASH]))
            return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]

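    # The hashmap returned above is the list of per-block hashes (hex-encoded
    # for regular objects), one entry per self.block_size bytes of data; the
    # object's own hash is the Merkle-style reduction of these entries as
    # computed by HashMap.hash().
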
    def _update_object_hash(self, user, account, container, name, size, type,
                            hash, checksum, domain, meta, replace_meta,
                            permissions, src_node=None, src_version_id=None,
                            is_copy=False, report_size_change=True):
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(self._get_policy(
                    account_node, is_account_policy=True)['quota'])
                account_usage = self._get_statistics(account_node,
                                                     compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage))

            # Check container quota.
            container_quota = long(self._get_policy(
                container_node, is_account_policy=False)['quota'])
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        if report_size_change:
            self._report_size_change(
                user, account, size_delta,
                {'action': 'object update', 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(
                user, account, path,
                {'members': self.permissions.access_members(path)})

        self._report_object_change(
            user, account, path,
            details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id

    @debug_method
    def update_object_hashmap(self, user, account, container, name, size, type,
                              hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object's hashmap and return the new version."""

        for h in hashmap:
            if h.startswith('archip_'):
                raise IllegalOperationError(
                    'Cannot update Archipelago Volume hashmap.')
        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([self._unhexlify_hash(x) for x in hashmap])
        missing = self.store.block_search(map)
        if missing:
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = map.hash()
        hexlified = binascii.hexlify(hash)
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type, hexlified, checksum,
            domain, meta, replace_meta, permissions)
        self.store.map_put(hash, map)
        return dest_version_id, hexlified

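    # Typical caller flow implied by the IndexError contract of
    # update_object_hashmap() (sketch only, not part of the backend API):
    #
    #     try:
    #         version_id, obj_hash = backend.update_object_hashmap(
    #             user, account, container, name, size, content_type,
    #             hashmap, checksum='', domain='pithos')
    #     except IndexError as ie:
    #         for missing in ie.data:  # hex hashes of blocks not yet stored
    #             ...                  # upload each block, e.g. via put_block()
    #         # and retry the same update_object_hashmap() call
    #
    # domain='pithos' is just an example metadata domain.
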
    @debug_method
1155
    @backend_method
1156
    def update_object_checksum(self, user, account, container, name, version,
1157
                               checksum):
1158
        """Update an object's checksum."""
1159

    
1160
        # Update objects with greater version and same hashmap
1161
        # and size (fix metadata updates).
1162
        self._can_write(user, account, container, name)
1163
        path, node = self._lookup_object(account, container, name,
1164
                                         lock_container=True)
1165
        props = self._get_version(node, version)
1166
        versions = self.node.node_get_versions(node)
1167
        for x in versions:
1168
            if (x[self.SERIAL] >= int(version) and
1169
                x[self.HASH] == props[self.HASH] and
1170
                    x[self.SIZE] == props[self.SIZE]):
1171
                self.node.version_put_property(
1172
                    x[self.SERIAL], 'checksum', checksum)
1173

    
1174
    def _copy_object(self, user, src_account, src_container, src_name,
1175
                     dest_account, dest_container, dest_name, type,
1176
                     dest_domain=None, dest_meta=None, replace_meta=False,
1177
                     permissions=None, src_version=None, is_move=False,
1178
                     delimiter=None):
1179

    
1180
        report_size_change = not is_move
1181
        dest_meta = dest_meta or {}
1182
        dest_version_ids = []
1183
        self._can_read(user, src_account, src_container, src_name)
1184

    
1185
        src_container_path = '/'.join((src_account, src_container))
1186
        dest_container_path = '/'.join((dest_account, dest_container))
        # Lock container paths in alphabetical order
        if src_container_path < dest_container_path:
            self._lookup_container(src_account, src_container)
            self._lookup_container(dest_account, dest_container)
        else:
            self._lookup_container(dest_account, dest_container)
            self._lookup_container(src_account, src_container)

        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(
            user, dest_account, dest_container, dest_name, size, type, hash,
            None, dest_domain, dest_meta, replace_meta, permissions,
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
            report_size_change=report_size_change))
        if is_move and ((src_account, src_container, src_name) !=
                        (dest_account, dest_container, dest_name)):
            self._delete_object(user, src_account, src_container, src_name,
                                report_size_change=report_size_change)

        if delimiter:
            prefix = (src_name + delimiter if not
                      src_name.endswith(delimiter) else src_name)
            src_names = self._list_objects_no_limit(
                user, src_account, src_container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties
            # in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                vdest_name = path.replace(prefix, dest_prefix, 1)
                # _update_object_hash() locks destination path
                dest_version_ids.append(self._update_object_hash(
                    user, dest_account, dest_container, vdest_name, size,
                    vtype, hash, None, dest_domain, meta={},
                    replace_meta=False, permissions=None, src_node=node,
                    src_version_id=src_version_id, is_copy=is_copy,
                    report_size_change=report_size_change))
                if is_move and ((src_account, src_container, src_name) !=
                                (dest_account, dest_container, dest_name)):
                    self._delete_object(user, src_account, src_container, path,
                                        report_size_change=report_size_change)
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
                dest_version_ids)
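
    # Illustrative sketch (not part of the original backend): the branch at
    # the top of this method takes the two container locks in lexicographic
    # path order, so concurrent copies A -> B and B -> A request the locks in
    # the same order and cannot deadlock. The hypothetical helper below merely
    # restates that ordering rule.
    def _sketch_lock_containers_in_order(self, src_account, src_container,
                                         dest_account, dest_container):
        pairs = [(src_account, src_container), (dest_account, dest_container)]
        # Lock the lexicographically smaller container path first.
        for account, container in sorted(pairs, key=lambda p: '/'.join(p)):
            self._lookup_container(account, container)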

    @debug_method
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        meta = meta or {}
        dest_version_id = self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, src_version, False, delimiter)
        return dest_version_id
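
    # Illustrative sketch (not part of the original backend): how a caller
    # might invoke copy_object(). The account, container and object names and
    # the 'pithos' metadata domain are hypothetical example values.
    def _sketch_copy_object_usage(self):
        return self.copy_object(
            'user@example.org',                     # acting user
            'user@example.org', 'photos', 'a.jpg',  # source object
            'user@example.org', 'backup', 'a.jpg',  # destination object
            'image/jpeg', 'pithos',                 # type and meta domain
            meta={'note': 'copy'}, replace_meta=False)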

    @debug_method
    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        meta = meta or {}
        if user != src_account:
            raise NotAllowedError
        dest_version_id = self._move_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, None, delimiter=delimiter)
        return dest_version_id

    def _delete_object(self, user, account, container, name, until=None,
                       delimiter=None, report_size_change=True):
        if user != account:
            raise NotAllowedError

        # Look up the object; this also locks the container path.
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                self._get_version(node)
            except ItemNotExists:
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    @debug_method
    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        self._delete_object(user, account, container, name, until, delimiter)
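
    # Illustrative sketch (not part of the original backend): delete_object()
    # with until=None marks the latest version as deleted (earlier versions
    # survive according to the container's versioning policy), while passing
    # an `until` timestamp purges every version up to that point. The account,
    # container and object names below are hypothetical.
    def _sketch_delete_vs_purge(self, now):
        user = account = 'user@example.org'
        # Plain delete: the object disappears, history may be kept.
        self.delete_object(user, account, 'photos', 'a.jpg')
        # Purge: drop every version of another object up to `now`.
        self.delete_object(user, account, 'photos', 'b.jpg', until=now)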

    @debug_method
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return all (version, version_timestamp) pairs for the object."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
                x[self.CLUSTER] != CLUSTER_DELETED]

    @debug_method
    @backend_method
    def get_uuid(self, user, uuid, check_permissions=True):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        if check_permissions:
            self._can_read(user, account, container, name)
        return (account, container, name)

    @debug_method
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        if hash.startswith('archip_'):
            block = self.store.block_get_archipelago(hash)
        else:
            block = self.store.block_get(self._unhexlify_hash(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block

    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        return binascii.hexlify(self.store.block_put(data))

    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if hash.startswith('archip_'):
            raise IllegalOperationError(
                'Cannot update an Archipelago Volume block.')
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
        return binascii.hexlify(h)
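
    # Illustrative sketch (not part of the original backend): a read/patch
    # cycle over the block store using get_block() and update_block(). The
    # hex hash, patch data and offset are hypothetical; an 'archip_' block id
    # would raise IllegalOperationError in update_block().
    def _sketch_patch_block(self, hex_hash, patch, offset):
        old_data = self.get_block(hex_hash)  # current block contents
        new_hash = self.update_block(hex_hash, patch, offset)
        return old_data, new_hash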

    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

    def _put_object_node(self, path, parent, name):
        path = '/'.join((path, name))
        node = self.node.node_lookup(path)
        if node is None:
            node = self.node.node_create(parent, path)
        return path, node

    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node

    def _lookup_account(self, account, create=True):
        node = self.node.node_lookup(account)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node

    def _lookup_container(self, account, container):
        for_update = bool(self.lock_container_path)
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name, lock_container=False):
        if lock_container:
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes

    def _get_properties(self, node, until=None):
        """Return the node's properties as of the given timestamp."""

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats

    def _get_version(self, node, version=None):
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version, node=node)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node."""

        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = (self._generate_uuid()
                if (is_copy or src_version_id is None) else props[self.UUID])

        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
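
    # Illustrative sketch (not part of the original backend): the version
    # lifecycle implied by _put_version_duplicate(). Each write demotes the
    # previous latest version from CLUSTER_NORMAL to CLUSTER_HISTORY and
    # creates the new version in the requested cluster (deletes pass
    # CLUSTER_DELETED). The size/type values below are hypothetical.
    def _sketch_version_lifecycle(self, user, node):
        # First write: no previous version, v1 is created in CLUSTER_NORMAL.
        _, v1 = self._put_version_duplicate(
            user, node, size=0, type='text/plain', hash=None, checksum='',
            update_statistics_ancestors_depth=1)
        # Second write: v1 is reclustered to CLUSTER_HISTORY and v2 becomes
        # the new latest version; the returned pre-version id equals v1.
        pre, v2 = self._put_version_duplicate(
            user, node, size=0, type='text/plain', hash=None, checksum='',
            update_statistics_ancestors_depth=1)
        return pre == v1, v2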

    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))

    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=
            update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id

    def _list_limits(self, listing, marker, limit):
        start = 0
        if marker:
            try:
                start = listing.index(marker) + 1
            except ValueError:
                pass
        if not limit or limit > 10000:
            limit = 10000
        return start, limit
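
    # Illustrative sketch (not part of the original backend): how the
    # (start, limit) pair computed by _list_limits() pages through a listing.
    # The listing below is a hypothetical example.
    def _sketch_paging(self):
        listing = ['a.jpg', 'b.jpg', 'c.jpg']
        start, limit = self._list_limits(listing, marker='a.jpg', limit=2)
        return listing[start:start + limit]  # -> ['b.jpg', 'c.jpg']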

    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects

    # Reporting functions.

    @debug_method
    @backend_method
    def _report_size_change(self, user, account, size, details=None):
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)

    @debug_method
    @backend_method
    def _report_object_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))

    @debug_method
    @backend_method
    def _report_sharing_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))

    # Policy functions.

    def _check_policy(self, policy, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError
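
    # Illustrative sketch (not part of the original backend): values accepted
    # by _check_policy(). Only 'quota' (a non-negative integer) and
    # 'versioning' ('auto' or 'none') are recognised; anything else raises
    # ValueError. The dictionaries below are hypothetical examples.
    def _sketch_policy_validation(self):
        self._check_policy({'quota': '0', 'versioning': 'auto'},
                           is_account_policy=False)      # accepted
        try:
            self._check_policy({'versioning': 'manual'},
                               is_account_policy=False)  # rejected
        except ValueError:
            pass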

    def _put_policy(self, node, policy, replace, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)

    def _get_policy(self, node, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        policy = default_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy

    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the given version if the container's policy requires it.

        Return the size (in bytes) to be released from the account's usage.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0
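
    # Illustrative note (not part of the original backend): the value returned
    # above is the number of bytes credited back to the account, so the three
    # branches mean, roughly:
    #   * policy other than 'auto'     -> the replaced version and its map are
    #                                     removed and their size is credited.
    #   * 'auto' with free_versioning  -> the version is kept in history, but
    #                                     its size is still credited.
    #   * 'auto' without free history  -> the version is kept and keeps
    #                                     counting against the account (0).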

    # Access control functions.

    def _check_groups(self, groups):
        # raise ValueError('Bad characters in groups')
        pass

    def _check_permissions(self, path, permissions):
        # raise ValueError('Bad characters in permissions')
        pass

    def _get_formatted_paths(self, paths):
        formatted = []
        if len(paths) == 0:
            return formatted
        props = self.node.get_props(paths)
        if props:
            for prop in props:
                if prop[1].split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    formatted.append((prop[0].rstrip('/') + '/',
                                      self.MATCH_PREFIX))
                formatted.append((prop[0], self.MATCH_EXACT))
        return formatted

    def _get_permissions_path(self, account, container, name):
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            else:
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None
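
    # Illustrative note (not part of the original backend): candidate paths
    # are visited deepest-first (sort + reverse above). The object's own path
    # wins outright; otherwise the nearest ancestor below container level that
    # is an application/directory or application/folder object carries the
    # permissions. For a hypothetical 'acct/photos/2024/a.jpg', a shared
    # folder object 'acct/photos/2024' would be the path returned.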

    def _get_permissions_path_bulk(self, account, container, names):
        formatted_paths = []
        for name in names:
            path = '/'.join((account, container, name))
            formatted_paths.append(path)
        permission_paths = self.permissions.access_inherit_bulk(
            formatted_paths)
        permission_paths.sort()
        permission_paths.reverse()
        permission_paths_list = []
        lookup_list = []
        for p in permission_paths:
            if p in formatted_paths:
                permission_paths_list.append(p)
            else:
                if p.count('/') < 2:
                    continue
                lookup_list.append(p)

        if len(lookup_list) > 0:
            props = self.node.get_props(lookup_list)
            if props:
                for prop in props:
                    if prop[1].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        permission_paths_list.append(prop[0])

        if len(permission_paths_list) > 0:
            return permission_paths_list

        return None

    def _can_read(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if (not self.permissions.access_check(path, self.READ, user) and not
                self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError

    def _can_write(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError

    def _allowed_accounts(self, user):
        allow = set()
        for path in self.permissions.access_list_paths(user):
            allow.add(path.split('/', 1)[0])
        return sorted(allow)

    def _allowed_containers(self, user, account):
        allow = set()
        for path in self.permissions.access_list_paths(user, account):
            allow.add(path.split('/', 2)[1])
        return sorted(allow)

    # Domain functions.

    @debug_method
    @backend_method
    def get_domain_objects(self, domain, user=None):
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]

    # Utility functions.

    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        meta = {'bytes': props[self.SIZE],
                'type': props[self.TYPE],
                'hash': props[self.HASH],
                'version': props[self.SERIAL],
                'version_timestamp': props[self.MTIME],
                'modified_by': props[self.MUSER],
                'uuid': props[self.UUID],
                'checksum': props[self.CHECKSUM]}
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta

    def _exists(self, node):
        try:
            self._get_version(node)
        except ItemNotExists:
            return False
        else:
            return True

    def _unhexlify_hash(self, hash):
        try:
            return binascii.unhexlify(hash)
        except TypeError:
            raise InvalidHash(hash)
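
    # Illustrative sketch (not part of the original backend): block references
    # handled above are either hexlified digests, which _unhexlify_hash()
    # converts back to raw bytes, or Archipelago identifiers prefixed with
    # 'archip_', which the block methods pass through verbatim. The values
    # below are hypothetical.
    def _sketch_hash_kinds(self):
        plain = '0123456789abcdef0123456789abcdef'  # hexlified digest
        archip = 'archip_0123456789abcdef'          # Archipelago block id
        return self._unhexlify_hash(plain), archip.startswith('archip_')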