# snf-pithos-backend / pithos / backends / modular.py @ 23b41f6f

# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import sys
import uuid as uuidlib
import logging
import hashlib
import binascii

from functools import wraps, partial
from traceback import format_exc

try:
    from astakosclient import AstakosClient
except ImportError:
    AstakosClient = None

from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
                  InvalidHash)


class DisabledAstakosClient(object):
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        m = ("AstakosClient has been disabled, "
             "yet an attempt to access it was made")
        raise AssertionError(m)
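
# DisabledAstakosClient fails fast: any attribute access raises an
# AssertionError, so quota or commission calls made without a configured
# Astakos endpoint show up as hard errors instead of silent no-ops.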


# Stripped-down version of the HashMap class found in tools.

class HashMap(list):

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        if len(self) == 0:
            return self._hash_raw('')
        if len(self) == 1:
            return self.__getitem__(0)

        h = list(self)
        s = 2
        while s < len(h):
            s = s * 2
        h += [('\x00' * len(h[0]))] * (s - len(h))
        while len(h) > 1:
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
        return h[0]
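
# HashMap.hash() computes a Merkle-style top hash: the per-block digests are
# padded with zero blocks up to the next power of two and hashed pairwise,
# level by level, until a single root digest remains.
#
# A minimal usage sketch (hypothetical values; see update_object_hashmap for
# the real call site):
#
#     hm = HashMap(DEFAULT_BLOCK_SIZE, DEFAULT_HASH_ALGORITHM)
#     hm.extend(block_digests)   # raw (binary) digests, one per block
#     top_hash = hm.hash()       # binary root hash of the whole object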

# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16

QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
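# CLUSTER_NORMAL holds the current version of each object, CLUSTER_HISTORY
# the superseded versions kept around by the versioning policy, and
# CLUSTER_DELETED the versions of removed objects (this reading follows how
# the constants are used by the purge and versioning code below).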

inf = float('inf')

ULTIMATE_ANSWER = 42

DEFAULT_SOURCE = 'system'

logger = logging.getLogger(__name__)

def backend_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        self.success_status = getattr(self, 'success_status', False)
        try:
            result = func(self, *args, **kw)
            return result
        except:
            self.success_status = False
            raise
    return wrapper
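
# The success_status flag set above is read back in ModularBackend.__exit__:
# a missing or True value lets the transaction commit (queued messages are
# sent and commission serials accepted), while False makes __exit__ reject
# any pending commissions and roll the transaction back.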


def debug_method(func):
    @wraps(func)
    def wrapper(self, *args, **kw):
        try:
            result = func(self, *args, **kw)
            return result
        except:
            result = format_exc()
            raise
        finally:
            all_args = map(repr, args)
            all_args.extend('%s=%s' % (k, v) for k, v in kw.iteritems())
            logger.debug(">>> %s(%s) <<< %s" % (
                func.__name__, ', '.join(all_args).rstrip(', '), result))
    return wrapper
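
# debug_method logs every decorated call at DEBUG level: the positional and
# keyword arguments together with either the return value or, on failure,
# the formatted traceback.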


class ModularBackend(BaseBackend):
158
    """A modular backend.
159

160
    Uses modules for SQL functions and storage.
161
    """
162

    
163
    def __init__(self, db_module=None, db_connection=None,
164
                 block_module=None, block_path=None, block_umask=None,
165
                 block_size=None, hash_algorithm=None,
166
                 queue_module=None, queue_hosts=None, queue_exchange=None,
167
                 astakos_url=None, service_token=None,
168
                 astakosclient_poolsize=None,
169
                 free_versioning=True, block_params=None,
170
                 public_url_security=None,
171
                 public_url_alphabet=None,
172
                 account_quota_policy=None,
173
                 container_quota_policy=None,
174
                 container_versioning_policy=None):
175
        db_module = db_module or DEFAULT_DB_MODULE
176
        db_connection = db_connection or DEFAULT_DB_CONNECTION
177
        block_module = block_module or DEFAULT_BLOCK_MODULE
178
        block_path = block_path or DEFAULT_BLOCK_PATH
179
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
180
        block_params = block_params or DEFAULT_BLOCK_PARAMS
181
        block_size = block_size or DEFAULT_BLOCK_SIZE
182
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
183
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
184
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
185
        container_quota_policy = container_quota_policy \
186
            or DEFAULT_CONTAINER_QUOTA
187
        container_versioning_policy = container_versioning_policy \
188
            or DEFAULT_CONTAINER_VERSIONING
189

    
190
        self.default_account_policy = {'quota': account_quota_policy}
191
        self.default_container_policy = {
192
            'quota': container_quota_policy,
193
            'versioning': container_versioning_policy
194
        }
195
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
196
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
197

    
198
        self.public_url_security = (public_url_security or
199
                                    DEFAULT_PUBLIC_URL_SECURITY)
200
        self.public_url_alphabet = (public_url_alphabet or
201
                                    DEFAULT_PUBLIC_URL_ALPHABET)
202

    
203
        self.hash_algorithm = hash_algorithm
204
        self.block_size = block_size
205
        self.free_versioning = free_versioning
206

    
207
        def load_module(m):
208
            __import__(m)
209
            return sys.modules[m]
210

    
211
        self.db_module = load_module(db_module)
212
        self.wrapper = self.db_module.DBWrapper(db_connection)
213
        params = {'wrapper': self.wrapper}
214
        self.permissions = self.db_module.Permissions(**params)
215
        self.config = self.db_module.Config(**params)
216
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
217
        for x in ['READ', 'WRITE']:
218
            setattr(self, x, getattr(self.db_module, x))
219
        self.node = self.db_module.Node(**params)
220
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
221
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
222
                  'MATCH_PREFIX', 'MATCH_EXACT']:
223
            setattr(self, x, getattr(self.db_module, x))
224

    
225
        self.block_module = load_module(block_module)
226
        self.block_params = block_params
227
        params = {'path': block_path,
228
                  'block_size': self.block_size,
229
                  'hash_algorithm': self.hash_algorithm,
230
                  'umask': block_umask}
231
        params.update(self.block_params)
232
        self.store = self.block_module.Store(**params)
233

    
234
        if queue_module and queue_hosts:
235
            self.queue_module = load_module(queue_module)
236
            params = {'hosts': queue_hosts,
237
                      'exchange': queue_exchange,
238
                      'client_id': QUEUE_CLIENT_ID}
239
            self.queue = self.queue_module.Queue(**params)
240
        else:
241
            class NoQueue:
242
                def send(self, *args):
243
                    pass
244

    
245
                def close(self):
246
                    pass
247

    
248
            self.queue = NoQueue()
249

    
250
        self.astakos_url = astakos_url
251
        self.service_token = service_token
252

    
253
        if not astakos_url or not AstakosClient:
254
            self.astakosclient = DisabledAstakosClient(
255
                astakos_url,
256
                use_pool=True,
257
                pool_size=astakosclient_poolsize)
258
        else:
259
            self.astakosclient = AstakosClient(
260
                astakos_url,
261
                use_pool=True,
262
                pool_size=astakosclient_poolsize)
263

    
264
        self.serials = []
265
        self.messages = []
266

    
267
        self._move_object = partial(self._copy_object, is_move=True)
268

    
269
        self.lock_container_path = False
270

    
271
    def __enter__(self):
272
        self.wrapper.execute()
273
        return self
274

    
275
    def __exit__(self, type, value, traceback):
276
        success_status = getattr(self, 'success_status', True)
277
        if success_status:
278
            # send messages produced
279
            for m in self.messages:
280
                self.queue.send(*m)
281

    
282
            # register serials
283
            if self.serials:
284
                self.commission_serials.insert_many(
285
                    self.serials)
286

    
287
                # commit to ensure that the serials are registered
288
                # even if resolve commission fails
289
                self.wrapper.commit()
290

    
291
                # start new transaction
292
                self.wrapper.execute()
293

    
294
                r = self.astakosclient.resolve_commissions(
295
                    token=self.service_token,
296
                    accept_serials=self.serials,
297
                    reject_serials=[])
298
                self.commission_serials.delete_many(
299
                    r['accepted'])
300

    
301
            self.wrapper.commit()
302
        else:
303
            if self.serials:
304
                self.astakosclient.resolve_commissions(
305
                    token=self.service_token,
306
                    accept_serials=[],
307
                    reject_serials=self.serials)
308
            self.wrapper.rollback()
309
        self.close()
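
    # Transaction protocol: on success the queued messages are sent, the
    # pending commission serials are persisted and committed (so they survive
    # a failed resolve) and then accepted against Astakos; on failure the
    # serials are rejected and the database work is rolled back.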
310

    
311
    def close(self):
312
        self.wrapper.close()
313
        self.queue.close()
314

    
315
    @property
316
    def using_external_quotaholder(self):
317
        return not isinstance(self.astakosclient, DisabledAstakosClient)
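
    # Quota enforcement is delegated to Astakos only when a real AstakosClient
    # was constructed; otherwise the account quota is checked against the
    # locally stored policy in _update_object_hash (container quotas are
    # always checked locally).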
318

    
319
    @debug_method
320
    def list_accounts(self, user, marker=None, limit=10000):
321
        """Return a list of accounts the user can access."""
322

    
323
        allowed = self._allowed_accounts(user)
324
        start, limit = self._list_limits(allowed, marker, limit)
325
        return allowed[start:start + limit]
326

    
327
    @debug_method
328
    def get_account_meta(
329
            self, user, account, domain, until=None, include_user_defined=True,
330
            external_quota=None):
331
        """Return a dictionary with the account metadata for the domain."""
332

    
333
        path, node = self._lookup_account(account, user == account)
334
        if user != account:
335
            if until or (node is None) or (account not
336
                                           in self._allowed_accounts(user)):
337
                raise NotAllowedError
338
        try:
339
            props = self._get_properties(node, until)
340
            mtime = props[self.MTIME]
341
        except NameError:
342
            props = None
343
            mtime = until
344
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
345
        tstamp = max(tstamp, mtime)
346
        if until is None:
347
            modified = tstamp
348
        else:
349
            modified = self._get_statistics(
350
                node, compute=True)[2]  # Overall last modification.
351
            modified = max(modified, mtime)
352

    
353
        if user != account:
354
            meta = {'name': account}
355
        else:
356
            meta = {}
357
            if props is not None and include_user_defined:
358
                meta.update(
359
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
360
            if until is not None:
361
                meta.update({'until_timestamp': tstamp})
362
            meta.update({'name': account, 'count': count, 'bytes': bytes})
363
            if self.using_external_quotaholder:
364
                external_quota = external_quota or {}
365
                meta['bytes'] = external_quota.get('usage', 0)
366
        meta.update({'modified': modified})
367
        return meta
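
    # get_account_meta returns only the account name and modification time to
    # non-owners; for the owner, passing `until` reports the statistics as of
    # that moment while 'modified' still reflects the overall last change.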
368

    
369
    @debug_method
370
    def update_account_meta(self, user, account, domain, meta, replace=False):
371
        """Update the metadata associated with the account for the domain."""
372

    
373
        if user != account:
374
            raise NotAllowedError
375
        path, node = self._lookup_account(account, True)
376
        self._put_metadata(user, node, domain, meta, replace,
377
                           update_statistics_ancestors_depth=-1)
378

    
379
    @debug_method
380
    def get_account_groups(self, user, account):
381
        """Return a dictionary with the user groups defined for the account."""
382

    
383
        if user != account:
384
            if account not in self._allowed_accounts(user):
385
                raise NotAllowedError
386
            return {}
387
        self._lookup_account(account, True)
388
        return self.permissions.group_dict(account)
389

    
390
    @debug_method
391
    def update_account_groups(self, user, account, groups, replace=False):
392
        """Update the groups associated with the account."""
393

    
394
        if user != account:
395
            raise NotAllowedError
396
        self._lookup_account(account, True)
397
        self._check_groups(groups)
398
        if replace:
399
            self.permissions.group_destroy(account)
400
        for k, v in groups.iteritems():
401
            if not replace:  # If not already deleted.
402
                self.permissions.group_delete(account, k)
403
            if v:
404
                self.permissions.group_addmany(account, k, v)
405

    
406
    @debug_method
407
    def get_account_policy(self, user, account, external_quota=None):
408
        """Return a dictionary with the account policy."""
409

    
410
        if user != account:
411
            if account not in self._allowed_accounts(user):
412
                raise NotAllowedError
413
            return {}
414
        path, node = self._lookup_account(account, True)
415
        policy = self._get_policy(node, is_account_policy=True)
416
        if self.using_external_quotaholder:
417
            external_quota = external_quota or {}
418
            policy['quota'] = external_quota.get('limit', 0)
419
        return policy
420

    
421
    @debug_method
422
    def update_account_policy(self, user, account, policy, replace=False):
423
        """Update the policy associated with the account."""
424

    
425
        if user != account:
426
            raise NotAllowedError
427
        path, node = self._lookup_account(account, True)
428
        self._check_policy(policy, is_account_policy=True)
429
        self._put_policy(node, policy, replace, is_account_policy=True)
430

    
431
    @debug_method
432
    def put_account(self, user, account, policy=None):
433
        """Create a new account with the given name."""
434

    
435
        policy = policy or {}
436
        if user != account:
437
            raise NotAllowedError
438
        node = self.node.node_lookup(account)
439
        if node is not None:
440
            raise AccountExists('Account already exists')
441
        if policy:
442
            self._check_policy(policy, is_account_policy=True)
443
        node = self._put_path(user, self.ROOTNODE, account,
444
                              update_statistics_ancestors_depth=-1)
445
        self._put_policy(node, policy, True, is_account_policy=True)
446

    
447
    @debug_method
448
    def delete_account(self, user, account):
449
        """Delete the account with the given name."""
450

    
451
        if user != account:
452
            raise NotAllowedError
453
        node = self.node.node_lookup(account)
454
        if node is None:
455
            return
456
        if not self.node.node_remove(node,
457
                                     update_statistics_ancestors_depth=-1):
458
            raise AccountNotEmpty('Account is not empty')
459
        self.permissions.group_destroy(account)
460

    
461
    @debug_method
462
    def list_containers(self, user, account, marker=None, limit=10000,
463
                        shared=False, until=None, public=False):
464
        """Return a list of containers existing under an account."""
465

    
466
        if user != account:
467
            if until or account not in self._allowed_accounts(user):
468
                raise NotAllowedError
469
            allowed = self._allowed_containers(user, account)
470
            start, limit = self._list_limits(allowed, marker, limit)
471
            return allowed[start:start + limit]
472
        if shared or public:
473
            allowed = set()
474
            if shared:
475
                allowed.update([x.split('/', 2)[1] for x in
476
                               self.permissions.access_list_shared(account)])
477
            if public:
478
                allowed.update([x[0].split('/', 2)[1] for x in
479
                               self.permissions.public_list(account)])
480
            allowed = sorted(allowed)
481
            start, limit = self._list_limits(allowed, marker, limit)
482
            return allowed[start:start + limit]
483
        node = self.node.node_lookup(account)
484
        containers = [x[0] for x in self._list_object_properties(
485
            node, account, '', '/', marker, limit, False, None, [], until)]
486
        start, limit = self._list_limits(
487
            [x[0] for x in containers], marker, limit)
488
        return containers[start:start + limit]
489

    
490
    @debug_method
491
    def list_container_meta(self, user, account, container, domain,
492
                            until=None):
493
        """Return a list of the container's object meta keys for a domain."""
494

    
495
        allowed = []
496
        if user != account:
497
            if until:
498
                raise NotAllowedError
499
            allowed = self.permissions.access_list_paths(
500
                user, '/'.join((account, container)))
501
            if not allowed:
502
                raise NotAllowedError
503
        path, node = self._lookup_container(account, container)
504
        before = until if until is not None else inf
505
        allowed = self._get_formatted_paths(allowed)
506
        return self.node.latest_attribute_keys(node, domain, before,
507
                                               CLUSTER_DELETED, allowed)
508

    
509
    @debug_method
510
    def get_container_meta(self, user, account, container, domain, until=None,
511
                           include_user_defined=True):
512
        """Return a dictionary with the container metadata for the domain."""
513

    
514
        if user != account:
515
            if until or container not in self._allowed_containers(user,
516
                                                                  account):
517
                raise NotAllowedError
518
        path, node = self._lookup_container(account, container)
519
        props = self._get_properties(node, until)
520
        mtime = props[self.MTIME]
521
        count, bytes, tstamp = self._get_statistics(node, until)
522
        tstamp = max(tstamp, mtime)
523
        if until is None:
524
            modified = tstamp
525
        else:
526
            modified = self._get_statistics(
527
                node)[2]  # Overall last modification.
528
            modified = max(modified, mtime)
529

    
530
        if user != account:
531
            meta = {'name': container}
532
        else:
533
            meta = {}
534
            if include_user_defined:
535
                meta.update(
536
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
537
            if until is not None:
538
                meta.update({'until_timestamp': tstamp})
539
            meta.update({'name': container, 'count': count, 'bytes': bytes})
540
        meta.update({'modified': modified})
541
        return meta
542

    
543
    @debug_method
544
    def update_container_meta(self, user, account, container, domain, meta,
545
                              replace=False):
546
        """Update the metadata associated with the container for the domain."""
547

    
548
        if user != account:
549
            raise NotAllowedError
550
        path, node = self._lookup_container(account, container)
551
        src_version_id, dest_version_id = self._put_metadata(
552
            user, node, domain, meta, replace,
553
            update_statistics_ancestors_depth=0)
554
        if src_version_id is not None:
555
            versioning = self._get_policy(
556
                node, is_account_policy=False)['versioning']
557
            if versioning != 'auto':
558
                self.node.version_remove(src_version_id,
559
                                         update_statistics_ancestors_depth=0)
560

    
561
    @debug_method
562
    def get_container_policy(self, user, account, container):
563
        """Return a dictionary with the container policy."""
564

    
565
        if user != account:
566
            if container not in self._allowed_containers(user, account):
567
                raise NotAllowedError
568
            return {}
569
        path, node = self._lookup_container(account, container)
570
        return self._get_policy(node, is_account_policy=False)
571

    
572
    @debug_method
573
    def update_container_policy(self, user, account, container, policy,
574
                                replace=False):
575
        """Update the policy associated with the container."""
576

    
577
        if user != account:
578
            raise NotAllowedError
579
        path, node = self._lookup_container(account, container)
580
        self._check_policy(policy, is_account_policy=False)
581
        self._put_policy(node, policy, replace, is_account_policy=False)
582

    
583
    @debug_method
584
    def put_container(self, user, account, container, policy=None):
585
        """Create a new container with the given name."""
586

    
587
        policy = policy or {}
588
        if user != account:
589
            raise NotAllowedError
590
        try:
591
            path, node = self._lookup_container(account, container)
592
        except NameError:
593
            pass
594
        else:
595
            raise ContainerExists('Container already exists')
596
        if policy:
597
            self._check_policy(policy, is_account_policy=False)
598
        path = '/'.join((account, container))
599
        node = self._put_path(
600
            user, self._lookup_account(account, True)[1], path,
601
            update_statistics_ancestors_depth=-1)
602
        self._put_policy(node, policy, True, is_account_policy=False)
603

    
604
    @debug_method
605
    def delete_container(self, user, account, container, until=None, prefix='',
606
                         delimiter=None):
607
        """Delete/purge the container with the given name."""
608

    
609
        if user != account:
610
            raise NotAllowedError
611
        path, node = self._lookup_container(account, container)
612

    
613
        if until is not None:
614
            hashes, size, serials = self.node.node_purge_children(
615
                node, until, CLUSTER_HISTORY,
616
                update_statistics_ancestors_depth=0)
617
            for h in hashes:
618
                self.store.map_delete(h)
619
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
620
                                          update_statistics_ancestors_depth=0)
621
            if not self.free_versioning:
622
                self._report_size_change(
623
                    user, account, -size, {
624
                        'action': 'container purge',
625
                        'path': path,
626
                        'versions': ','.join(str(i) for i in serials)
627
                    }
628
                )
629
            return
630

    
631
        if not delimiter:
632
            if self._get_statistics(node)[0] > 0:
633
                raise ContainerNotEmpty('Container is not empty')
634
            hashes, size, serials = self.node.node_purge_children(
635
                node, inf, CLUSTER_HISTORY,
636
                update_statistics_ancestors_depth=0)
637
            for h in hashes:
638
                self.store.map_delete(h)
639
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
640
                                          update_statistics_ancestors_depth=0)
641
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
642
            if not self.free_versioning:
643
                self._report_size_change(
644
                    user, account, -size, {
645
                        'action': 'container purge',
646
                        'path': path,
647
                        'versions': ','.join(str(i) for i in serials)
648
                    }
649
                )
650
        else:
651
            # remove only contents
652
            src_names = self._list_objects_no_limit(
653
                user, account, container, prefix='', delimiter=None,
654
                virtual=False, domain=None, keys=[], shared=False, until=None,
655
                size_range=None, all_props=True, public=False)
656
            paths = []
657
            for t in src_names:
658
                path = '/'.join((account, container, t[0]))
659
                node = t[2]
660
                if not self._exists(node):
661
                    continue
662
                src_version_id, dest_version_id = self._put_version_duplicate(
663
                    user, node, size=0, type='', hash=None, checksum='',
664
                    cluster=CLUSTER_DELETED,
665
                    update_statistics_ancestors_depth=1)
666
                del_size = self._apply_versioning(
667
                    account, container, src_version_id,
668
                    update_statistics_ancestors_depth=1)
669
                self._report_size_change(
670
                    user, account, -del_size, {
671
                        'action': 'object delete',
672
                        'path': path,
673
                        'versions': ','.join([str(dest_version_id)])})
674
                self._report_object_change(
675
                    user, account, path, details={'action': 'object delete'})
676
                paths.append(path)
677
            self.permissions.access_clear_bulk(paths)
678

    
679
    def _list_objects(self, user, account, container, prefix, delimiter,
680
                      marker, limit, virtual, domain, keys, shared, until,
681
                      size_range, all_props, public):
682
        if user != account and until:
683
            raise NotAllowedError
684
        if shared and public:
685
            # get shared first
686
            shared_paths = self._list_object_permissions(
687
                user, account, container, prefix, shared=True, public=False)
688
            objects = set()
689
            if shared_paths:
690
                path, node = self._lookup_container(account, container)
691
                shared_paths = self._get_formatted_paths(shared_paths)
692
                objects |= set(self._list_object_properties(
693
                    node, path, prefix, delimiter, marker, limit, virtual,
694
                    domain, keys, until, size_range, shared_paths, all_props))
695

    
696
            # get public
697
            objects |= set(self._list_public_object_properties(
698
                user, account, container, prefix, all_props))
699
            objects = list(objects)
700

    
701
            objects.sort(key=lambda x: x[0])
702
            start, limit = self._list_limits(
703
                [x[0] for x in objects], marker, limit)
704
            return objects[start:start + limit]
705
        elif public:
706
            objects = self._list_public_object_properties(
707
                user, account, container, prefix, all_props)
708
            start, limit = self._list_limits(
709
                [x[0] for x in objects], marker, limit)
710
            return objects[start:start + limit]
711

    
712
        allowed = self._list_object_permissions(
713
            user, account, container, prefix, shared, public)
714
        if shared and not allowed:
715
            return []
716
        path, node = self._lookup_container(account, container)
717
        allowed = self._get_formatted_paths(allowed)
718
        objects = self._list_object_properties(
719
            node, path, prefix, delimiter, marker, limit, virtual, domain,
720
            keys, until, size_range, allowed, all_props)
721
        start, limit = self._list_limits(
722
            [x[0] for x in objects], marker, limit)
723
        return objects[start:start + limit]
724

    
725
    def _list_public_object_properties(self, user, account, container, prefix,
726
                                       all_props):
727
        public = self._list_object_permissions(
728
            user, account, container, prefix, shared=False, public=True)
729
        paths, nodes = self._lookup_objects(public)
730
        path = '/'.join((account, container))
731
        cont_prefix = path + '/'
732
        paths = [x[len(cont_prefix):] for x in paths]
733
        objects = [(p,) + props for p, props in
734
                   zip(paths, self.node.version_lookup_bulk(
735
                       nodes, all_props=all_props))]
736
        return objects
737

    
738
    def _list_objects_no_limit(self, user, account, container, prefix,
739
                               delimiter, virtual, domain, keys, shared, until,
740
                               size_range, all_props, public):
741
        objects = []
742
        while True:
743
            marker = objects[-1] if objects else None
744
            limit = 10000
745
            l = self._list_objects(
746
                user, account, container, prefix, delimiter, marker, limit,
747
                virtual, domain, keys, shared, until, size_range, all_props,
748
                public)
749
            objects.extend(l)
750
            if not l or len(l) < limit:
751
                break
752
        return objects
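
    # _list_objects_no_limit pages through the listing 10000 entries at a
    # time, feeding the last entry back as the marker, until a short or empty
    # page signals the end.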
753

    
754
    def _list_object_permissions(self, user, account, container, prefix,
755
                                 shared, public):
756
        allowed = []
757
        path = '/'.join((account, container, prefix)).rstrip('/')
758
        if user != account:
759
            allowed = self.permissions.access_list_paths(user, path)
760
            if not allowed:
761
                raise NotAllowedError
762
        else:
763
            allowed = set()
764
            if shared:
765
                allowed.update(self.permissions.access_list_shared(path))
766
            if public:
767
                allowed.update(
768
                    [x[0] for x in self.permissions.public_list(path)])
769
            allowed = sorted(allowed)
770
            if not allowed:
771
                return []
772
        return allowed
773

    
774
    @debug_method
775
    def list_objects(self, user, account, container, prefix='', delimiter=None,
776
                     marker=None, limit=10000, virtual=True, domain=None,
777
                     keys=None, shared=False, until=None, size_range=None,
778
                     public=False):
779
        """List (object name, object version_id) under a container."""
780

    
781
        keys = keys or []
782
        return self._list_objects(
783
            user, account, container, prefix, delimiter, marker, limit,
784
            virtual, domain, keys, shared, until, size_range, False, public)
785

    
786
    @debug_method
787
    def list_object_meta(self, user, account, container, prefix='',
788
                         delimiter=None, marker=None, limit=10000,
789
                         virtual=True, domain=None, keys=None, shared=False,
790
                         until=None, size_range=None, public=False):
791
        """Return a list of metadata dicts of objects under a container."""
792

    
793
        keys = keys or []
794
        props = self._list_objects(
795
            user, account, container, prefix, delimiter, marker, limit,
796
            virtual, domain, keys, shared, until, size_range, True, public)
797
        objects = []
798
        for p in props:
799
            if len(p) == 2:
800
                objects.append({'subdir': p[0]})
801
            else:
802
                objects.append({
803
                    'name': p[0],
804
                    'bytes': p[self.SIZE + 1],
805
                    'type': p[self.TYPE + 1],
806
                    'hash': p[self.HASH + 1],
807
                    'version': p[self.SERIAL + 1],
808
                    'version_timestamp': p[self.MTIME + 1],
809
                    'modified': p[self.MTIME + 1] if until is None else None,
810
                    'modified_by': p[self.MUSER + 1],
811
                    'uuid': p[self.UUID + 1],
812
                    'checksum': p[self.CHECKSUM + 1]})
813
        return objects
814

    
815
    @debug_method
816
    def list_object_permissions(self, user, account, container, prefix=''):
817
        """Return a list of paths enforce permissions under a container."""
818

    
819
        return self._list_object_permissions(user, account, container, prefix,
820
                                             True, False)
821

    
822
    @debug_method
823
    def list_object_public(self, user, account, container, prefix=''):
824
        """Return a mapping of object paths to public ids under a container."""
825

    
826
        public = {}
827
        for path, p in self.permissions.public_list('/'.join((account,
828
                                                              container,
829
                                                              prefix))):
830
            public[path] = p
831
        return public
832

    
833
    @debug_method
834
    def get_object_meta(self, user, account, container, name, domain,
835
                        version=None, include_user_defined=True):
836
        """Return a dictionary with the object metadata for the domain."""
837

    
838
        self._can_read(user, account, container, name)
839
        path, node = self._lookup_object(account, container, name)
840
        props = self._get_version(node, version)
841
        if version is None:
842
            modified = props[self.MTIME]
843
        else:
844
            try:
845
                modified = self._get_version(
846
                    node)[self.MTIME]  # Overall last modification.
847
            except NameError:  # Object may be deleted.
848
                del_props = self.node.version_lookup(
849
                    node, inf, CLUSTER_DELETED)
850
                if del_props is None:
851
                    raise ItemNotExists('Object does not exist')
852
                modified = del_props[self.MTIME]
853

    
854
        meta = {}
855
        if include_user_defined:
856
            meta.update(
857
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
858
        meta.update({'name': name,
859
                     'bytes': props[self.SIZE],
860
                     'type': props[self.TYPE],
861
                     'hash': props[self.HASH],
862
                     'version': props[self.SERIAL],
863
                     'version_timestamp': props[self.MTIME],
864
                     'modified': modified,
865
                     'modified_by': props[self.MUSER],
866
                     'uuid': props[self.UUID],
867
                     'checksum': props[self.CHECKSUM]})
868
        return meta
869

    
870
    @debug_method
871
    def update_object_meta(self, user, account, container, name, domain, meta,
872
                           replace=False):
873
        """Update object metadata for a domain and return the new version."""
874

    
875
        self._can_write(user, account, container, name)
876

    
877
        path, node = self._lookup_object(account, container, name,
878
                                         lock_container=True)
879
        src_version_id, dest_version_id = self._put_metadata(
880
            user, node, domain, meta, replace,
881
            update_statistics_ancestors_depth=1)
882
        self._apply_versioning(account, container, src_version_id,
883
                               update_statistics_ancestors_depth=1)
884
        return dest_version_id
885

    
886
    @debug_method
887
    def get_object_permissions(self, user, account, container, name):
888
        """Return the action allowed on the object, the path
889
        from which the object gets its permissions from,
890
        along with a dictionary containing the permissions."""
891

    
892
        allowed = 'write'
893
        permissions_path = self._get_permissions_path(account, container, name)
894
        if user != account:
895
            if self.permissions.access_check(permissions_path, self.WRITE,
896
                                             user):
897
                allowed = 'write'
898
            elif self.permissions.access_check(permissions_path, self.READ,
899
                                               user):
900
                allowed = 'read'
901
            else:
902
                raise NotAllowedError
903
        self._lookup_object(account, container, name)
904
        return (allowed,
905
                permissions_path,
906
                self.permissions.access_get(permissions_path))
907

    
908
    @debug_method
909
    def update_object_permissions(self, user, account, container, name,
910
                                  permissions):
911
        """Update the permissions associated with the object."""
912

    
913
        if user != account:
914
            raise NotAllowedError
915
        path = self._lookup_object(account, container, name,
916
                                   lock_container=True)[0]
917
        self._check_permissions(path, permissions)
918
        self.permissions.access_set(path, permissions)
919
        self._report_sharing_change(user, account, path, {'members':
920
                                    self.permissions.access_members(path)})
921

    
922
    @debug_method
923
    def get_object_public(self, user, account, container, name):
924
        """Return the public id of the object if applicable."""
925

    
926
        self._can_read(user, account, container, name)
927
        path = self._lookup_object(account, container, name)[0]
928
        p = self.permissions.public_get(path)
929
        return p
930

    
931
    @debug_method
932
    def update_object_public(self, user, account, container, name, public):
933
        """Update the public status of the object."""
934

    
935
        self._can_write(user, account, container, name)
936
        path = self._lookup_object(account, container, name,
937
                                   lock_container=True)[0]
938
        if not public:
939
            self.permissions.public_unset(path)
940
        else:
941
            self.permissions.public_set(
942
                path, self.public_url_security, self.public_url_alphabet)
943

    
944
    @debug_method
945
    def get_object_hashmap(self, user, account, container, name, version=None):
946
        """Return the object's size and a list with partial hashes."""
947

    
948
        self._can_read(user, account, container, name)
949
        path, node = self._lookup_object(account, container, name)
950
        props = self._get_version(node, version)
951
        if props[self.HASH] is None:
952
            return 0, ()
953
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
954
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
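
    # The hashmap is keyed by the object's top hash in the block store;
    # map_get returns the per-block digests, which are handed back
    # hex-encoded together with the object's size.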
955

    
956
    def _update_object_hash(self, user, account, container, name, size, type,
957
                            hash, checksum, domain, meta, replace_meta,
958
                            permissions, src_node=None, src_version_id=None,
959
                            is_copy=False, report_size_change=True):
960
        if permissions is not None and user != account:
961
            raise NotAllowedError
962
        self._can_write(user, account, container, name)
963
        if permissions is not None:
964
            path = '/'.join((account, container, name))
965
            self._check_permissions(path, permissions)
966

    
967
        account_path, account_node = self._lookup_account(account, True)
968
        container_path, container_node = self._lookup_container(
969
            account, container)
970

    
971
        path, node = self._put_object_node(
972
            container_path, container_node, name)
973
        pre_version_id, dest_version_id = self._put_version_duplicate(
974
            user, node, src_node=src_node, size=size, type=type, hash=hash,
975
            checksum=checksum, is_copy=is_copy,
976
            update_statistics_ancestors_depth=1)
977

    
978
        # Handle meta.
979
        if src_version_id is None:
980
            src_version_id = pre_version_id
981
        self._put_metadata_duplicate(
982
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
983

    
984
        del_size = self._apply_versioning(account, container, pre_version_id,
985
                                          update_statistics_ancestors_depth=1)
986
        size_delta = size - del_size
987
        if size_delta > 0:
988
            # Check account quota.
989
            if not self.using_external_quotaholder:
990
                account_quota = long(self._get_policy(
991
                    account_node, is_account_policy=True)['quota'])
992
                account_usage = self._get_statistics(account_node,
993
                                                     compute=True)[1]
994
                if (account_quota > 0 and account_usage > account_quota):
995
                    raise QuotaError(
996
                        'Account quota exceeded: limit: %s, usage: %s' % (
997
                            account_quota, account_usage))
998

    
999
            # Check container quota.
1000
            container_quota = long(self._get_policy(
1001
                container_node, is_account_policy=False)['quota'])
1002
            container_usage = self._get_statistics(container_node)[1]
1003
            if (container_quota > 0 and container_usage > container_quota):
1004
                # This must be executed in a transaction, so the version is
1005
                # never created if it fails.
1006
                raise QuotaError(
1007
                    'Container quota exceeded: limit: %s, usage: %s' % (
1008
                        container_quota, container_usage
1009
                    )
1010
                )
1011

    
1012
        if report_size_change:
1013
            self._report_size_change(
1014
                user, account, size_delta,
1015
                {'action': 'object update', 'path': path,
1016
                 'versions': ','.join([str(dest_version_id)])})
1017
        if permissions is not None:
1018
            self.permissions.access_set(path, permissions)
1019
            self._report_sharing_change(
1020
                user, account, path,
1021
                {'members': self.permissions.access_members(path)})
1022

    
1023
        self._report_object_change(
1024
            user, account, path,
1025
            details={'version': dest_version_id, 'action': 'object update'})
1026
        return dest_version_id
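
    # _update_object_hash is the single write path shared by hashmap updates,
    # copies and moves: it creates the new version, merges metadata, applies
    # the container versioning policy, enforces quotas, and emits the
    # size-change, sharing and object-change notifications.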
1027

    
1028
    @debug_method
1029
    def update_object_hashmap(self, user, account, container, name, size, type,
1030
                              hashmap, checksum, domain, meta=None,
1031
                              replace_meta=False, permissions=None):
1032
        """Create/update an object's hashmap and return the new version."""
1033

    
1034
        meta = meta or {}
1035
        if size == 0:  # No such thing as an empty hashmap.
1036
            hashmap = [self.put_block('')]
1037
        map = HashMap(self.block_size, self.hash_algorithm)
1038
        map.extend([self._unhexlify_hash(x) for x in hashmap])
1039
        missing = self.store.block_search(map)
1040
        if missing:
1041
            ie = IndexError()
1042
            ie.data = [binascii.hexlify(x) for x in missing]
1043
            raise ie
1044

    
1045
        hash = map.hash()
1046
        hexlified = binascii.hexlify(hash)
1047
        dest_version_id = self._update_object_hash(
1048
            user, account, container, name, size, type, hexlified, checksum,
1049
            domain, meta, replace_meta, permissions)
1050
        self.store.map_put(hash, map)
1051
        return dest_version_id, hexlified
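
    # Typical client flow (a sketch; upload_block is a hypothetical helper,
    # put_block is defined elsewhere in this class): hash the blocks, call
    # update_object_hashmap, and if it raises IndexError upload the blocks
    # listed in its .data attribute, then retry:
    #
    #     try:
    #         version, top_hash = backend.update_object_hashmap(
    #             user, account, container, name, size, type, hashmap,
    #             checksum, domain)
    #     except IndexError, e:
    #         for missing in e.data:      # hex hashes of blocks not in store
    #             upload_block(missing)   # hypothetical helper
    #         # ...retry update_object_hashmap...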
1052

    
1053
    @debug_method
1054
    def update_object_checksum(self, user, account, container, name, version,
1055
                               checksum):
1056
        """Update an object's checksum."""
1057

    
1058
        # Update objects with greater version and same hashmap
1059
        # and size (fix metadata updates).
1060
        self._can_write(user, account, container, name)
1061
        path, node = self._lookup_object(account, container, name,
1062
                                         lock_container=True)
1063
        props = self._get_version(node, version)
1064
        versions = self.node.node_get_versions(node)
1065
        for x in versions:
1066
            if (x[self.SERIAL] >= int(version) and
1067
                x[self.HASH] == props[self.HASH] and
1068
                    x[self.SIZE] == props[self.SIZE]):
1069
                self.node.version_put_property(
1070
                    x[self.SERIAL], 'checksum', checksum)
1071

    
1072
    def _copy_object(self, user, src_account, src_container, src_name,
1073
                     dest_account, dest_container, dest_name, type,
1074
                     dest_domain=None, dest_meta=None, replace_meta=False,
1075
                     permissions=None, src_version=None, is_move=False,
1076
                     delimiter=None):
1077

    
1078
        report_size_change = not is_move
1079
        dest_meta = dest_meta or {}
1080
        dest_version_ids = []
1081
        self._can_read(user, src_account, src_container, src_name)
1082
        path, node = self._lookup_object(src_account, src_container, src_name,
1083
                                         lock_container=True)
1084
        # TODO: Will do another fetch of the properties in duplicate version...
1085
        props = self._get_version(
1086
            node, src_version)  # Check to see if source exists.
1087
        src_version_id = props[self.SERIAL]
1088
        hash = props[self.HASH]
1089
        size = props[self.SIZE]
1090
        is_copy = not is_move and (src_account, src_container, src_name) != (
1091
            dest_account, dest_container, dest_name)  # New uuid.
1092
        dest_version_ids.append(self._update_object_hash(
1093
            user, dest_account, dest_container, dest_name, size, type, hash,
1094
            None, dest_domain, dest_meta, replace_meta, permissions,
1095
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1096
            report_size_change=report_size_change))
1097
        if is_move and ((src_account, src_container, src_name) !=
1098
                        (dest_account, dest_container, dest_name)):
1099
            self._delete_object(user, src_account, src_container, src_name,
1100
                                report_size_change=report_size_change)
1101

    
1102
        if delimiter:
1103
            prefix = (src_name + delimiter if not
1104
                      src_name.endswith(delimiter) else src_name)
1105
            src_names = self._list_objects_no_limit(
1106
                user, src_account, src_container, prefix, delimiter=None,
1107
                virtual=False, domain=None, keys=[], shared=False, until=None,
1108
                size_range=None, all_props=True, public=False)
1109
            src_names.sort(key=lambda x: x[2])  # order by nodes
1110
            paths = [elem[0] for elem in src_names]
1111
            nodes = [elem[2] for elem in src_names]
1112
            # TODO: Will do another fetch of the properties
1113
            # in duplicate version...
1114
            props = self._get_versions(nodes)  # Check to see if source exists.
1115

    
1116
            for prop, path, node in zip(props, paths, nodes):
1117
                src_version_id = prop[self.SERIAL]
1118
                hash = prop[self.HASH]
1119
                vtype = prop[self.TYPE]
1120
                size = prop[self.SIZE]
1121
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1122
                    delimiter) else dest_name
1123
                vdest_name = path.replace(prefix, dest_prefix, 1)
1124
                dest_version_ids.append(self._update_object_hash(
1125
                    user, dest_account, dest_container, vdest_name, size,
1126
                    vtype, hash, None, dest_domain, meta={},
1127
                    replace_meta=False, permissions=None, src_node=node,
1128
                    src_version_id=src_version_id, is_copy=is_copy))
1129
                if is_move and ((src_account, src_container, src_name) !=
1130
                                (dest_account, dest_container, dest_name)):
1131
                    self._delete_object(user, src_account, src_container, path)
1132
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
1133
                dest_version_ids)
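
    # A move is a copy that skips the size-change report and then deletes the
    # source (_move_object is partial(_copy_object, is_move=True), set up in
    # __init__); with a delimiter the same procedure is repeated for every
    # object under the source pseudo-directory prefix.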
1134

    
1135
    @debug_method
1136
    def copy_object(self, user, src_account, src_container, src_name,
1137
                    dest_account, dest_container, dest_name, type, domain,
1138
                    meta=None, replace_meta=False, permissions=None,
1139
                    src_version=None, delimiter=None):
1140
        """Copy an object's data and metadata."""
1141

    
1142
        meta = meta or {}
1143
        dest_version_id = self._copy_object(
1144
            user, src_account, src_container, src_name, dest_account,
1145
            dest_container, dest_name, type, domain, meta, replace_meta,
1146
            permissions, src_version, False, delimiter)
1147
        return dest_version_id
1148

    
1149
    @debug_method
1150
    def move_object(self, user, src_account, src_container, src_name,
1151
                    dest_account, dest_container, dest_name, type, domain,
1152
                    meta=None, replace_meta=False, permissions=None,
1153
                    delimiter=None):
1154
        """Move an object's data and metadata."""
1155

    
1156
        meta = meta or {}
1157
        if user != src_account:
1158
            raise NotAllowedError
1159
        dest_version_id = self._move_object(
1160
            user, src_account, src_container, src_name, dest_account,
1161
            dest_container, dest_name, type, domain, meta, replace_meta,
1162
            permissions, None, delimiter=delimiter)
1163
        return dest_version_id
1164

    
1165
    def _delete_object(self, user, account, container, name, until=None,
1166
                       delimiter=None, report_size_change=True):
1167
        if user != account:
1168
            raise NotAllowedError
1169

    
1170
        # lookup object and lock container path also
1171
        path, node = self._lookup_object(account, container, name,
1172
                                         lock_container=True)
1173

    
1174
        if until is not None:
1175
            if node is None:
1176
                return
1177
            hashes = []
1178
            size = 0
1179
            serials = []
1180
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1181
                                           update_statistics_ancestors_depth=1)
1182
            hashes += h
1183
            size += s
1184
            serials += v
1185
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1186
                                           update_statistics_ancestors_depth=1)
1187
            hashes += h
1188
            if not self.free_versioning:
1189
                size += s
1190
            serials += v
1191
            for h in hashes:
1192
                self.store.map_delete(h)
1193
            self.node.node_purge(node, until, CLUSTER_DELETED,
1194
                                 update_statistics_ancestors_depth=1)
1195
            try:
1196
                self._get_version(node)
1197
            except NameError:
1198
                self.permissions.access_clear(path)
1199
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        if not self._exists(node):
            raise ItemNotExists('Object is deleted.')
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        if report_size_change:
            self._report_size_change(
                user, account, -del_size,
                {'action': 'object delete',
                 'path': path,
                 'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(
                user, account, container, prefix, delimiter=None,
                virtual=False, domain=None, keys=[], shared=False, until=None,
                size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                if not self._exists(node):
                    continue
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                if report_size_change:
                    self._report_size_change(
                        user, account, -del_size,
                        {'action': 'object delete',
                         'path': path,
                         'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    @debug_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        self._delete_object(user, account, container, name, until, delimiter)

    @debug_method
    def list_versions(self, user, account, container, name):
        """Return a list of all object (version, version_timestamp) tuples."""

        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
                x[self.CLUSTER] != CLUSTER_DELETED]

    @debug_method
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given."""

        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    @debug_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

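    # Block functions. Blocks are stored content-addressed: put_block and
    # update_block return the hex digest under which the store filed the
    # data, and get_block expects the same hex hash back. update_block falls
    # back to put_block when the write covers a whole block from offset 0.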
    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        block = self.store.block_get(self._unhexlify_hash(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block

    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        return binascii.hexlify(self.store.block_put(data))

    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
        return binascii.hexlify(h)

    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

    def _put_object_node(self, path, parent, name):
        path = '/'.join((path, name))
        node = self.node.node_lookup(path)
        if node is None:
            node = self.node.node_create(parent, path)
        return path, node

    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node

    def _lookup_account(self, account, create=True):
        node = self.node.node_lookup(account)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node

    def _lookup_container(self, account, container):
        for_update = True if self.lock_container_path else False
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name, lock_container=False):
        if lock_container:
            self._lookup_container(account, container)

        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes

    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, sum of size, timestamp) of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats

    def _get_version(self, node, version=None):
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version, node=node)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

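    # _put_version_duplicate is the core of the versioning scheme: it copies
    # the properties of the latest version (of src_node if given, else of
    # node), lets explicit arguments override them, moves the previous latest
    # version of node to CLUSTER_HISTORY, and creates the new version in the
    # requested cluster. It returns (pre_version_id, dest_version_id).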
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node."""

        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None
                             # (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = (self._generate_uuid()
                if (is_copy or src_version_id is None) else props[self.UUID])

        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id

    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, ((
                k, v) for k, v in meta.iteritems()))

    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=
            update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id

    def _list_limits(self, listing, marker, limit):
        start = 0
        if marker:
            try:
                start = listing.index(marker) + 1
            except ValueError:
                pass
        if not limit or limit > 10000:
            limit = 10000
        return start, limit

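    # Listing helper: paths are matched under `path + '/'`, so prefix and
    # marker are interpreted relative to the container. When `virtual` is
    # set, the common prefixes produced by the delimiter are merged into the
    # result as (prefix, None) entries, mimicking directory listings.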
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects

    # Reporting functions.

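    # Every non-zero size change is queued as a 'resource.diskspace' message;
    # when an external quotaholder is configured, a commission for
    # 'pithos.diskspace' is also issued through astakosclient and the
    # returned serial appended to self.serials.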
    @debug_method
    def _report_size_change(self, user, account, size, details=None):
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        self.messages.append(
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                token=self.service_token,
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name)
        except BaseException, e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)

    @debug_method
    def _report_object_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))

    @debug_method
    def _report_sharing_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))

    # Policy functions.

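    # Policy validation: unknown keys are rejected, 'quota' must parse as a
    # non-negative integer and 'versioning' must be either 'auto' or 'none';
    # empty values fall back to the account or container defaults.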
    def _check_policy(self, policy, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError

    def _put_policy(self, node, policy, replace, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)

    def _get_policy(self, node, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        policy = default_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy

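    # _apply_versioning enforces the container versioning policy after a
    # write or delete: with 'auto' the superseded version stays in history
    # (its size is still returned as freed when free_versioning is enabled,
    # so history is not charged against quota); with any other policy the
    # version and its map are removed and the freed size is returned for
    # quota accounting.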
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0

    # Access control functions.

    def _check_groups(self, groups):
        # raise ValueError('Bad characters in groups')
        pass

    def _check_permissions(self, path, permissions):
        # raise ValueError('Bad characters in permissions')
        pass

    def _get_formatted_paths(self, paths):
        formatted = []
        for p in paths:
            node = self.node.node_lookup(p)
            props = None
            if node is not None:
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                if props[self.TYPE].split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
                formatted.append((p, self.MATCH_EXACT))
        return formatted

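    # Permissions may be granted on an exact path or inherited from a parent
    # folder. _get_permissions_path walks the candidate ancestor paths from
    # the most specific one down and returns the first that either equals the
    # object path or names a directory/folder object, i.e. the path whose
    # permissions apply.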
    def _get_permissions_path(self, account, container, name):
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            else:
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None

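    # Access checks: owners can always read and write their own objects;
    # anything published is readable by everyone; otherwise the permission
    # path (exact or inherited) must grant READ (or WRITE, which implies
    # read) to the requesting user.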
    def _can_read(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if (not self.permissions.access_check(path, self.READ, user) and not
                self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError

    def _can_write(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError

    def _allowed_accounts(self, user):
        allow = set()
        for path in self.permissions.access_list_paths(user):
            allow.add(path.split('/', 1)[0])
        return sorted(allow)

    def _allowed_containers(self, user, account):
        allow = set()
        for path in self.permissions.access_list_paths(user, account):
            allow.add(path.split('/', 2)[1])
        return sorted(allow)

    # Domain functions.

    @debug_method
    def get_domain_objects(self, domain, user=None):
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]

    # Util functions.

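    # _build_metadata flattens a version properties tuple into the metadata
    # dict returned to callers (bytes, type, hash, version, version_timestamp,
    # modified_by, uuid, checksum), optionally merged with the user-defined
    # attributes of that version.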
    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        meta = {'bytes': props[self.SIZE],
                'type': props[self.TYPE],
                'hash': props[self.HASH],
                'version': props[self.SERIAL],
                'version_timestamp': props[self.MTIME],
                'modified_by': props[self.MUSER],
                'uuid': props[self.UUID],
                'checksum': props[self.CHECKSUM]}
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta

    def _exists(self, node):
        try:
            self._get_version(node)
        except ItemNotExists:
            return False
        else:
            return True

    def _unhexlify_hash(self, hash):
        try:
            return binascii.unhexlify(hash)
        except TypeError:
            raise InvalidHash(hash)