Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ c69d8202

History | View | Annotate | Download (71.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
try:
41
    from astakosclient import AstakosClient
42
except ImportError:
43
    AstakosClient = None
44

    
45
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
46
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
47
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
48
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)
49

    
50

    
51
class DisabledAstakosClient(object):
    """Stand-in used when the real AstakosClient is unavailable.

    Remembers the constructor arguments it was given and raises on any
    other attribute access, so accidental use fails loudly instead of
    silently doing nothing.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        # Only reached for attributes not set in __init__.
        raise AssertionError("AstakosClient has been disabled, "
                             "yet an attempt to access it was made")
60

    
61

    
62
# Stripped-down version of the HashMap class found in tools.
63

    
64
class HashMap(list):
    """A list of block hashes foldable into a single Merkle-style top hash.

    Stripped-down version of the HashMap class found in tools.
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        # Digest v with the configured hash algorithm.
        hasher = hashlib.new(self.blockhash)
        hasher.update(v)
        return hasher.digest()

    def hash(self):
        """Return the top hash of the map.

        The list of block hashes is padded with zero-blocks up to the
        next power of two, then reduced pairwise until one digest is left.
        """
        if not self:
            return self._hash_raw('')
        if len(self) == 1:
            return self[0]

        level = list(self)
        size = 2
        while size < len(level):
            size *= 2
        zero_block = '\x00' * len(level[0])
        level.extend([zero_block] * (size - len(level)))
        while len(level) > 1:
            level = [self._hash_raw(level[i] + level[i + 1])
                     for i in range(0, len(level), 2)]
        return level[0]
90

    
91
# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
# Characters used when generating random public-URL tokens.
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
# Length (in alphabet characters) of a generated public-URL token.
DEFAULT_PUBLIC_URL_SECURITY = 16

# Identifiers used when publishing messages on the queue.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version clusters: current data, superseded versions, deleted versions.
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

# Sentinel for "no upper bound" in timestamp comparisons.
inf = float('inf')

ULTIMATE_ANSWER = 42

DEFAULT_SOURCE = 'system'

logger = logging.getLogger(__name__)
119

    
120

    
121
def backend_method(func=None, autocommit=1):
    """Decorator wrapping a backend call in a database transaction.

    Usable bare (``@backend_method``) or parameterized
    (``@backend_method(autocommit=0)``).  With autocommit disabled the
    function is returned unchanged.  Otherwise the wrapper:
    opens a transaction, runs the call, sends any queued messages,
    registers and resolves quota commission serials via astakosclient,
    and commits — rolling back and rejecting the serials on any error.
    """
    # Called as @backend_method(autocommit=...): return a decorator.
    if func is None:
        def fn(func):
            return backend_method(func, autocommit)
        return fn

    if not autocommit:
        return func

    def fn(self, *args, **kw):
        self.wrapper.execute()
        # Fresh per-call accumulators; the wrapped method appends to them.
        serials = []
        self.serials = serials
        self.messages = []

        try:
            ret = func(self, *args, **kw)
            for m in self.messages:
                self.queue.send(*m)
            if self.serials:
                self.commission_serials.insert_many(self.serials)

                # commit to ensure that the serials are registered
                # even if accept commission fails
                self.wrapper.commit()
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                            token=self.service_token,
                            accept_serials=self.serials,
                            reject_serials=[])
                # Forget serials the quotaholder has accepted.
                self.commission_serials.delete_many(r['accepted'])

            self.wrapper.commit()
            return ret
        except:
            # Bare except is intentional: reject pending commissions and
            # roll back on *any* failure, then re-raise unchanged.
            if self.serials:
                self.astakosclient.resolve_commissions(
                    token=self.service_token,
                    accept_serials=[],
                    reject_serials=self.serials)
            self.wrapper.rollback()
            raise
    return fn
165

    
166

    
167
class ModularBackend(BaseBackend):
168
    """A modular backend.
169

170
    Uses modules for SQL functions and storage.
171
    """
172

    
173
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        """Wire up the backend: database wrapper, block store, message
        queue and Astakos client, falling back to module-level defaults
        for any argument left as None.
        """
        # NOTE(review): `or`-defaulting treats any falsy value (0, '', {})
        # as "not given"; e.g. an explicit block_umask of 0 is replaced by
        # the default — confirm this is intended.
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = public_url_security or DEFAULT_PUBLIC_URL_SECURITY
        self.public_url_alphabet = public_url_alphabet or DEFAULT_PUBLIC_URL_ALPHABET

        self.hash_algorithm = 'sha256'
        self.block_size = 4 * 1024 * 1024  # 4MB
        self.free_versioning = free_versioning

        # Import a module by dotted path and return the module object.
        def load_module(m):
            __import__(m)
            return sys.modules[m]

        # Database layer: wrapper plus the tables/helpers it exposes.
        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        # Mirror db-module constants as instance attributes (self.READ etc.).
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        # Block storage layer.
        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        # Message queue, or a no-op stand-in when not configured.
        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_url = astakos_url
        self.service_token = service_token

        # Without a URL or an importable client, install the raising stub.
        if not astakos_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        # Per-call accumulators managed by the backend_method wrapper.
        self.serials = []
        self.messages = []
269

    
270
    def close(self):
        """Release backend resources: the database wrapper, then the queue."""
        for resource in (self.wrapper, self.queue):
            resource.close()
273

    
274
    @property
    def using_external_quotaholder(self):
        """Whether quota is managed by a real (enabled) Astakos client."""
        if isinstance(self.astakosclient, DisabledAstakosClient):
            return False
        return True
277

    
278
    @backend_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        logger.debug("list_accounts: %s %s %s", user, marker, limit)
        accounts = self._allowed_accounts(user)
        first, count = self._list_limits(accounts, marker, limit)
        return accounts[first:first + count]
286

    
287
    @backend_method
    def get_account_meta(
            self, user, account, domain, until=None, include_user_defined=True,
            external_quota=None):
        """Return a dictionary with the account metadata for the domain.

        Foreign users get only {'name', 'modified'}; the owner also gets
        domain attributes, 'count', 'bytes' and (for until queries)
        'until_timestamp'.
        """

        logger.debug(
            "get_account_meta: %s %s %s %s", user, account, domain, until)
        path, node = self._lookup_account(account, user == account)
        if user != account:
            # Foreign users may not look into the past and must be allowed.
            if until or node is None or account not in self._allowed_accounts(user):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # Presumably raised by _get_properties when no version exists
            # (e.g. account auto-created for a foreign viewer) — confirm.
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                # Domain-scoped user-defined attributes of the latest version.
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                # The external quotaholder overrides the locally computed usage.
                external_quota = external_quota or {}
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta
329

    
330
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        logger.debug("update_account_meta: %s %s %s %s %s", user,
                     account, domain, meta, replace)
        if account != user:
            raise NotAllowedError
        node = self._lookup_account(account, True)[1]
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)
341

    
342
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for this account."""

        logger.debug("get_account_groups: %s %s", user, account)
        if user == account:
            self._lookup_account(account, True)
            return self.permissions.group_dict(account)
        # Foreign accounts expose no group information.
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
        return {}
353

    
354
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        logger.debug("update_account_groups: %s %s %s %s", user,
                     account, groups, replace)
        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            # Wipe every existing group before re-adding the new set.
            self.permissions.group_destroy(account)
        for name, members in groups.iteritems():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, name)
            if members:
                self.permissions.group_addmany(account, name, members)
371

    
372
    @backend_method
    def get_account_policy(self, user, account, external_quota=None):
        """Return a dictionary with the account policy."""

        logger.debug("get_account_policy: %s %s", user, account)
        if user != account:
            # Foreign users see an empty policy, if allowed at all.
            if account in self._allowed_accounts(user):
                return {}
            raise NotAllowedError
        node = self._lookup_account(account, True)[1]
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            # The external quotaholder is authoritative for the quota limit.
            policy['quota'] = (external_quota or {}).get('limit', 0)
        return policy
387

    
388
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        logger.debug("update_account_policy: %s %s %s %s", user,
                     account, policy, replace)
        if account != user:
            raise NotAllowedError
        node = self._lookup_account(account, True)[1]
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)
399

    
400
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        logger.debug("put_account: %s %s %s", user, account, policy)
        policy = policy or {}
        if user != account:
            raise NotAllowedError
        if self.node.node_lookup(account) is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        # Create the account node under the root and attach its policy.
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True)
416

    
417
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name."""

        logger.debug("delete_account: %s %s", user, account)
        if account != user:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is None:
            # Deleting a missing account is a no-op.
            return
        removed = self.node.node_remove(
            node, update_statistics_ancestors_depth=-1)
        if not removed:
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)
431

    
432
    @backend_method
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
        """Return a list of containers existing under an account.

        Foreign users only see containers they may access; with shared
        and/or public set, only containers that hold shared/public
        objects are listed; otherwise all containers, optionally at a
        point in time (until).
        """

        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
                     account, marker, limit, shared, until, public)
        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            allowed = self._allowed_containers(user, account)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        if shared or public:
            allowed = set()
            if shared:
                # Container name is the second path component.
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
            allowed = sorted(allowed)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        node = self.node.node_lookup(account)
        containers = [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
        # BUGFIX: `containers` already holds plain container names; the
        # previous code passed [x[0] for x in containers] — the first
        # *character* of each name — to _list_limits, breaking
        # marker-based pagination.
        start, limit = self._list_limits(containers, marker, limit)
        return containers[start:start + limit]
459

    
460
    @backend_method
    def list_container_meta(self, user, account, container, domain, until=None):
        """Return a list with all the container's object meta keys for the domain."""

        logger.debug("list_container_meta: %s %s %s %s %s", user,
                     account, container, domain, until)
        allowed = []
        if user != account:
            # Foreign users may not look into the past and must have
            # access to at least one path under the container.
            if until:
                raise NotAllowedError
            container_path = '/'.join((account, container))
            allowed = self.permissions.access_list_paths(user, container_path)
            if not allowed:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        if until is None:
            before = inf
        else:
            before = until
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
478

    
479
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        Foreign users get only {'name', 'modified'}; the owner also gets
        domain attributes, 'count', 'bytes' and (for until queries)
        'until_timestamp'.
        """

        logger.debug("get_container_meta: %s %s %s %s %s", user,
                     account, container, domain, until)
        if user != account:
            # Foreign users may not look into the past and must be allowed.
            if until or container not in self._allowed_containers(user, account):
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                # Domain-scoped user-defined attributes of the latest version.
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta
512

    
513
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
        """Update the metadata associated with the container for the domain."""

        logger.debug("update_container_meta: %s %s %s %s %s %s",
                     user, account, container, domain, meta, replace)
        if user != account:
            raise NotAllowedError
        node = self._lookup_container(account, container)[1]
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is None:
            return
        # Unless versioning is automatic, drop the superseded version.
        policy = self._get_policy(node, is_account_policy=False)
        if policy['versioning'] != 'auto':
            self.node.version_remove(src_version_id,
                                     update_statistics_ancestors_depth=0)
531

    
532
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        logger.debug(
            "get_container_policy: %s %s %s", user, account, container)
        if user == account:
            node = self._lookup_container(account, container)[1]
            return self._get_policy(node, is_account_policy=False)
        # Foreign users get an empty policy for accessible containers.
        if container not in self._allowed_containers(user, account):
            raise NotAllowedError
        return {}
544

    
545
    @backend_method
    def update_container_policy(self, user, account, container, policy, replace=False):
        """Update the policy associated with the container."""

        logger.debug("update_container_policy: %s %s %s %s %s",
                     user, account, container, policy, replace)
        if account != user:
            raise NotAllowedError
        node = self._lookup_container(account, container)[1]
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)
556

    
557
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        logger.debug(
            "put_container: %s %s %s %s", user, account, container, policy)
        policy = policy or {}
        if user != account:
            raise NotAllowedError
        # _lookup_container raises NameError when the container is
        # missing, which is the only acceptable outcome here.
        try:
            self._lookup_container(account, container)
        except NameError:
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        account_node = self._lookup_account(account, True)[1]
        node = self._put_path(
            user, account_node, path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)
579

    
580
    @backend_method
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
        """Delete/purge the container with the given name.

        Three modes:
        - until given: purge history/deleted versions up to that time;
        - no delimiter: remove the (empty) container and all its history;
        - delimiter given: delete only the container's contents.
        """

        logger.debug("delete_container: %s %s %s %s %s %s", user,
                     account, container, until, prefix, delimiter)
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        if until is not None:
            # Purge superseded versions up to `until` and drop their blocks.
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                # Versions count against quota: report the reclaimed size.
                self._report_size_change(
                    user, account, -size, {
                        'action':'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            # Refuse to remove a container that still holds objects.
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action':'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                # Mark each object deleted by writing a new version in
                # the DELETED cluster, then apply the versioning policy.
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                        user, account, -del_size, {
                                'action': 'object delete',
                                'path': path,
                        'versions': ','.join([str(dest_version_id)])
                     }
                )
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            # Drop any permissions granted on the deleted paths.
            self.permissions.access_clear_bulk(paths)
652

    
653
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
        """List objects in a container, honoring access control and the
        shared/public filters; returns at most `limit` entries starting
        after `marker`.
        """
        if user != account and until:
            raise NotAllowedError
        if shared and public:
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            objects = set()
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            # Merge of the two sets must be re-sorted and re-paginated.
            objects.sort(key=lambda x: x[0])
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]

        # Plain listing (or shared-only), restricted to allowed paths.
        allowed = self._list_object_permissions(
            user, account, container, prefix, shared, public)
        if shared and not allowed:
            return []
        path, node = self._lookup_container(account, container)
        allowed = self._get_formatted_paths(allowed)
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
        start, limit = self._list_limits(
            [x[0] for x in objects], marker, limit)
        return objects[start:start + limit]
692

    
693
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
        """Return (name, props...) tuples for the public objects under
        the container matching the prefix."""
        public = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        paths, nodes = self._lookup_objects(public)
        cont_prefix = '/'.join((account, container)) + '/'
        # Keep only the object name, stripping "account/container/".
        names = [full_path[len(cont_prefix):] for full_path in paths]
        versions = self.node.version_lookup_bulk(nodes, all_props=all_props)
        return [(name,) + version
                for name, version in zip(names, versions)]
703

    
704
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
705
        objects = []
706
        while True:
707
            marker = objects[-1] if objects else None
708
            limit = 10000
709
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
710
            objects.extend(l)
711
            if not l or len(l) < limit:
712
                break
713
        return objects
714

    
715
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
716
        allowed = []
717
        path = '/'.join((account, container, prefix)).rstrip('/')
718
        if user != account:
719
            allowed = self.permissions.access_list_paths(user, path)
720
            if not allowed:
721
                raise NotAllowedError
722
        else:
723
            allowed = set()
724
            if shared:
725
                allowed.update(self.permissions.access_list_shared(path))
726
            if public:
727
                allowed.update(
728
                    [x[0] for x in self.permissions.public_list(path)])
729
            allowed = sorted(allowed)
730
            if not allowed:
731
                return []
732
        return allowed
733

    
734
    @backend_method
    def list_objects(self, user, account, container, prefix='',
                     delimiter=None, marker=None, limit=10000, virtual=True,
                     domain=None, keys=None, shared=False, until=None,
                     size_range=None, public=False):
        """Return a list of object (name, version_id) tuples existing
        under a container."""

        logger.debug(
            "list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s",
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, public)
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range,
            False, public)

    @backend_method
    def list_object_meta(self, user, account, container, prefix='',
                         delimiter=None, marker=None, limit=10000,
                         virtual=True, domain=None, keys=None, shared=False,
                         until=None, size_range=None, public=False):
        """Return a list of object metadata dicts existing under a
        container."""

        logger.debug(
            "list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s",
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, public)
        props = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys or [], shared, until, size_range,
            True, public)
        listing = []
        for entry in props:
            if len(entry) == 2:
                # Two-element tuples denote virtual 'subdir' entries.
                listing.append({'subdir': entry[0]})
                continue
            listing.append({
                'name': entry[0],
                'bytes': entry[self.SIZE + 1],
                'type': entry[self.TYPE + 1],
                'hash': entry[self.HASH + 1],
                'version': entry[self.SERIAL + 1],
                'version_timestamp': entry[self.MTIME + 1],
                'modified': entry[self.MTIME + 1] if until is None else None,
                'modified_by': entry[self.MUSER + 1],
                'uuid': entry[self.UUID + 1],
                'checksum': entry[self.CHECKSUM + 1],
            })
        return listing

    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths that enforce permissions under a
        container."""

        logger.debug("list_object_permissions: %s %s %s %s", user,
                     account, container, prefix)
        return self._list_object_permissions(
            user, account, container, prefix, shared=True, public=False)

    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a dict mapping paths to public ids for objects that
        are public under a container."""

        logger.debug("list_object_public: %s %s %s %s", user,
                     account, container, prefix)
        lookup = '/'.join((account, container, prefix))
        # public_list yields (path, public_id) pairs.
        return dict(self.permissions.public_list(lookup))

    @backend_method
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        When version is None the latest version is described and its
        mtime doubles as the overall modification time; otherwise the
        overall mtime is looked up separately (falling back to the
        deleted version when the object no longer exists).
        Raises ItemNotExists when neither a live nor a deleted version
        can be found.
        """

        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
                     account, container, name, domain, version)
        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            # User-defined attributes for the domain come first so the
            # fixed system keys below always win on conflict.
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta

    @backend_method
    def update_object_meta(self, user, account, container, name, domain,
                           meta, replace=False):
        """Update the metadata associated with the object for the domain
        and return the new version."""

        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
                     user, account, container, name, domain, meta, replace)
        self._can_write(user, account, container, name)

        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        old_version, new_version = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        # The superseded version is subject to the versioning policy.
        self._apply_versioning(account, container, old_version,
                               update_statistics_ancestors_depth=1)
        return new_version

    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        logger.debug("get_object_permissions: %s %s %s %s", user,
                     account, container, name)
        permissions_path = self._get_permissions_path(account, container, name)
        if user == account:
            # The owner always has write access.
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.WRITE, user):
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.READ, user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # Verify the object actually exists before reporting.
        self._lookup_object(account, container, name)
        return (allowed, permissions_path,
                self.permissions.access_get(permissions_path))

    @backend_method
    def update_object_permissions(self, user, account, container, name,
                                  permissions):
        """Update the permissions associated with the object."""

        logger.debug("update_object_permissions: %s %s %s %s %s",
                     user, account, container, name, permissions)
        if user != account:
            # Only the owning account may change permissions.
            raise NotAllowedError
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        self._check_permissions(path, permissions)
        self.permissions.access_set(path, permissions)
        members = self.permissions.access_members(path)
        self._report_sharing_change(user, account, path,
                                    {'members': members})

    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        logger.debug(
            "get_object_public: %s %s %s %s", user, account, container, name)
        self._can_read(user, account, container, name)
        object_path = self._lookup_object(account, container, name)[0]
        return self.permissions.public_get(object_path)

    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        logger.debug("update_object_public: %s %s %s %s %s", user,
                     account, container, name, public)
        self._can_write(user, account, container, name)
        path = self._lookup_object(account, container, name,
                                   lock_container=True)[0]
        if public:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet)
        else:
            self.permissions.public_unset(path)

    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
                     account, container, name, version)
        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        raw_map = self.store.map_get(binascii.unhexlify(props[self.HASH]))
        hex_hashes = [binascii.hexlify(h) for h in raw_map]
        return props[self.SIZE], hex_hashes

    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
        """Create a new version of the object carrying the given hash.

        Checks write access and (optionally) permissions, creates the
        object node and a duplicate version, carries metadata over,
        applies the versioning policy, enforces account and container
        quotas, and reports size/sharing/object changes.
        Returns the new version id.
        Raises NotAllowedError or QuotaError on policy violations.
        """
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        # The superseded version may free space, offsetting the growth.
        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(
                    self._get_policy(account_node, is_account_policy=True
                    )['quota']
                )
                account_usage = self._get_statistics(account_node, compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage
                        )
                    )

            # Check container quota.
            container_quota = long(
                self._get_policy(container_node, is_account_policy=False
                )['quota']
            )
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        self._report_size_change(user, account, size_delta,
                                 {'action': 'object update', 'path': path,
                                  'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})

        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id

    @backend_method
    def update_object_hashmap(self, user, account, container, name, size,
                              type, hashmap, checksum, domain, meta=None,
                              replace_meta=False, permissions=None):
        """Create/update an object with the specified size and partial
        hashes."""

        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
                     account, container, name, size, type, hashmap, checksum)
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        block_map = HashMap(self.block_size, self.hash_algorithm)
        block_map.extend([binascii.unhexlify(x) for x in hashmap])
        missing = self.store.block_search(block_map)
        if missing:
            # Report the blocks that still need uploading through the
            # exception payload.
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        map_hash = block_map.hash()
        # _update_object_hash() locks destination path
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type,
            binascii.hexlify(map_hash), checksum, domain, meta or {},
            replace_meta, permissions)
        self.store.map_put(map_hash, block_map)
        return dest_version_id

    @backend_method
    def update_object_checksum(self, user, account, container, name, version, checksum):
        """Update an object's checksum.

        The checksum is propagated to every version at or after the
        given one that still carries the same data (identical hash and
        size), so checksums survive metadata-only updates.
        """

        logger.debug("update_object_checksum: %s %s %s %s %s %s",
                     user, account, container, name, version, checksum)
        # Update objects with greater version and same hashmap and size (fix metadata updates).
        self._can_write(user, account, container, name)
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)
        props = self._get_version(node, version)
        versions = self.node.node_get_versions(node)
        for x in versions:
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
                self.node.version_put_property(
                    x[self.SERIAL], 'checksum', checksum)

    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta=None, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
1023
        dest_meta = dest_meta or {}
1024
        dest_version_ids = []
1025
        self._can_read(user, src_account, src_container, src_name)
1026

    
1027
        src_container_path = '/'.join((src_account, src_container))
1028
        dest_container_path = '/'.join((src_account, src_container))
1029
        # Lock container paths in alphabetical order
1030
        if src_container_path < dest_container_path:
1031
            path, node = self._lookup_object(src_account, src_container,
1032
                                             src_name,
1033
                                             lock_container=True)
1034
            self._lookup_container(dest_account, dest_container)
1035
        else:
1036
            self._lookup_container(dest_account, dest_container)
1037
            path, node = self._lookup_object(src_account, src_container,
1038
                                             src_name,
1039
                                             lock_container=True)
1040

    
1041
        # TODO: Will do another fetch of the properties in duplicate version...
1042
        props = self._get_version(
1043
            node, src_version)  # Check to see if source exists.
1044
        src_version_id = props[self.SERIAL]
1045
        hash = props[self.HASH]
1046
        size = props[self.SIZE]
1047
        is_copy = not is_move and (src_account, src_container, src_name) != (
1048
            dest_account, dest_container, dest_name)  # New uuid.
1049
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
1050
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
1051
            self._delete_object(user, src_account, src_container, src_name)
1052

    
1053
        if delimiter:
1054
            prefix = src_name + \
1055
                delimiter if not src_name.endswith(delimiter) else src_name
1056
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1057
            src_names.sort(key=lambda x: x[2])  # order by nodes
1058
            paths = [elem[0] for elem in src_names]
1059
            nodes = [elem[2] for elem in src_names]
1060
            # TODO: Will do another fetch of the properties in duplicate version...
1061
            props = self._get_versions(nodes)  # Check to see if source exists.
1062

    
1063
            for prop, path, node in zip(props, paths, nodes):
1064
                src_version_id = prop[self.SERIAL]
1065
                hash = prop[self.HASH]
1066
                vtype = prop[self.TYPE]
1067
                size = prop[self.SIZE]
1068
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1069
                    delimiter) else dest_name
1070
                vdest_name = path.replace(prefix, dest_prefix, 1)
1071
                # _update_object_hash() locks destination path
1072
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
1073
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
1074
                    self._delete_object(user, src_account, src_container, path)
1075
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
1076

    
1077
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        logger.debug(
            "copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s",
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, src_version, delimiter)
        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {},
            replace_meta, permissions, src_version, False, delimiter)

    @backend_method
    def move_object(self, user, src_account, src_container, src_name,
                    dest_account, dest_container, dest_name, type, domain,
                    meta=None, replace_meta=False, permissions=None,
                    delimiter=None):
        """Move an object's data and metadata."""

        logger.debug(
            "move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s",
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta, replace_meta,
            permissions, delimiter)
        if user != src_account:
            # Only the owner of the source may move objects away.
            raise NotAllowedError
        return self._copy_object(
            user, src_account, src_container, src_name, dest_account,
            dest_container, dest_name, type, domain, meta or {},
            replace_meta, permissions, None, True, delimiter)

    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
        """Delete an object, or purge its versions up to 'until'.

        Only the owning account may delete.  A plain delete records a
        CLUSTER_DELETED version and clears permissions; with 'until' the
        versions (and their unreferenced maps) are physically purged.
        With a delimiter, nested objects under the prefix are deleted
        too.
        """
        if user != account:
            raise NotAllowedError

        # lookup object and lock container path also
        path, node = self._lookup_object(account, container, name,
                                         lock_container=True)

        if until is not None:
            # Purge mode: physically remove versions up to the given
            # timestamp from all clusters and delete their maps.
            path = '/'.join((account, container, name))
            node = self.node.node_lookup(path)
            if node is None:
                return
            hashes = []
            size = 0
            serials = []
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            size += s
            serials += v
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
                                           update_statistics_ancestors_depth=1)
            hashes += h
            # With free versioning, history does not count against usage.
            if not self.free_versioning:
                size += s
            serials += v
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge(node, until, CLUSTER_DELETED,
                                 update_statistics_ancestors_depth=1)
            try:
                props = self._get_version(node)
            except NameError:
                # No version survived the purge: drop permissions too.
                self.permissions.access_clear(path)
            self._report_size_change(
                user, account, -size, {
                    'action': 'object purge',
                    'path': path,
                    'versions': ','.join(str(i) for i in serials)
                }
            )
            return

        # Plain delete: record an empty CLUSTER_DELETED version.
        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node, size=0, type='', hash=None, checksum='',
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
        del_size = self._apply_versioning(account, container, src_version_id,
                                          update_statistics_ancestors_depth=1)
        self._report_size_change(user, account, -del_size,
                                 {'action': 'object delete', 'path': path,
                                  'versions': ','.join([str(dest_version_id)])})
        self._report_object_change(
            user, account, path, details={'action': 'object delete'})
        self.permissions.access_clear(path)

        if delimiter:
            # Delete every object nested under the delimiter prefix.
            prefix = name + delimiter if not name.endswith(delimiter) else name
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(user, account, -del_size,
                                         {'action': 'object delete',
                                          'path': path,
                                          'versions': ','.join([str(dest_version_id)])})
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    @backend_method
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
                     account, container, name, until, prefix, delimiter)
        self._delete_object(user, account, container, name, until, delimiter)

    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all (version, version_timestamp) tuples for
        an object."""

        logger.debug(
            "list_versions: %s %s %s %s", user, account, container, name)
        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        # Deleted versions are excluded from the listing.
        return [[v[self.SERIAL], v[self.MTIME]]
                for v in self.node.node_get_versions(node)
                if v[self.CLUSTER] != CLUSTER_DELETED]

    @backend_method
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given."""

        logger.debug("get_uuid: %s %s", user, uuid)
        location = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if location is None:
            raise NameError
        account, container, name = location[0].split('/', 2)
        # The caller must be able to read the object to resolve it.
        self._can_read(user, account, container, name)
        return (account, container, name)

    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        logger.debug("get_public: %s %s", user, public)
        resolved = self.permissions.public_path(public)
        if resolved is None:
            raise NameError
        account, container, name = resolved.split('/', 2)
        # The caller must be able to read the object to resolve it.
        self._can_read(user, account, container, name)
        return (account, container, name)

    @backend_method(autocommit=0)
    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        data = self.store.block_get(binascii.unhexlify(hash))
        if not data:
            raise ItemNotExists('Block does not exist')
        return data

    @backend_method(autocommit=0)
    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        raw_hash = self.store.block_put(data)
        return binascii.hexlify(raw_hash)

    @backend_method(autocommit=0)
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        # Rewriting an entire block is just a fresh store.
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        new_hash = self.store.block_update(binascii.unhexlify(hash), offset,
                                           data)
        return binascii.hexlify(new_hash)

    # Path functions.
1248

    
1249
    def _generate_uuid(self):
1250
        return str(uuidlib.uuid4())
1251

    
1252
    def _put_object_node(self, path, parent, name):
1253
        path = '/'.join((path, name))
1254
        node = self.node.node_lookup(path)
1255
        if node is None:
1256
            node = self.node.node_create(parent, path)
1257
        return path, node
1258

    
1259
    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        """Create a node at path under parent with an initial empty
        version owned by user; return the new node."""

        new_node = self.node.node_create(parent, path)
        self.node.version_create(new_node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return new_node

    def _lookup_account(self, account, create=True):
1268
        node = self.node.node_lookup(account)
1269
        if node is None and create:
1270
            node = self._put_path(
1271
                account, self.ROOTNODE, account,
1272
                update_statistics_ancestors_depth=-1)  # User is account.
1273
        return account, node
1274

    
1275
    def _lookup_container(self, account, container):
1276
        path = '/'.join((account, container))
1277
        node = self.node.node_lookup(path, for_update=True)
1278
        if node is None:
1279
            raise ItemNotExists('Container does not exist')
1280
        return path, node
1281

    
1282
    def _lookup_object(self, account, container, name, lock_container=False):
1283
        if lock_container:
1284
            self._lookup_container(account, container)
1285

    
1286
        path = '/'.join((account, container, name))
1287
        node = self.node.node_lookup(path)
1288
        if node is None:
1289
            raise ItemNotExists('Object does not exist')
1290
        return path, node
1291

    
1292
    def _lookup_objects(self, paths):
1293
        nodes = self.node.node_lookup_bulk(paths)
1294
        return paths, nodes
1295

    
1296
    def _get_properties(self, node, until=None):
        """Return the version properties of node as of `until` (default: now).

        Raises ItemNotExists when no matching version exists.
        """
        before = inf if until is None else until
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            # Past snapshots may only survive in the history cluster.
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props
    
1307
    def _get_statistics(self, node, until=None, compute=False):
        """Return (count, total size, latest mtime) for everything under node.

        With `until`, statistics are taken as of that timestamp; with
        `compute`, they are recomputed instead of read from the cache.
        """
        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(
                node, except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        return (0, 0, 0) if stats is None else stats
    
1320
    def _get_version(self, node, version=None):
        """Return properties of a specific version, or of the latest one.

        Raises ItemNotExists when the node has no current version, and
        VersionNotExists for bad or deleted version identifiers.
        """
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
            return props
        try:
            serial = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(serial, node=node)
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
        return props
    
1335
    def _get_versions(self, nodes):
        """Bulk-fetch the latest normal-cluster version for each node."""
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
    
1338
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node.

        The latest version of src_node (or of node itself, when src_node is
        None) is used as a template for any property not given explicitly.
        Returns (pre_version_id, dest_version_id): the version moved to the
        history cluster (or None) and the newly created version.
        """

        # Template version: the source node's latest, or this node's own.
        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            # No previous version: start from empty defaults.
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        # A copy (or a brand-new object) gets a fresh UUID; an in-place
        # update keeps the UUID of the source version.
        uuid = self._generate_uuid(
        ) if (is_copy or src_version_id is None) else props[self.UUID]

        # Find which version of the *destination* node must be demoted to
        # the history cluster.
        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        # Only the newly created version may carry the is-latest flag.
        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
    
1387
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
1388
                                node, meta, replace=False):
1389
        if src_version_id is not None:
1390
            self.node.attribute_copy(src_version_id, dest_version_id)
1391
        if not replace:
1392
            self.node.attribute_del(dest_version_id, domain, (
1393
                k for k, v in meta.iteritems() if v == ''))
1394
            self.node.attribute_set(dest_version_id, domain, node, (
1395
                (k, v) for k, v in meta.iteritems() if v != ''))
1396
        else:
1397
            self.node.attribute_del(dest_version_id, domain)
1398
            self.node.attribute_set(dest_version_id, domain, node, ((
1399
                k, v) for k, v in meta.iteritems()))
1400

    
1401
    def _put_metadata(self, user, node, domain, meta, replace=False,
1402
                      update_statistics_ancestors_depth=None):
1403
        """Create a new version and store metadata."""
1404

    
1405
        src_version_id, dest_version_id = self._put_version_duplicate(
1406
            user, node,
1407
            update_statistics_ancestors_depth=update_statistics_ancestors_depth)
1408
        self._put_metadata_duplicate(
1409
            src_version_id, dest_version_id, domain, node, meta, replace)
1410
        return src_version_id, dest_version_id
1411

    
1412
    def _list_limits(self, listing, marker, limit):
1413
        start = 0
1414
        if marker:
1415
            try:
1416
                start = listing.index(marker) + 1
1417
            except ValueError:
1418
                pass
1419
        if not limit or limit > 10000:
1420
            limit = 10000
1421
        return start, limit
1422

    
1423
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        """List object properties under path, honouring listing filters."""
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        full_prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        # Metadata filtering only applies within a domain.
        filterq = keys if domain else []

        objects, prefixes = self.node.latest_version_list(
            parent, full_prefix, delimiter, start, limit, before,
            CLUSTER_DELETED, allowed, domain, filterq, size_range, all_props)
        if virtual:
            # Virtual listings interleave common prefixes as pseudo-objects.
            objects.extend((p, None) for p in prefixes)
        objects.sort(key=lambda entry: entry[0])
        # Strip the container prefix from each returned path.
        return [(entry[0][len(cont_prefix):],) + entry[1:] for entry in objects]
    
1439
    # Reporting functions.
1440

    
1441
    def _report_size_change(self, user, account, size, details=None):
        """Queue a diskspace change message and issue a quota commission.

        Raises QuotaError when the commission cannot be issued.
        """
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        logger.debug(
            "_report_size_change: %s %s %s %s", user, account, size, details)
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
                              account, QUEUE_INSTANCE_ID, 'diskspace',
                              float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details.get('path', '')
            serial = self.astakosclient.issue_one_commission(
                token=self.service_token,
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name
                )
        # `except E as e` (PEP 3110) is valid from Python 2.6 and keeps the
        # file forward-compatible; wrap any failure so callers only need to
        # handle QuotaError.
        except BaseException as e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)
    
1473
    def _report_object_change(self, user, account, path, details=None):
        """Queue an 'object changed' notification message."""
        details = details or {}
        details['user'] = user
        logger.debug("_report_object_change: %s %s %s %s", user,
                     account, path, details)
        message = (QUEUE_MESSAGE_KEY_PREFIX % ('object',), account,
                   QUEUE_INSTANCE_ID, 'object', path, details)
        self.messages.append(message)
    
1481
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a 'sharing changed' notification message."""
        details = details or {}
        details.update({'user': user})
        # Populate details before logging (the original logged the raw
        # argument, printing None) so the log reflects what is queued,
        # consistent with the other _report_* helpers.
        logger.debug("_report_permissions_change: %s %s %s %s",
                     user, account, path, details)
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
    
1489
    # Policy functions.
1490

    
1491
    def _check_policy(self, policy, is_account_policy=True):
1492
        default_policy = self.default_account_policy \
1493
            if is_account_policy else self.default_container_policy
1494
        for k in policy.keys():
1495
            if policy[k] == '':
1496
                policy[k] = default_policy.get(k)
1497
        for k, v in policy.iteritems():
1498
            if k == 'quota':
1499
                q = int(v)  # May raise ValueError.
1500
                if q < 0:
1501
                    raise ValueError
1502
            elif k == 'versioning':
1503
                if v not in ['auto', 'none']:
1504
                    raise ValueError
1505
            else:
1506
                raise ValueError
1507

    
1508
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1509
        default_policy = self.default_account_policy \
1510
            if is_account_policy else self.default_container_policy
1511
        if replace:
1512
            for k, v in default_policy.iteritems():
1513
                if k not in policy:
1514
                    policy[k] = v
1515
        self.node.policy_set(node, policy)
1516

    
1517
    def _get_policy(self, node, is_account_policy=True):
1518
        default_policy = self.default_account_policy \
1519
            if is_account_policy else self.default_container_policy
1520
        policy = default_policy.copy()
1521
        policy.update(self.node.policy_get(node))
1522
        return policy
1523

    
1524
    def _apply_versioning(self, account, container, version_id,
1525
                          update_statistics_ancestors_depth=None):
1526
        """Delete the provided version if such is the policy.
1527
           Return size of object removed.
1528
        """
1529

    
1530
        if version_id is None:
1531
            return 0
1532
        path, node = self._lookup_container(account, container)
1533
        versioning = self._get_policy(
1534
            node, is_account_policy=False)['versioning']
1535
        if versioning != 'auto':
1536
            hash, size = self.node.version_remove(
1537
                version_id, update_statistics_ancestors_depth)
1538
            self.store.map_delete(hash)
1539
            return size
1540
        elif self.free_versioning:
1541
            return self.node.version_get_properties(
1542
                version_id, keys=('size',))[0]
1543
        return 0
1544

    
1545
    # Access control functions.
1546

    
1547
    def _check_groups(self, groups):
1548
        # raise ValueError('Bad characters in groups')
1549
        pass
1550

    
1551
    def _check_permissions(self, path, permissions):
1552
        # raise ValueError('Bad characters in permissions')
1553
        pass
1554

    
1555
    def _get_formatted_paths(self, paths):
1556
        formatted = []
1557
        for p in paths:
1558
            node = self.node.node_lookup(p)
1559
            props = None
1560
            if node is not None:
1561
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1562
            if props is not None:
1563
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1564
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1565
                formatted.append((p, self.MATCH_EXACT))
1566
        return formatted
1567

    
1568
    def _get_permissions_path(self, account, container, name):
1569
        path = '/'.join((account, container, name))
1570
        permission_paths = self.permissions.access_inherit(path)
1571
        permission_paths.sort()
1572
        permission_paths.reverse()
1573
        for p in permission_paths:
1574
            if p == path:
1575
                return p
1576
            else:
1577
                if p.count('/') < 2:
1578
                    continue
1579
                node = self.node.node_lookup(p)
1580
                props = None
1581
                if node is not None:
1582
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1583
                if props is not None:
1584
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1585
                        return p
1586
        return None
1587

    
1588
    def _can_read(self, user, account, container, name):
1589
        if user == account:
1590
            return True
1591
        path = '/'.join((account, container, name))
1592
        if self.permissions.public_get(path) is not None:
1593
            return True
1594
        path = self._get_permissions_path(account, container, name)
1595
        if not path:
1596
            raise NotAllowedError
1597
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1598
            raise NotAllowedError
1599

    
1600
    def _can_write(self, user, account, container, name):
1601
        if user == account:
1602
            return True
1603
        path = '/'.join((account, container, name))
1604
        path = self._get_permissions_path(account, container, name)
1605
        if not path:
1606
            raise NotAllowedError
1607
        if not self.permissions.access_check(path, self.WRITE, user):
1608
            raise NotAllowedError
1609

    
1610
    def _allowed_accounts(self, user):
1611
        allow = set()
1612
        for path in self.permissions.access_list_paths(user):
1613
            allow.add(path.split('/', 1)[0])
1614
        return sorted(allow)
1615

    
1616
    def _allowed_containers(self, user, account):
1617
        allow = set()
1618
        for path in self.permissions.access_list_paths(user, account):
1619
            allow.add(path.split('/', 2)[1])
1620
        return sorted(allow)
1621

    
1622
    # Domain functions
1623

    
1624
    @backend_method
    def get_domain_objects(self, domain, user=None):
        """Return (path, metadata, permissions) triples for the objects of
        the given domain that the user may access.
        """
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        result = []
        for path, props, user_defined_meta in obj_list:
            result.append((path,
                           self._build_metadata(props, user_defined_meta),
                           self.permissions.access_get(path)))
        return result
    
1637
    # util functions
1638

    
1639
    def _build_metadata(self, props, user_defined=None,
1640
                        include_user_defined=True):
1641
        meta = {'bytes': props[self.SIZE],
1642
                'type': props[self.TYPE],
1643
                'hash': props[self.HASH],
1644
                'version': props[self.SERIAL],
1645
                'version_timestamp': props[self.MTIME],
1646
                'modified_by': props[self.MUSER],
1647
                'uuid': props[self.UUID],
1648
                'checksum': props[self.CHECKSUM]}
1649
        if include_user_defined and user_defined != None:
1650
            meta.update(user_defined)
1651
        return meta
1652

    
1653
    def _has_read_access(self, user, path):
        """Return whether user may read the object at path.

        Raises ValueError for paths lacking account/container/object parts.
        """
        try:
            # `obj` avoids shadowing the `object` builtin.
            account, container, obj = path.split('/', 2)
        except ValueError:
            raise ValueError('Invalid object path')

        assert isinstance(user, basestring), "Invalid user"

        try:
            self._can_read(user, account, container, obj)
        except NotAllowedError:
            return False
        return True