Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ d2fc71c9

History | View | Annotate | Download (70.4 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
try:
41
    from astakosclient import AstakosClient
42
except ImportError:
43
    AstakosClient = None
44

    
45
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
46
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
47
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
48
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)
49

    
50

    
51
class DisabledAstakosClient(object):
52
    def __init__(self, *args, **kwargs):
53
        self.args = args
54
        self.kwargs = kwargs
55

    
56
    def __getattr__(self, name):
57
        m = ("AstakosClient has been disabled, "
58
             "yet an attempt to access it was made")
59
        raise AssertionError(m)
60

    
61

    
62
# Stripped-down version of the HashMap class found in tools.
63

    
64
class HashMap(list):
65

    
66
    def __init__(self, blocksize, blockhash):
67
        super(HashMap, self).__init__()
68
        self.blocksize = blocksize
69
        self.blockhash = blockhash
70

    
71
    def _hash_raw(self, v):
72
        h = hashlib.new(self.blockhash)
73
        h.update(v)
74
        return h.digest()
75

    
76
    def hash(self):
77
        if len(self) == 0:
78
            return self._hash_raw('')
79
        if len(self) == 1:
80
            return self.__getitem__(0)
81

    
82
        h = list(self)
83
        s = 2
84
        while s < len(h):
85
            s = s * 2
86
        h += [('\x00' * len(h[0]))] * (s - len(h))
87
        while len(h) > 1:
88
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
89
        return h[0]
90

    
91
# Default modules and settings.
92
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
93
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
94
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
95
DEFAULT_BLOCK_PATH = 'data/'
96
DEFAULT_BLOCK_UMASK = 0o022
97
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
98
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
99
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
100
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
101
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
102
                               'abcdefghijklmnopqrstuvwxyz'
103
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
104
DEFAULT_PUBLIC_URL_SECURITY = 16
105

    
106
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
107
QUEUE_CLIENT_ID = 'pithos'
108
QUEUE_INSTANCE_ID = '1'
109

    
110
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
111

    
112
inf = float('inf')
113

    
114
ULTIMATE_ANSWER = 42
115

    
116
DEFAULT_SOURCE = 'system'
117

    
118
logger = logging.getLogger(__name__)
119

    
120

    
121
def backend_method(func=None, autocommit=1):
122
    if func is None:
123
        def fn(func):
124
            return backend_method(func, autocommit)
125
        return fn
126

    
127
    if not autocommit:
128
        return func
129

    
130
    def fn(self, *args, **kw):
131
        self.wrapper.execute()
132
        serials = []
133
        self.serials = serials
134
        self.messages = []
135

    
136
        try:
137
            ret = func(self, *args, **kw)
138
            for m in self.messages:
139
                self.queue.send(*m)
140
            if self.serials:
141
                self.commission_serials.insert_many(self.serials)
142

    
143
                # commit to ensure that the serials are registered
144
                # even if accept commission fails
145
                self.wrapper.commit()
146
                self.wrapper.execute()
147

    
148
                r = self.astakosclient.resolve_commissions(
149
                            token=self.service_token,
150
                            accept_serials=self.serials,
151
                            reject_serials=[])
152
                self.commission_serials.delete_many(r['accepted'])
153

    
154
            self.wrapper.commit()
155
            return ret
156
        except:
157
            if self.serials:
158
                self.astakosclient.resolve_commissions(
159
                    token=self.service_token,
160
                    accept_serials=[],
161
                    reject_serials=self.serials)
162
            self.wrapper.rollback()
163
            raise
164
    return fn
165

    
166

    
167
class ModularBackend(BaseBackend):
168
    """A modular backend.
169

170
    Uses modules for SQL functions and storage.
171
    """
172

    
173
    def __init__(self, db_module=None, db_connection=None,
174
                 block_module=None, block_path=None, block_umask=None,
175
                 queue_module=None, queue_hosts=None, queue_exchange=None,
176
                 astakos_url=None, service_token=None,
177
                 astakosclient_poolsize=None,
178
                 free_versioning=True, block_params=None,
179
                 public_url_security=None,
180
                 public_url_alphabet=None,
181
                 account_quota_policy=None,
182
                 container_quota_policy=None,
183
                 container_versioning_policy=None):
184
        db_module = db_module or DEFAULT_DB_MODULE
185
        db_connection = db_connection or DEFAULT_DB_CONNECTION
186
        block_module = block_module or DEFAULT_BLOCK_MODULE
187
        block_path = block_path or DEFAULT_BLOCK_PATH
188
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
189
        block_params = block_params or DEFAULT_BLOCK_PARAMS
190
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
191
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
192
        container_quota_policy = container_quota_policy \
193
            or DEFAULT_CONTAINER_QUOTA
194
        container_versioning_policy = container_versioning_policy \
195
            or DEFAULT_CONTAINER_VERSIONING
196

    
197
        self.default_account_policy = {'quota': account_quota_policy}
198
        self.default_container_policy = {
199
            'quota': container_quota_policy,
200
            'versioning': container_versioning_policy
201
        }
202
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
203
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
204

    
205
        self.public_url_security = public_url_security or DEFAULT_PUBLIC_URL_SECURITY
206
        self.public_url_alphabet = public_url_alphabet or DEFAULT_PUBLIC_URL_ALPHABET
207

    
208
        self.hash_algorithm = 'sha256'
209
        self.block_size = 4 * 1024 * 1024  # 4MB
210
        self.free_versioning = free_versioning
211

    
212
        def load_module(m):
213
            __import__(m)
214
            return sys.modules[m]
215

    
216
        self.db_module = load_module(db_module)
217
        self.wrapper = self.db_module.DBWrapper(db_connection)
218
        params = {'wrapper': self.wrapper}
219
        self.permissions = self.db_module.Permissions(**params)
220
        self.config = self.db_module.Config(**params)
221
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
222
        for x in ['READ', 'WRITE']:
223
            setattr(self, x, getattr(self.db_module, x))
224
        self.node = self.db_module.Node(**params)
225
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
226
            setattr(self, x, getattr(self.db_module, x))
227

    
228
        self.block_module = load_module(block_module)
229
        self.block_params = block_params
230
        params = {'path': block_path,
231
                  'block_size': self.block_size,
232
                  'hash_algorithm': self.hash_algorithm,
233
                  'umask': block_umask}
234
        params.update(self.block_params)
235
        self.store = self.block_module.Store(**params)
236

    
237
        if queue_module and queue_hosts:
238
            self.queue_module = load_module(queue_module)
239
            params = {'hosts': queue_hosts,
240
                      'exchange': queue_exchange,
241
                      'client_id': QUEUE_CLIENT_ID}
242
            self.queue = self.queue_module.Queue(**params)
243
        else:
244
            class NoQueue:
245
                def send(self, *args):
246
                    pass
247

    
248
                def close(self):
249
                    pass
250

    
251
            self.queue = NoQueue()
252

    
253
        self.astakos_url = astakos_url
254
        self.service_token = service_token
255

    
256
        if not astakos_url or not AstakosClient:
257
            self.astakosclient = DisabledAstakosClient(
258
                astakos_url,
259
                use_pool=True,
260
                pool_size=astakosclient_poolsize)
261
        else:
262
            self.astakosclient = AstakosClient(
263
                astakos_url,
264
                use_pool=True,
265
                pool_size=astakosclient_poolsize)
266

    
267
        self.serials = []
268
        self.messages = []
269

    
270
    def close(self):
271
        self.wrapper.close()
272
        self.queue.close()
273

    
274
    @property
275
    def using_external_quotaholder(self):
276
        return not isinstance(self.astakosclient, DisabledAstakosClient)
277

    
278
    @backend_method
279
    def list_accounts(self, user, marker=None, limit=10000):
280
        """Return a list of accounts the user can access."""
281

    
282
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
283
        allowed = self._allowed_accounts(user)
284
        start, limit = self._list_limits(allowed, marker, limit)
285
        return allowed[start:start + limit]
286

    
287
    @backend_method
288
    def get_account_meta(
289
            self, user, account, domain, until=None, include_user_defined=True,
290
            external_quota=None):
291
        """Return a dictionary with the account metadata for the domain."""
292

    
293
        logger.debug(
294
            "get_account_meta: %s %s %s %s", user, account, domain, until)
295
        path, node = self._lookup_account(account, user == account)
296
        if user != account:
297
            if until or node is None or account not in self._allowed_accounts(user):
298
                raise NotAllowedError
299
        try:
300
            props = self._get_properties(node, until)
301
            mtime = props[self.MTIME]
302
        except NameError:
303
            props = None
304
            mtime = until
305
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
306
        tstamp = max(tstamp, mtime)
307
        if until is None:
308
            modified = tstamp
309
        else:
310
            modified = self._get_statistics(
311
                node, compute=True)[2]  # Overall last modification.
312
            modified = max(modified, mtime)
313

    
314
        if user != account:
315
            meta = {'name': account}
316
        else:
317
            meta = {}
318
            if props is not None and include_user_defined:
319
                meta.update(
320
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
321
            if until is not None:
322
                meta.update({'until_timestamp': tstamp})
323
            meta.update({'name': account, 'count': count, 'bytes': bytes})
324
            if self.using_external_quotaholder:
325
                external_quota = external_quota or {}
326
                meta['bytes'] = external_quota.get('usage', 0)
327
        meta.update({'modified': modified})
328
        return meta
329

    
330
    @backend_method
331
    def update_account_meta(self, user, account, domain, meta, replace=False):
332
        """Update the metadata associated with the account for the domain."""
333

    
334
        logger.debug("update_account_meta: %s %s %s %s %s", user,
335
                     account, domain, meta, replace)
336
        if user != account:
337
            raise NotAllowedError
338
        path, node = self._lookup_account(account, True)
339
        self._put_metadata(user, node, domain, meta, replace,
340
                           update_statistics_ancestors_depth=-1)
341

    
342
    @backend_method
343
    def get_account_groups(self, user, account):
344
        """Return a dictionary with the user groups defined for this account."""
345

    
346
        logger.debug("get_account_groups: %s %s", user, account)
347
        if user != account:
348
            if account not in self._allowed_accounts(user):
349
                raise NotAllowedError
350
            return {}
351
        self._lookup_account(account, True)
352
        return self.permissions.group_dict(account)
353

    
354
    @backend_method
355
    def update_account_groups(self, user, account, groups, replace=False):
356
        """Update the groups associated with the account."""
357

    
358
        logger.debug("update_account_groups: %s %s %s %s", user,
359
                     account, groups, replace)
360
        if user != account:
361
            raise NotAllowedError
362
        self._lookup_account(account, True)
363
        self._check_groups(groups)
364
        if replace:
365
            self.permissions.group_destroy(account)
366
        for k, v in groups.iteritems():
367
            if not replace:  # If not already deleted.
368
                self.permissions.group_delete(account, k)
369
            if v:
370
                self.permissions.group_addmany(account, k, v)
371

    
372
    @backend_method
373
    def get_account_policy(self, user, account, external_quota=None):
374
        """Return a dictionary with the account policy."""
375

    
376
        logger.debug("get_account_policy: %s %s", user, account)
377
        if user != account:
378
            if account not in self._allowed_accounts(user):
379
                raise NotAllowedError
380
            return {}
381
        path, node = self._lookup_account(account, True)
382
        policy = self._get_policy(node, is_account_policy=True)
383
        if self.using_external_quotaholder:
384
            external_quota = external_quota or {}
385
            policy['quota'] = external_quota.get('limit', 0)
386
        return policy
387

    
388
    @backend_method
389
    def update_account_policy(self, user, account, policy, replace=False):
390
        """Update the policy associated with the account."""
391

    
392
        logger.debug("update_account_policy: %s %s %s %s", user,
393
                     account, policy, replace)
394
        if user != account:
395
            raise NotAllowedError
396
        path, node = self._lookup_account(account, True)
397
        self._check_policy(policy, is_account_policy=True)
398
        self._put_policy(node, policy, replace, is_account_policy=True)
399

    
400
    @backend_method
401
    def put_account(self, user, account, policy=None):
402
        """Create a new account with the given name."""
403

    
404
        logger.debug("put_account: %s %s %s", user, account, policy)
405
        policy = policy or {}
406
        if user != account:
407
            raise NotAllowedError
408
        node = self.node.node_lookup(account)
409
        if node is not None:
410
            raise AccountExists('Account already exists')
411
        if policy:
412
            self._check_policy(policy, is_account_policy=True)
413
        node = self._put_path(user, self.ROOTNODE, account,
414
                              update_statistics_ancestors_depth=-1)
415
        self._put_policy(node, policy, True, is_account_policy=True)
416

    
417
    @backend_method
418
    def delete_account(self, user, account):
419
        """Delete the account with the given name."""
420

    
421
        logger.debug("delete_account: %s %s", user, account)
422
        if user != account:
423
            raise NotAllowedError
424
        node = self.node.node_lookup(account)
425
        if node is None:
426
            return
427
        if not self.node.node_remove(node,
428
                                     update_statistics_ancestors_depth=-1):
429
            raise AccountNotEmpty('Account is not empty')
430
        self.permissions.group_destroy(account)
431

    
432
    @backend_method
433
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
434
        """Return a list of containers existing under an account."""
435

    
436
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
437
                     account, marker, limit, shared, until, public)
438
        if user != account:
439
            if until or account not in self._allowed_accounts(user):
440
                raise NotAllowedError
441
            allowed = self._allowed_containers(user, account)
442
            start, limit = self._list_limits(allowed, marker, limit)
443
            return allowed[start:start + limit]
444
        if shared or public:
445
            allowed = set()
446
            if shared:
447
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
448
            if public:
449
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
450
            allowed = sorted(allowed)
451
            start, limit = self._list_limits(allowed, marker, limit)
452
            return allowed[start:start + limit]
453
        node = self.node.node_lookup(account)
454
        containers = [x[0] for x in self._list_object_properties(
455
            node, account, '', '/', marker, limit, False, None, [], until)]
456
        start, limit = self._list_limits(
457
            [x[0] for x in containers], marker, limit)
458
        return containers[start:start + limit]
459

    
460
    @backend_method
461
    def list_container_meta(self, user, account, container, domain, until=None):
462
        """Return a list with all the container's object meta keys for the domain."""
463

    
464
        logger.debug("list_container_meta: %s %s %s %s %s", user,
465
                     account, container, domain, until)
466
        allowed = []
467
        if user != account:
468
            if until:
469
                raise NotAllowedError
470
            allowed = self.permissions.access_list_paths(
471
                user, '/'.join((account, container)))
472
            if not allowed:
473
                raise NotAllowedError
474
        path, node = self._lookup_container(account, container)
475
        before = until if until is not None else inf
476
        allowed = self._get_formatted_paths(allowed)
477
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
478

    
479
    @backend_method
480
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
481
        """Return a dictionary with the container metadata for the domain."""
482

    
483
        logger.debug("get_container_meta: %s %s %s %s %s", user,
484
                     account, container, domain, until)
485
        if user != account:
486
            if until or container not in self._allowed_containers(user, account):
487
                raise NotAllowedError
488
        path, node = self._lookup_container(account, container)
489
        props = self._get_properties(node, until)
490
        mtime = props[self.MTIME]
491
        count, bytes, tstamp = self._get_statistics(node, until)
492
        tstamp = max(tstamp, mtime)
493
        if until is None:
494
            modified = tstamp
495
        else:
496
            modified = self._get_statistics(
497
                node)[2]  # Overall last modification.
498
            modified = max(modified, mtime)
499

    
500
        if user != account:
501
            meta = {'name': container}
502
        else:
503
            meta = {}
504
            if include_user_defined:
505
                meta.update(
506
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
507
            if until is not None:
508
                meta.update({'until_timestamp': tstamp})
509
            meta.update({'name': container, 'count': count, 'bytes': bytes})
510
        meta.update({'modified': modified})
511
        return meta
512

    
513
    @backend_method
514
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
515
        """Update the metadata associated with the container for the domain."""
516

    
517
        logger.debug("update_container_meta: %s %s %s %s %s %s",
518
                     user, account, container, domain, meta, replace)
519
        if user != account:
520
            raise NotAllowedError
521
        path, node = self._lookup_container(account, container)
522
        src_version_id, dest_version_id = self._put_metadata(
523
            user, node, domain, meta, replace,
524
            update_statistics_ancestors_depth=0)
525
        if src_version_id is not None:
526
            versioning = self._get_policy(
527
                node, is_account_policy=False)['versioning']
528
            if versioning != 'auto':
529
                self.node.version_remove(src_version_id,
530
                                         update_statistics_ancestors_depth=0)
531

    
532
    @backend_method
533
    def get_container_policy(self, user, account, container):
534
        """Return a dictionary with the container policy."""
535

    
536
        logger.debug(
537
            "get_container_policy: %s %s %s", user, account, container)
538
        if user != account:
539
            if container not in self._allowed_containers(user, account):
540
                raise NotAllowedError
541
            return {}
542
        path, node = self._lookup_container(account, container)
543
        return self._get_policy(node, is_account_policy=False)
544

    
545
    @backend_method
546
    def update_container_policy(self, user, account, container, policy, replace=False):
547
        """Update the policy associated with the container."""
548

    
549
        logger.debug("update_container_policy: %s %s %s %s %s",
550
                     user, account, container, policy, replace)
551
        if user != account:
552
            raise NotAllowedError
553
        path, node = self._lookup_container(account, container)
554
        self._check_policy(policy, is_account_policy=False)
555
        self._put_policy(node, policy, replace, is_account_policy=False)
556

    
557
    @backend_method
558
    def put_container(self, user, account, container, policy=None):
559
        """Create a new container with the given name."""
560

    
561
        logger.debug(
562
            "put_container: %s %s %s %s", user, account, container, policy)
563
        policy = policy or {}
564
        if user != account:
565
            raise NotAllowedError
566
        try:
567
            path, node = self._lookup_container(account, container)
568
        except NameError:
569
            pass
570
        else:
571
            raise ContainerExists('Container already exists')
572
        if policy:
573
            self._check_policy(policy, is_account_policy=False)
574
        path = '/'.join((account, container))
575
        node = self._put_path(
576
            user, self._lookup_account(account, True)[1], path,
577
            update_statistics_ancestors_depth=-1)
578
        self._put_policy(node, policy, True, is_account_policy=False)
579

    
580
    @backend_method
581
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
582
        """Delete/purge the container with the given name."""
583

    
584
        logger.debug("delete_container: %s %s %s %s %s %s", user,
585
                     account, container, until, prefix, delimiter)
586
        if user != account:
587
            raise NotAllowedError
588
        path, node = self._lookup_container(account, container)
589

    
590
        if until is not None:
591
            hashes, size, serials = self.node.node_purge_children(
592
                node, until, CLUSTER_HISTORY,
593
                update_statistics_ancestors_depth=0)
594
            for h in hashes:
595
                self.store.map_delete(h)
596
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
597
                                          update_statistics_ancestors_depth=0)
598
            if not self.free_versioning:
599
                self._report_size_change(
600
                    user, account, -size, {
601
                        'action':'container purge',
602
                        'path': path,
603
                        'versions': ','.join(str(i) for i in serials)
604
                    }
605
                )
606
            return
607

    
608
        if not delimiter:
609
            if self._get_statistics(node)[0] > 0:
610
                raise ContainerNotEmpty('Container is not empty')
611
            hashes, size, serials = self.node.node_purge_children(
612
                node, inf, CLUSTER_HISTORY,
613
                update_statistics_ancestors_depth=0)
614
            for h in hashes:
615
                self.store.map_delete(h)
616
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
617
                                          update_statistics_ancestors_depth=0)
618
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
619
            if not self.free_versioning:
620
                self._report_size_change(
621
                    user, account, -size, {
622
                        'action':'container purge',
623
                        'path': path,
624
                        'versions': ','.join(str(i) for i in serials)
625
                    }
626
                )
627
        else:
628
            # remove only contents
629
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
630
            paths = []
631
            for t in src_names:
632
                path = '/'.join((account, container, t[0]))
633
                node = t[2]
634
                src_version_id, dest_version_id = self._put_version_duplicate(
635
                    user, node, size=0, type='', hash=None, checksum='',
636
                    cluster=CLUSTER_DELETED,
637
                    update_statistics_ancestors_depth=1)
638
                del_size = self._apply_versioning(
639
                    account, container, src_version_id,
640
                    update_statistics_ancestors_depth=1)
641
                self._report_size_change(
642
                        user, account, -del_size, {
643
                                'action': 'object delete',
644
                                'path': path,
645
                        'versions': ','.join([str(dest_version_id)])
646
                     }
647
                )
648
                self._report_object_change(
649
                    user, account, path, details={'action': 'object delete'})
650
                paths.append(path)
651
            self.permissions.access_clear_bulk(paths)
652

    
653
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
654
        if user != account and until:
655
            raise NotAllowedError
656
        if shared and public:
657
            # get shared first
658
            shared_paths = self._list_object_permissions(
659
                user, account, container, prefix, shared=True, public=False)
660
            objects = set()
661
            if shared_paths:
662
                path, node = self._lookup_container(account, container)
663
                shared_paths = self._get_formatted_paths(shared_paths)
664
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared_paths, all_props))
665

    
666
            # get public
667
            objects |= set(self._list_public_object_properties(
668
                user, account, container, prefix, all_props))
669
            objects = list(objects)
670

    
671
            objects.sort(key=lambda x: x[0])
672
            start, limit = self._list_limits(
673
                [x[0] for x in objects], marker, limit)
674
            return objects[start:start + limit]
675
        elif public:
676
            objects = self._list_public_object_properties(
677
                user, account, container, prefix, all_props)
678
            start, limit = self._list_limits(
679
                [x[0] for x in objects], marker, limit)
680
            return objects[start:start + limit]
681

    
682
        allowed = self._list_object_permissions(
683
            user, account, container, prefix, shared, public)
684
        if shared and not allowed:
685
            return []
686
        path, node = self._lookup_container(account, container)
687
        allowed = self._get_formatted_paths(allowed)
688
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
689
        start, limit = self._list_limits(
690
            [x[0] for x in objects], marker, limit)
691
        return objects[start:start + limit]
692

    
693
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
694
        public = self._list_object_permissions(
695
            user, account, container, prefix, shared=False, public=True)
696
        paths, nodes = self._lookup_objects(public)
697
        path = '/'.join((account, container))
698
        cont_prefix = path + '/'
699
        paths = [x[len(cont_prefix):] for x in paths]
700
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
701
        objects = [(path,) + props for path, props in zip(paths, props)]
702
        return objects
703

    
704
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
705
        objects = []
706
        while True:
707
            marker = objects[-1] if objects else None
708
            limit = 10000
709
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
710
            objects.extend(l)
711
            if not l or len(l) < limit:
712
                break
713
        return objects
714

    
715
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
716
        allowed = []
717
        path = '/'.join((account, container, prefix)).rstrip('/')
718
        if user != account:
719
            allowed = self.permissions.access_list_paths(user, path)
720
            if not allowed:
721
                raise NotAllowedError
722
        else:
723
            allowed = set()
724
            if shared:
725
                allowed.update(self.permissions.access_list_shared(path))
726
            if public:
727
                allowed.update(
728
                    [x[0] for x in self.permissions.public_list(path)])
729
            allowed = sorted(allowed)
730
            if not allowed:
731
                return []
732
        return allowed
733

    
734
    @backend_method
735
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
736
        """Return a list of object (name, version_id) tuples existing under a container."""
737

    
738
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
739
        keys = keys or []
740
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
741

    
742
    @backend_method
743
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
744
        """Return a list of object metadata dicts existing under a container."""
745

    
746
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
747
        keys = keys or []
748
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
749
        objects = []
750
        for p in props:
751
            if len(p) == 2:
752
                objects.append({'subdir': p[0]})
753
            else:
754
                objects.append({'name': p[0],
755
                                'bytes': p[self.SIZE + 1],
756
                                'type': p[self.TYPE + 1],
757
                                'hash': p[self.HASH + 1],
758
                                'version': p[self.SERIAL + 1],
759
                                'version_timestamp': p[self.MTIME + 1],
760
                                'modified': p[self.MTIME + 1] if until is None else None,
761
                                'modified_by': p[self.MUSER + 1],
762
                                'uuid': p[self.UUID + 1],
763
                                'checksum': p[self.CHECKSUM + 1]})
764
        return objects
765

    
766
    @backend_method
767
    def list_object_permissions(self, user, account, container, prefix=''):
768
        """Return a list of paths that enforce permissions under a container."""
769

    
770
        logger.debug("list_object_permissions: %s %s %s %s", user,
771
                     account, container, prefix)
772
        return self._list_object_permissions(user, account, container, prefix, True, False)
773

    
774
    @backend_method
775
    def list_object_public(self, user, account, container, prefix=''):
776
        """Return a dict mapping paths to public ids for objects that are public under a container."""
777

    
778
        logger.debug("list_object_public: %s %s %s %s", user,
779
                     account, container, prefix)
780
        public = {}
781
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
782
            public[path] = p
783
        return public
784

    
785
    @backend_method
786
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
787
        """Return a dictionary with the object metadata for the domain."""
788

    
789
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
790
                     account, container, name, domain, version)
791
        self._can_read(user, account, container, name)
792
        path, node = self._lookup_object(account, container, name)
793
        props = self._get_version(node, version)
794
        if version is None:
795
            modified = props[self.MTIME]
796
        else:
797
            try:
798
                modified = self._get_version(
799
                    node)[self.MTIME]  # Overall last modification.
800
            except NameError:  # Object may be deleted.
801
                del_props = self.node.version_lookup(
802
                    node, inf, CLUSTER_DELETED)
803
                if del_props is None:
804
                    raise ItemNotExists('Object does not exist')
805
                modified = del_props[self.MTIME]
806

    
807
        meta = {}
808
        if include_user_defined:
809
            meta.update(
810
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
811
        meta.update({'name': name,
812
                     'bytes': props[self.SIZE],
813
                     'type': props[self.TYPE],
814
                     'hash': props[self.HASH],
815
                     'version': props[self.SERIAL],
816
                     'version_timestamp': props[self.MTIME],
817
                     'modified': modified,
818
                     'modified_by': props[self.MUSER],
819
                     'uuid': props[self.UUID],
820
                     'checksum': props[self.CHECKSUM]})
821
        return meta
822

    
823
    @backend_method
824
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
825
        """Update the metadata associated with the object for the domain and return the new version."""
826

    
827
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
828
                     user, account, container, name, domain, meta, replace)
829
        self._can_write(user, account, container, name)
830
        path, node = self._lookup_object(account, container, name)
831
        src_version_id, dest_version_id = self._put_metadata(
832
            user, node, domain, meta, replace,
833
            update_statistics_ancestors_depth=1)
834
        self._apply_versioning(account, container, src_version_id,
835
                               update_statistics_ancestors_depth=1)
836
        return dest_version_id
837

    
838
    @backend_method
839
    def get_object_permissions(self, user, account, container, name):
840
        """Return the action allowed on the object, the path
841
        from which the object gets its permissions from,
842
        along with a dictionary containing the permissions."""
843

    
844
        logger.debug("get_object_permissions: %s %s %s %s", user,
845
                     account, container, name)
846
        allowed = 'write'
847
        permissions_path = self._get_permissions_path(account, container, name)
848
        if user != account:
849
            if self.permissions.access_check(permissions_path, self.WRITE, user):
850
                allowed = 'write'
851
            elif self.permissions.access_check(permissions_path, self.READ, user):
852
                allowed = 'read'
853
            else:
854
                raise NotAllowedError
855
        self._lookup_object(account, container, name)
856
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
857

    
858
    @backend_method
859
    def update_object_permissions(self, user, account, container, name, permissions):
860
        """Update the permissions associated with the object."""
861

    
862
        logger.debug("update_object_permissions: %s %s %s %s %s",
863
                     user, account, container, name, permissions)
864
        if user != account:
865
            raise NotAllowedError
866
        path = self._lookup_object(account, container, name)[0]
867
        self._check_permissions(path, permissions)
868
        self.permissions.access_set(path, permissions)
869
        self._report_sharing_change(user, account, path, {'members':
870
                                    self.permissions.access_members(path)})
871

    
872
    @backend_method
873
    def get_object_public(self, user, account, container, name):
874
        """Return the public id of the object if applicable."""
875

    
876
        logger.debug(
877
            "get_object_public: %s %s %s %s", user, account, container, name)
878
        self._can_read(user, account, container, name)
879
        path = self._lookup_object(account, container, name)[0]
880
        p = self.permissions.public_get(path)
881
        return p
882

    
883
    @backend_method
884
    def update_object_public(self, user, account, container, name, public):
885
        """Update the public status of the object."""
886

    
887
        logger.debug("update_object_public: %s %s %s %s %s", user,
888
                     account, container, name, public)
889
        self._can_write(user, account, container, name)
890
        path = self._lookup_object(account, container, name)[0]
891
        if not public:
892
            self.permissions.public_unset(path)
893
        else:
894
            self.permissions.public_set(
895
                path, self.public_url_security, self.public_url_alphabet
896
            )
897

    
898
    @backend_method
899
    def get_object_hashmap(self, user, account, container, name, version=None):
900
        """Return the object's size and a list with partial hashes."""
901

    
902
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
903
                     account, container, name, version)
904
        self._can_read(user, account, container, name)
905
        path, node = self._lookup_object(account, container, name)
906
        props = self._get_version(node, version)
907
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
908
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
909

    
910
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
911
        if permissions is not None and user != account:
912
            raise NotAllowedError
913
        self._can_write(user, account, container, name)
914
        if permissions is not None:
915
            path = '/'.join((account, container, name))
916
            self._check_permissions(path, permissions)
917

    
918
        account_path, account_node = self._lookup_account(account, True)
919
        container_path, container_node = self._lookup_container(
920
            account, container)
921

    
922
        path, node = self._put_object_node(
923
            container_path, container_node, name)
924
        pre_version_id, dest_version_id = self._put_version_duplicate(
925
            user, node, src_node=src_node, size=size, type=type, hash=hash,
926
            checksum=checksum, is_copy=is_copy,
927
            update_statistics_ancestors_depth=1)
928

    
929
        # Handle meta.
930
        if src_version_id is None:
931
            src_version_id = pre_version_id
932
        self._put_metadata_duplicate(
933
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
934

    
935
        del_size = self._apply_versioning(account, container, pre_version_id,
936
                                          update_statistics_ancestors_depth=1)
937
        size_delta = size - del_size
938
        if size_delta > 0:
939
            # Check account quota.
940
            if not self.using_external_quotaholder:
941
                account_quota = long(
942
                    self._get_policy(account_node, is_account_policy=True
943
                    )['quota']
944
                )
945
                account_usage = self._get_statistics(account_node, compute=True)[1]
946
                if (account_quota > 0 and account_usage > account_quota):
947
                    raise QuotaError(
948
                        'Account quota exceeded: limit: %s, usage: %s' % (
949
                            account_quota, account_usage
950
                        )
951
                    )
952

    
953
            # Check container quota.
954
            container_quota = long(
955
                self._get_policy(container_node, is_account_policy=False
956
                )['quota']
957
            )
958
            container_usage = self._get_statistics(container_node)[1]
959
            if (container_quota > 0 and container_usage > container_quota):
960
                # This must be executed in a transaction, so the version is
961
                # never created if it fails.
962
                raise QuotaError(
963
                    'Container quota exceeded: limit: %s, usage: %s' % (
964
                        container_quota, container_usage
965
                    )
966
                )
967

    
968
        self._report_size_change(user, account, size_delta,
969
                                 {'action': 'object update', 'path': path,
970
                                  'versions': ','.join([str(dest_version_id)])})
971
        if permissions is not None:
972
            self.permissions.access_set(path, permissions)
973
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
974

    
975
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
976
        return dest_version_id
977

    
978
    @backend_method
979
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta=None, replace_meta=False, permissions=None):
980
        """Create/update an object with the specified size and partial hashes."""
981

    
982
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
983
                     account, container, name, size, type, hashmap, checksum)
984
        meta = meta or {}
985
        if size == 0:  # No such thing as an empty hashmap.
986
            hashmap = [self.put_block('')]
987
        map = HashMap(self.block_size, self.hash_algorithm)
988
        map.extend([binascii.unhexlify(x) for x in hashmap])
989
        missing = self.store.block_search(map)
990
        if missing:
991
            ie = IndexError()
992
            ie.data = [binascii.hexlify(x) for x in missing]
993
            raise ie
994

    
995
        hash = map.hash()
996
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
997
        self.store.map_put(hash, map)
998
        return dest_version_id
999

    
1000
    @backend_method
1001
    def update_object_checksum(self, user, account, container, name, version, checksum):
1002
        """Update an object's checksum."""
1003

    
1004
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
1005
                     user, account, container, name, version, checksum)
1006
        # Update objects with greater version and same hashmap and size (fix metadata updates).
1007
        self._can_write(user, account, container, name)
1008
        path, node = self._lookup_object(account, container, name)
1009
        props = self._get_version(node, version)
1010
        versions = self.node.node_get_versions(node)
1011
        for x in versions:
1012
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
1013
                self.node.version_put_property(
1014
                    x[self.SERIAL], 'checksum', checksum)
1015

    
1016
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta=None, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
1017
        dest_meta = dest_meta or {}
1018
        dest_version_ids = []
1019
        self._can_read(user, src_account, src_container, src_name)
1020
        path, node = self._lookup_object(src_account, src_container, src_name)
1021
        # TODO: Will do another fetch of the properties in duplicate version...
1022
        props = self._get_version(
1023
            node, src_version)  # Check to see if source exists.
1024
        src_version_id = props[self.SERIAL]
1025
        hash = props[self.HASH]
1026
        size = props[self.SIZE]
1027
        is_copy = not is_move and (src_account, src_container, src_name) != (
1028
            dest_account, dest_container, dest_name)  # New uuid.
1029
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
1030
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
1031
            self._delete_object(user, src_account, src_container, src_name)
1032

    
1033
        if delimiter:
1034
            prefix = src_name + \
1035
                delimiter if not src_name.endswith(delimiter) else src_name
1036
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1037
            src_names.sort(key=lambda x: x[2])  # order by nodes
1038
            paths = [elem[0] for elem in src_names]
1039
            nodes = [elem[2] for elem in src_names]
1040
            # TODO: Will do another fetch of the properties in duplicate version...
1041
            props = self._get_versions(nodes)  # Check to see if source exists.
1042

    
1043
            for prop, path, node in zip(props, paths, nodes):
1044
                src_version_id = prop[self.SERIAL]
1045
                hash = prop[self.HASH]
1046
                vtype = prop[self.TYPE]
1047
                size = prop[self.SIZE]
1048
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1049
                    delimiter) else dest_name
1050
                vdest_name = path.replace(prefix, dest_prefix, 1)
1051
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
1052
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
1053
                    self._delete_object(user, src_account, src_container, path)
1054
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
1055

    
1056
    @backend_method
1057
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, src_version=None, delimiter=None):
1058
        """Copy an object's data and metadata."""
1059

    
1060
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
1061
        meta = meta or {}
1062
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
1063
        return dest_version_id
1064

    
1065
    @backend_method
1066
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, delimiter=None):
1067
        """Move an object's data and metadata."""
1068

    
1069
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
1070
        meta = meta or {}
1071
        if user != src_account:
1072
            raise NotAllowedError
1073
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
1074
        return dest_version_id
1075

    
1076
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
1077
        if user != account:
1078
            raise NotAllowedError
1079

    
1080
        if until is not None:
1081
            path = '/'.join((account, container, name))
1082
            node = self.node.node_lookup(path)
1083
            if node is None:
1084
                return
1085
            hashes = []
1086
            size = 0
1087
            serials = []
1088
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1089
                                           update_statistics_ancestors_depth=1)
1090
            hashes += h
1091
            size += s
1092
            serials += v
1093
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1094
                                           update_statistics_ancestors_depth=1)
1095
            hashes += h
1096
            if not self.free_versioning:
1097
                size += s
1098
            serials += v
1099
            for h in hashes:
1100
                self.store.map_delete(h)
1101
            self.node.node_purge(node, until, CLUSTER_DELETED,
1102
                                 update_statistics_ancestors_depth=1)
1103
            try:
1104
                props = self._get_version(node)
1105
            except NameError:
1106
                self.permissions.access_clear(path)
1107
            self._report_size_change(
1108
                user, account, -size, {
1109
                    'action': 'object purge',
1110
                    'path': path,
1111
                    'versions': ','.join(str(i) for i in serials)
1112
                }
1113
            )
1114
            return
1115

    
1116
        path, node = self._lookup_object(account, container, name)
1117
        src_version_id, dest_version_id = self._put_version_duplicate(
1118
            user, node, size=0, type='', hash=None, checksum='',
1119
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1120
        del_size = self._apply_versioning(account, container, src_version_id,
1121
                                          update_statistics_ancestors_depth=1)
1122
        self._report_size_change(user, account, -del_size,
1123
                                 {'action': 'object delete', 'path': path,
1124
                                  'versions': ','.join([str(dest_version_id)])})
1125
        self._report_object_change(
1126
            user, account, path, details={'action': 'object delete'})
1127
        self.permissions.access_clear(path)
1128

    
1129
        if delimiter:
1130
            prefix = name + delimiter if not name.endswith(delimiter) else name
1131
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1132
            paths = []
1133
            for t in src_names:
1134
                path = '/'.join((account, container, t[0]))
1135
                node = t[2]
1136
                src_version_id, dest_version_id = self._put_version_duplicate(
1137
                    user, node, size=0, type='', hash=None, checksum='',
1138
                    cluster=CLUSTER_DELETED,
1139
                    update_statistics_ancestors_depth=1)
1140
                del_size = self._apply_versioning(
1141
                    account, container, src_version_id,
1142
                    update_statistics_ancestors_depth=1)
1143
                self._report_size_change(user, account, -del_size,
1144
                                         {'action': 'object delete',
1145
                                          'path': path,
1146
                                          'versions': ','.join([str(dest_version_id)])})
1147
                self._report_object_change(
1148
                    user, account, path, details={'action': 'object delete'})
1149
                paths.append(path)
1150
            self.permissions.access_clear_bulk(paths)
1151

    
1152
    @backend_method
1153
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1154
        """Delete/purge an object."""
1155

    
1156
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1157
                     account, container, name, until, prefix, delimiter)
1158
        self._delete_object(user, account, container, name, until, delimiter)
1159

    
1160
    @backend_method
1161
    def list_versions(self, user, account, container, name):
1162
        """Return a list of all (version, version_timestamp) tuples for an object."""
1163

    
1164
        logger.debug(
1165
            "list_versions: %s %s %s %s", user, account, container, name)
1166
        self._can_read(user, account, container, name)
1167
        path, node = self._lookup_object(account, container, name)
1168
        versions = self.node.node_get_versions(node)
1169
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1170

    
1171
    @backend_method
1172
    def get_uuid(self, user, uuid):
1173
        """Return the (account, container, name) for the UUID given."""
1174

    
1175
        logger.debug("get_uuid: %s %s", user, uuid)
1176
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1177
        if info is None:
1178
            raise NameError
1179
        path, serial = info
1180
        account, container, name = path.split('/', 2)
1181
        self._can_read(user, account, container, name)
1182
        return (account, container, name)
1183

    
1184
    @backend_method
1185
    def get_public(self, user, public):
1186
        """Return the (account, container, name) for the public id given."""
1187

    
1188
        logger.debug("get_public: %s %s", user, public)
1189
        path = self.permissions.public_path(public)
1190
        if path is None:
1191
            raise NameError
1192
        account, container, name = path.split('/', 2)
1193
        self._can_read(user, account, container, name)
1194
        return (account, container, name)
1195

    
1196
    @backend_method(autocommit=0)
1197
    def get_block(self, hash):
1198
        """Return a block's data."""
1199

    
1200
        logger.debug("get_block: %s", hash)
1201
        block = self.store.block_get(binascii.unhexlify(hash))
1202
        if not block:
1203
            raise ItemNotExists('Block does not exist')
1204
        return block
1205

    
1206
    @backend_method(autocommit=0)
1207
    def put_block(self, data):
1208
        """Store a block and return the hash."""
1209

    
1210
        logger.debug("put_block: %s", len(data))
1211
        return binascii.hexlify(self.store.block_put(data))
1212

    
1213
    @backend_method(autocommit=0)
1214
    def update_block(self, hash, data, offset=0):
1215
        """Update a known block and return the hash."""
1216

    
1217
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1218
        if offset == 0 and len(data) == self.block_size:
1219
            return self.put_block(data)
1220
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1221
        return binascii.hexlify(h)
1222

    
1223
    # Path functions.
1224

    
1225
    def _generate_uuid(self):
1226
        return str(uuidlib.uuid4())
1227

    
1228
    def _put_object_node(self, path, parent, name):
1229
        path = '/'.join((path, name))
1230
        node = self.node.node_lookup(path)
1231
        if node is None:
1232
            node = self.node.node_create(parent, path)
1233
        return path, node
1234

    
1235
    def _put_path(self, user, parent, path,
1236
                  update_statistics_ancestors_depth=None):
1237
        node = self.node.node_create(parent, path)
1238
        self.node.version_create(node, None, 0, '', None, user,
1239
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
1240
                                 update_statistics_ancestors_depth)
1241
        return node
1242

    
1243
    def _lookup_account(self, account, create=True):
1244
        node = self.node.node_lookup(account)
1245
        if node is None and create:
1246
            node = self._put_path(
1247
                account, self.ROOTNODE, account,
1248
                update_statistics_ancestors_depth=-1)  # User is account.
1249
        return account, node
1250

    
1251
    def _lookup_container(self, account, container):
1252
        path = '/'.join((account, container))
1253
        node = self.node.node_lookup(path, for_update=True)
1254
        if node is None:
1255
            raise ItemNotExists('Container does not exist')
1256
        return path, node
1257

    
1258
    def _lookup_object(self, account, container, name):
1259
        path = '/'.join((account, container, name))
1260
        node = self.node.node_lookup(path)
1261
        if node is None:
1262
            raise ItemNotExists('Object does not exist')
1263
        return path, node
1264

    
1265
    def _lookup_objects(self, paths):
1266
        nodes = self.node.node_lookup_bulk(paths)
1267
        return paths, nodes
1268

    
1269
    def _get_properties(self, node, until=None):
1270
        """Return properties until the timestamp given."""
1271

    
1272
        before = until if until is not None else inf
1273
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1274
        if props is None and until is not None:
1275
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1276
        if props is None:
1277
            raise ItemNotExists('Path does not exist')
1278
        return props
1279

    
1280
    def _get_statistics(self, node, until=None, compute=False):
1281
        """Return count, sum of size and latest timestamp of everything under node."""
1282

    
1283
        if until is not None:
1284
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1285
        elif compute:
1286
            stats = self.node.statistics_latest(node, except_cluster=CLUSTER_DELETED)
1287
        else:
1288
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1289
        if stats is None:
1290
            stats = (0, 0, 0)
1291
        return stats
1292

    
1293
    def _get_version(self, node, version=None):
1294
        if version is None:
1295
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1296
            if props is None:
1297
                raise ItemNotExists('Object does not exist')
1298
        else:
1299
            try:
1300
                version = int(version)
1301
            except ValueError:
1302
                raise VersionNotExists('Version does not exist')
1303
            props = self.node.version_get_properties(version, node=node)
1304
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1305
                raise VersionNotExists('Version does not exist')
1306
        return props
1307

    
1308
    def _get_versions(self, nodes):
1309
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1310

    
1311
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
1312
                               type=None, hash=None, checksum=None,
1313
                               cluster=CLUSTER_NORMAL, is_copy=False,
1314
                               update_statistics_ancestors_depth=None):
1315
        """Create a new version of the node."""
1316

    
1317
        props = self.node.version_lookup(
1318
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1319
        if props is not None:
1320
            src_version_id = props[self.SERIAL]
1321
            src_hash = props[self.HASH]
1322
            src_size = props[self.SIZE]
1323
            src_type = props[self.TYPE]
1324
            src_checksum = props[self.CHECKSUM]
1325
        else:
1326
            src_version_id = None
1327
            src_hash = None
1328
            src_size = 0
1329
            src_type = ''
1330
            src_checksum = ''
1331
        if size is None:  # Set metadata.
1332
            hash = src_hash  # This way hash can be set to None (account or container).
1333
            size = src_size
1334
        if type is None:
1335
            type = src_type
1336
        if checksum is None:
1337
            checksum = src_checksum
1338
        uuid = self._generate_uuid(
1339
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1340

    
1341
        if src_node is None:
1342
            pre_version_id = src_version_id
1343
        else:
1344
            pre_version_id = None
1345
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1346
            if props is not None:
1347
                pre_version_id = props[self.SERIAL]
1348
        if pre_version_id is not None:
1349
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
1350
                                        update_statistics_ancestors_depth)
1351

    
1352
        dest_version_id, mtime = self.node.version_create(
1353
            node, hash, size, type, src_version_id, user, uuid, checksum,
1354
            cluster, update_statistics_ancestors_depth)
1355

    
1356
        self.node.attribute_unset_is_latest(node, dest_version_id)
1357

    
1358
        return pre_version_id, dest_version_id
1359

    
1360
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
1361
                                node, meta, replace=False):
1362
        if src_version_id is not None:
1363
            self.node.attribute_copy(src_version_id, dest_version_id)
1364
        if not replace:
1365
            self.node.attribute_del(dest_version_id, domain, (
1366
                k for k, v in meta.iteritems() if v == ''))
1367
            self.node.attribute_set(dest_version_id, domain, node, (
1368
                (k, v) for k, v in meta.iteritems() if v != ''))
1369
        else:
1370
            self.node.attribute_del(dest_version_id, domain)
1371
            self.node.attribute_set(dest_version_id, domain, node, ((
1372
                k, v) for k, v in meta.iteritems()))
1373

    
1374
    def _put_metadata(self, user, node, domain, meta, replace=False,
1375
                      update_statistics_ancestors_depth=None):
1376
        """Create a new version and store metadata."""
1377

    
1378
        src_version_id, dest_version_id = self._put_version_duplicate(
1379
            user, node,
1380
            update_statistics_ancestors_depth=update_statistics_ancestors_depth)
1381
        self._put_metadata_duplicate(
1382
            src_version_id, dest_version_id, domain, node, meta, replace)
1383
        return src_version_id, dest_version_id
1384

    
1385
    def _list_limits(self, listing, marker, limit):
1386
        start = 0
1387
        if marker:
1388
            try:
1389
                start = listing.index(marker) + 1
1390
            except ValueError:
1391
                pass
1392
        if not limit or limit > 10000:
1393
            limit = 10000
1394
        return start, limit
1395

    
1396
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, until=None, size_range=None, allowed=None, all_props=False):
1397
        keys = keys or []
1398
        allowed = allowed or []
1399
        cont_prefix = path + '/'
1400
        prefix = cont_prefix + prefix
1401
        start = cont_prefix + marker if marker else None
1402
        before = until if until is not None else inf
1403
        filterq = keys if domain else []
1404
        sizeq = size_range
1405

    
1406
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1407
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1408
        objects.sort(key=lambda x: x[0])
1409
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1410
        return objects
1411

    
1412
    # Reporting functions.
1413

    
1414
    def _report_size_change(self, user, account, size, details=None):
1415
        details = details or {}
1416

    
1417
        if size == 0:
1418
            return
1419

    
1420
        account_node = self._lookup_account(account, True)[1]
1421
        total = self._get_statistics(account_node, compute=True)[1]
1422
        details.update({'user': user, 'total': total})
1423
        logger.debug(
1424
            "_report_size_change: %s %s %s %s", user, account, size, details)
1425
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1426
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1427
                              float(size), details))
1428

    
1429
        if not self.using_external_quotaholder:
1430
            return
1431

    
1432
        try:
1433
            name = details['path'] if 'path' in details else ''
1434
            serial = self.astakosclient.issue_one_commission(
1435
                token=self.service_token,
1436
                holder=account,
1437
                source=DEFAULT_SOURCE,
1438
                provisions={'pithos.diskspace': size},
1439
                name=name
1440
                )
1441
        except BaseException, e:
1442
            raise QuotaError(e)
1443
        else:
1444
            self.serials.append(serial)
1445

    
1446
    def _report_object_change(self, user, account, path, details=None):
1447
        details = details or {}
1448
        details.update({'user': user})
1449
        logger.debug("_report_object_change: %s %s %s %s", user,
1450
                     account, path, details)
1451
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1452
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1453

    
1454
    def _report_sharing_change(self, user, account, path, details=None):
1455
        logger.debug("_report_permissions_change: %s %s %s %s",
1456
                     user, account, path, details)
1457
        details = details or {}
1458
        details.update({'user': user})
1459
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1460
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1461

    
1462
    # Policy functions.
1463

    
1464
    def _check_policy(self, policy, is_account_policy=True):
1465
        default_policy = self.default_account_policy \
1466
            if is_account_policy else self.default_container_policy
1467
        for k in policy.keys():
1468
            if policy[k] == '':
1469
                policy[k] = default_policy.get(k)
1470
        for k, v in policy.iteritems():
1471
            if k == 'quota':
1472
                q = int(v)  # May raise ValueError.
1473
                if q < 0:
1474
                    raise ValueError
1475
            elif k == 'versioning':
1476
                if v not in ['auto', 'none']:
1477
                    raise ValueError
1478
            else:
1479
                raise ValueError
1480

    
1481
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1482
        default_policy = self.default_account_policy \
1483
            if is_account_policy else self.default_container_policy
1484
        if replace:
1485
            for k, v in default_policy.iteritems():
1486
                if k not in policy:
1487
                    policy[k] = v
1488
        self.node.policy_set(node, policy)
1489

    
1490
    def _get_policy(self, node, is_account_policy=True):
1491
        default_policy = self.default_account_policy \
1492
            if is_account_policy else self.default_container_policy
1493
        policy = default_policy.copy()
1494
        policy.update(self.node.policy_get(node))
1495
        return policy
1496

    
1497
    def _apply_versioning(self, account, container, version_id,
1498
                          update_statistics_ancestors_depth=None):
1499
        """Delete the provided version if such is the policy.
1500
           Return size of object removed.
1501
        """
1502

    
1503
        if version_id is None:
1504
            return 0
1505
        path, node = self._lookup_container(account, container)
1506
        versioning = self._get_policy(
1507
            node, is_account_policy=False)['versioning']
1508
        if versioning != 'auto':
1509
            hash, size = self.node.version_remove(
1510
                version_id, update_statistics_ancestors_depth)
1511
            self.store.map_delete(hash)
1512
            return size
1513
        elif self.free_versioning:
1514
            return self.node.version_get_properties(
1515
                version_id, keys=('size',))[0]
1516
        return 0
1517

    
1518
    # Access control functions.
1519

    
1520
    def _check_groups(self, groups):
1521
        # raise ValueError('Bad characters in groups')
1522
        pass
1523

    
1524
    def _check_permissions(self, path, permissions):
1525
        # raise ValueError('Bad characters in permissions')
1526
        pass
1527

    
1528
    def _get_formatted_paths(self, paths):
1529
        formatted = []
1530
        for p in paths:
1531
            node = self.node.node_lookup(p)
1532
            props = None
1533
            if node is not None:
1534
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1535
            if props is not None:
1536
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1537
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1538
                formatted.append((p, self.MATCH_EXACT))
1539
        return formatted
1540

    
1541
    def _get_permissions_path(self, account, container, name):
1542
        path = '/'.join((account, container, name))
1543
        permission_paths = self.permissions.access_inherit(path)
1544
        permission_paths.sort()
1545
        permission_paths.reverse()
1546
        for p in permission_paths:
1547
            if p == path:
1548
                return p
1549
            else:
1550
                if p.count('/') < 2:
1551
                    continue
1552
                node = self.node.node_lookup(p)
1553
                props = None
1554
                if node is not None:
1555
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1556
                if props is not None:
1557
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1558
                        return p
1559
        return None
1560

    
1561
    def _can_read(self, user, account, container, name):
1562
        if user == account:
1563
            return True
1564
        path = '/'.join((account, container, name))
1565
        if self.permissions.public_get(path) is not None:
1566
            return True
1567
        path = self._get_permissions_path(account, container, name)
1568
        if not path:
1569
            raise NotAllowedError
1570
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1571
            raise NotAllowedError
1572

    
1573
    def _can_write(self, user, account, container, name):
1574
        if user == account:
1575
            return True
1576
        path = '/'.join((account, container, name))
1577
        path = self._get_permissions_path(account, container, name)
1578
        if not path:
1579
            raise NotAllowedError
1580
        if not self.permissions.access_check(path, self.WRITE, user):
1581
            raise NotAllowedError
1582

    
1583
    def _allowed_accounts(self, user):
1584
        allow = set()
1585
        for path in self.permissions.access_list_paths(user):
1586
            allow.add(path.split('/', 1)[0])
1587
        return sorted(allow)
1588

    
1589
    def _allowed_containers(self, user, account):
1590
        allow = set()
1591
        for path in self.permissions.access_list_paths(user, account):
1592
            allow.add(path.split('/', 2)[1])
1593
        return sorted(allow)
1594

    
1595
    # Domain functions
1596

    
1597
    @backend_method
1598
    def get_domain_objects(self, domain, user=None):
1599
        allowed_paths = self.permissions.access_list_paths(
1600
            user, include_owned=user is not None, include_containers=False)
1601
        if not allowed_paths:
1602
            return []
1603
        obj_list = self.node.domain_object_list(
1604
            domain, allowed_paths, CLUSTER_NORMAL)
1605
        return [(path,
1606
                 self._build_metadata(props, user_defined_meta),
1607
                 self.permissions.access_get(path)) for
1608
                path, props, user_defined_meta in obj_list]
1609

    
1610
    # util functions
1611

    
1612
    def _build_metadata(self, props, user_defined=None,
1613
                        include_user_defined=True):
1614
        meta = {'bytes': props[self.SIZE],
1615
                'type': props[self.TYPE],
1616
                'hash': props[self.HASH],
1617
                'version': props[self.SERIAL],
1618
                'version_timestamp': props[self.MTIME],
1619
                'modified_by': props[self.MUSER],
1620
                'uuid': props[self.UUID],
1621
                'checksum': props[self.CHECKSUM]}
1622
        if include_user_defined and user_defined != None:
1623
            meta.update(user_defined)
1624
        return meta
1625

    
1626
    def _has_read_access(self, user, path):
1627
        try:
1628
            account, container, object = path.split('/', 2)
1629
        except ValueError:
1630
            raise ValueError('Invalid object path')
1631

    
1632
        assert isinstance(user, basestring), "Invalid user"
1633

    
1634
        try:
1635
            self._can_read(user, account, container, object)
1636
        except NotAllowedError:
1637
            return False
1638
        else:
1639
            return True