root / snf-pithos-backend / pithos / backends / modular.py @ 3248f20a

# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import sys
import uuid as uuidlib
import logging
import hashlib
import binascii

try:
    from astakosclient import AstakosClient
except ImportError:
    AstakosClient = None

from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)


class DisabledAstakosClient(object):
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __getattr__(self, name):
        m = ("AstakosClient has been disabled, "
             "yet an attempt to access it was made")
        raise AssertionError(m)


# Stripped-down version of the HashMap class found in tools.

class HashMap(list):

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

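    # The map hash is a Merkle-style digest: the list of block hashes is
    # padded with all-zero entries up to the next power of two and adjacent
    # pairs are hashed repeatedly until a single digest remains.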
    def hash(self):
        if len(self) == 0:
            return self._hash_raw('')
        if len(self) == 1:
            return self.__getitem__(0)

        h = list(self)
        s = 2
        while s < len(h):
            s = s * 2
        h += [('\x00' * len(h[0]))] * (s - len(h))
        while len(h) > 1:
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
        return h[0]

# Default modules and settings.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
DEFAULT_HASH_ALGORITHM = 'sha256'
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
                               'abcdefghijklmnopqrstuvwxyz'
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
DEFAULT_PUBLIC_URL_SECURITY = 16

QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

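# Version clusters (as used below): NORMAL holds the current version of each
# object, HISTORY holds superseded versions, DELETED holds deletion markers.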
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)

inf = float('inf')

ULTIMATE_ANSWER = 42

DEFAULT_SOURCE = 'system'

logger = logging.getLogger(__name__)


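# Example construction (illustrative sketch only; the connection string and
# block path below are placeholders, all other arguments fall back to the
# DEFAULT_* values above):
#
#     backend = ModularBackend(db_connection='sqlite:///backend.db',
#                              block_path='/srv/pithos/data')
#     backend.pre_exec()
#     ...  # account/container/object calls
#     backend.post_exec()
#     backend.close()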
class ModularBackend(BaseBackend):
    """A modular backend.

    Uses modules for SQL functions and storage.
    """

    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None, block_umask=None,
                 block_size=None, hash_algorithm=None,
                 queue_module=None, queue_hosts=None, queue_exchange=None,
                 astakos_url=None, service_token=None,
                 astakosclient_poolsize=None,
                 free_versioning=True, block_params=None,
                 public_url_security=None,
                 public_url_alphabet=None,
                 account_quota_policy=None,
                 container_quota_policy=None,
                 container_versioning_policy=None):
        db_module = db_module or DEFAULT_DB_MODULE
        db_connection = db_connection or DEFAULT_DB_CONNECTION
        block_module = block_module or DEFAULT_BLOCK_MODULE
        block_path = block_path or DEFAULT_BLOCK_PATH
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
        block_params = block_params or DEFAULT_BLOCK_PARAMS
        block_size = block_size or DEFAULT_BLOCK_SIZE
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
        container_quota_policy = container_quota_policy \
            or DEFAULT_CONTAINER_QUOTA
        container_versioning_policy = container_versioning_policy \
            or DEFAULT_CONTAINER_VERSIONING

        self.default_account_policy = {'quota': account_quota_policy}
        self.default_container_policy = {
            'quota': container_quota_policy,
            'versioning': container_versioning_policy
        }
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

        self.public_url_security = (public_url_security or
            DEFAULT_PUBLIC_URL_SECURITY)
        self.public_url_alphabet = (public_url_alphabet or
            DEFAULT_PUBLIC_URL_ALPHABET)

        self.hash_algorithm = hash_algorithm
        self.block_size = block_size
        self.free_versioning = free_versioning

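        # Backend components are pluggable: the DB and block-store modules
        # are referenced by dotted path and imported at runtime.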
        def load_module(m):
            __import__(m)
            return sys.modules[m]

        self.db_module = load_module(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        params = {'wrapper': self.wrapper}
        self.permissions = self.db_module.Permissions(**params)
        self.config = self.db_module.Config(**params)
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
        for x in ['READ', 'WRITE']:
            setattr(self, x, getattr(self.db_module, x))
        self.node = self.db_module.Node(**params)
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
            setattr(self, x, getattr(self.db_module, x))

        self.block_module = load_module(block_module)
        self.block_params = block_params
        params = {'path': block_path,
                  'block_size': self.block_size,
                  'hash_algorithm': self.hash_algorithm,
                  'umask': block_umask}
        params.update(self.block_params)
        self.store = self.block_module.Store(**params)

        if queue_module and queue_hosts:
            self.queue_module = load_module(queue_module)
            params = {'hosts': queue_hosts,
                      'exchange': queue_exchange,
                      'client_id': QUEUE_CLIENT_ID}
            self.queue = self.queue_module.Queue(**params)
        else:
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()

        self.astakos_url = astakos_url
        self.service_token = service_token

        if not astakos_url or not AstakosClient:
            self.astakosclient = DisabledAstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)
        else:
            self.astakosclient = AstakosClient(
                astakos_url,
                use_pool=True,
                pool_size=astakosclient_poolsize)

        self.serials = []
        self.messages = []

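    # A request is wrapped in pre_exec()/post_exec(): pre_exec() opens a
    # transaction on the DB wrapper; post_exec() sends the queued messages,
    # registers and resolves any pending commission serials against Astakos
    # and commits, or rejects the commissions and rolls back on failure.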
    def pre_exec(self, lock_container_path=False):
        self.lock_container_path = lock_container_path
        self.wrapper.execute()

    def post_exec(self, success_status=True):
        if success_status:
            # send messages produced
            for m in self.messages:
                self.queue.send(*m)

            # register serials
            if self.serials:
                self.commission_serials.insert_many(
                    self.serials)

                # commit to ensure that the serials are registered
                # even if resolve commission fails
                self.wrapper.commit()

                # start new transaction
                self.wrapper.execute()

                r = self.astakosclient.resolve_commissions(
                    token=self.service_token,
                    accept_serials=self.serials,
                    reject_serials=[])
                self.commission_serials.delete_many(
                    r['accepted'])

            self.wrapper.commit()
        else:
            if self.serials:
                self.astakosclient.resolve_commissions(
                    token=self.service_token,
                    accept_serials=[],
                    reject_serials=self.serials)
            self.wrapper.rollback()

    def close(self):
        self.wrapper.close()
        self.queue.close()

    @property
    def using_external_quotaholder(self):
        return not isinstance(self.astakosclient, DisabledAstakosClient)

    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        logger.debug("list_accounts: %s %s %s", user, marker, limit)
        allowed = self._allowed_accounts(user)
        start, limit = self._list_limits(allowed, marker, limit)
        return allowed[start:start + limit]

    def get_account_meta(
            self, user, account, domain, until=None, include_user_defined=True,
            external_quota=None):
        """Return a dictionary with the account metadata for the domain."""

        logger.debug(
            "get_account_meta: %s %s %s %s", user, account, domain, until)
        path, node = self._lookup_account(account, user == account)
        if user != account:
            if until or node is None or account not in self._allowed_accounts(user):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            props = None
            mtime = until
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node, compute=True)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': account, 'count': count, 'bytes': bytes})
            if self.using_external_quotaholder:
                external_quota = external_quota or {}
                meta['bytes'] = external_quota.get('usage', 0)
        meta.update({'modified': modified})
        return meta

    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain."""

        logger.debug("update_account_meta: %s %s %s %s %s", user,
                     account, domain, meta, replace)
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_account(account, True)
        self._put_metadata(user, node, domain, meta, replace,
                           update_statistics_ancestors_depth=-1)

    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for this account."""

        logger.debug("get_account_groups: %s %s", user, account)
        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            return {}
        self._lookup_account(account, True)
        return self.permissions.group_dict(account)

    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account."""

        logger.debug("update_account_groups: %s %s %s %s", user,
                     account, groups, replace)
        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        for k, v in groups.iteritems():
            if not replace:  # If not already deleted.
                self.permissions.group_delete(account, k)
            if v:
                self.permissions.group_addmany(account, k, v)

    def get_account_policy(self, user, account, external_quota=None):
        """Return a dictionary with the account policy."""

        logger.debug("get_account_policy: %s %s", user, account)
        if user != account:
            if account not in self._allowed_accounts(user):
                raise NotAllowedError
            return {}
        path, node = self._lookup_account(account, True)
        policy = self._get_policy(node, is_account_policy=True)
        if self.using_external_quotaholder:
            external_quota = external_quota or {}
            policy['quota'] = external_quota.get('limit', 0)
        return policy

    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account."""

        logger.debug("update_account_policy: %s %s %s %s", user,
                     account, policy, replace)
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_account(account, True)
        self._check_policy(policy, is_account_policy=True)
        self._put_policy(node, policy, replace, is_account_policy=True)

    def put_account(self, user, account, policy=None):
        """Create a new account with the given name."""

        logger.debug("put_account: %s %s %s", user, account, policy)
        policy = policy or {}
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is not None:
            raise AccountExists('Account already exists')
        if policy:
            self._check_policy(policy, is_account_policy=True)
        node = self._put_path(user, self.ROOTNODE, account,
                              update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=True)

    def delete_account(self, user, account):
        """Delete the account with the given name."""

        logger.debug("delete_account: %s %s", user, account)
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is None:
            return
        if not self.node.node_remove(node,
                                     update_statistics_ancestors_depth=-1):
            raise AccountNotEmpty('Account is not empty')
        self.permissions.group_destroy(account)

    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
        """Return a list of containers existing under an account."""

        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
                     account, marker, limit, shared, until, public)
        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            allowed = self._allowed_containers(user, account)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        if shared or public:
            allowed = set()
            if shared:
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
            if public:
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
            allowed = sorted(allowed)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        node = self.node.node_lookup(account)
        containers = [x[0] for x in self._list_object_properties(
            node, account, '', '/', marker, limit, False, None, [], until)]
        start, limit = self._list_limits(
            [x[0] for x in containers], marker, limit)
        return containers[start:start + limit]

    def list_container_meta(self, user, account, container, domain, until=None):
        """Return a list with all the container's object meta keys for the domain."""

        logger.debug("list_container_meta: %s %s %s %s %s", user,
                     account, container, domain, until)
        allowed = []
        if user != account:
            if until:
                raise NotAllowedError
            allowed = self.permissions.access_list_paths(
                user, '/'.join((account, container)))
            if not allowed:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = until if until is not None else inf
        allowed = self._get_formatted_paths(allowed)
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)

    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
        """Return a dictionary with the container metadata for the domain."""

        logger.debug("get_container_meta: %s %s %s %s %s", user,
                     account, container, domain, until)
        if user != account:
            if until or container not in self._allowed_containers(user, account):
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is None:
            modified = tstamp
        else:
            modified = self._get_statistics(
                node)[2]  # Overall last modification.
            modified = max(modified, mtime)

        if user != account:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                meta.update(
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
            if until is not None:
                meta.update({'until_timestamp': tstamp})
            meta.update({'name': container, 'count': count, 'bytes': bytes})
        meta.update({'modified': modified})
        return meta

    def update_container_meta(self, user, account, container, domain, meta, replace=False):
        """Update the metadata associated with the container for the domain."""

        logger.debug("update_container_meta: %s %s %s %s %s %s",
                     user, account, container, domain, meta, replace)
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=0)
        if src_version_id is not None:
            versioning = self._get_policy(
                node, is_account_policy=False)['versioning']
            if versioning != 'auto':
                self.node.version_remove(src_version_id,
                                         update_statistics_ancestors_depth=0)

    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy."""

        logger.debug(
            "get_container_policy: %s %s %s", user, account, container)
        if user != account:
            if container not in self._allowed_containers(user, account):
                raise NotAllowedError
            return {}
        path, node = self._lookup_container(account, container)
        return self._get_policy(node, is_account_policy=False)

    def update_container_policy(self, user, account, container, policy, replace=False):
        """Update the policy associated with the container."""

        logger.debug("update_container_policy: %s %s %s %s %s",
                     user, account, container, policy, replace)
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        self._check_policy(policy, is_account_policy=False)
        self._put_policy(node, policy, replace, is_account_policy=False)

    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name."""

        logger.debug(
            "put_container: %s %s %s %s", user, account, container, policy)
        policy = policy or {}
        if user != account:
            raise NotAllowedError
        try:
            path, node = self._lookup_container(account, container)
        except NameError:
            pass
        else:
            raise ContainerExists('Container already exists')
        if policy:
            self._check_policy(policy, is_account_policy=False)
        path = '/'.join((account, container))
        node = self._put_path(
            user, self._lookup_account(account, True)[1], path,
            update_statistics_ancestors_depth=-1)
        self._put_policy(node, policy, True, is_account_policy=False)

    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
        """Delete/purge the container with the given name."""

        logger.debug("delete_container: %s %s %s %s %s %s", user,
                     account, container, until, prefix, delimiter)
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        if until is not None:
            hashes, size, serials = self.node.node_purge_children(
                node, until, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
            return

        if not delimiter:
            if self._get_statistics(node)[0] > 0:
                raise ContainerNotEmpty('Container is not empty')
            hashes, size, serials = self.node.node_purge_children(
                node, inf, CLUSTER_HISTORY,
                update_statistics_ancestors_depth=0)
            for h in hashes:
                self.store.map_delete(h)
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
                                          update_statistics_ancestors_depth=0)
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
            if not self.free_versioning:
                self._report_size_change(
                    user, account, -size, {
                        'action': 'container purge',
                        'path': path,
                        'versions': ','.join(str(i) for i in serials)
                    }
                )
        else:
            # remove only contents
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
            paths = []
            for t in src_names:
                path = '/'.join((account, container, t[0]))
                node = t[2]
                src_version_id, dest_version_id = self._put_version_duplicate(
                    user, node, size=0, type='', hash=None, checksum='',
                    cluster=CLUSTER_DELETED,
                    update_statistics_ancestors_depth=1)
                del_size = self._apply_versioning(
                    account, container, src_version_id,
                    update_statistics_ancestors_depth=1)
                self._report_size_change(
                    user, account, -del_size, {
                        'action': 'object delete',
                        'path': path,
                        'versions': ','.join([str(dest_version_id)])
                    }
                )
                self._report_object_change(
                    user, account, path, details={'action': 'object delete'})
                paths.append(path)
            self.permissions.access_clear_bulk(paths)

    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
        if user != account and until:
            raise NotAllowedError
        if shared and public:
            # get shared first
            shared_paths = self._list_object_permissions(
                user, account, container, prefix, shared=True, public=False)
            objects = set()
            if shared_paths:
                path, node = self._lookup_container(account, container)
                shared_paths = self._get_formatted_paths(shared_paths)
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared_paths, all_props))

            # get public
            objects |= set(self._list_public_object_properties(
                user, account, container, prefix, all_props))
            objects = list(objects)

            objects.sort(key=lambda x: x[0])
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]
        elif public:
            objects = self._list_public_object_properties(
                user, account, container, prefix, all_props)
            start, limit = self._list_limits(
                [x[0] for x in objects], marker, limit)
            return objects[start:start + limit]

        allowed = self._list_object_permissions(
            user, account, container, prefix, shared, public)
        if shared and not allowed:
            return []
        path, node = self._lookup_container(account, container)
        allowed = self._get_formatted_paths(allowed)
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
        start, limit = self._list_limits(
            [x[0] for x in objects], marker, limit)
        return objects[start:start + limit]

    def _list_public_object_properties(self, user, account, container, prefix, all_props):
        public = self._list_object_permissions(
            user, account, container, prefix, shared=False, public=True)
        paths, nodes = self._lookup_objects(public)
        path = '/'.join((account, container))
        cont_prefix = path + '/'
        paths = [x[len(cont_prefix):] for x in paths]
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
        objects = [(path,) + props for path, props in zip(paths, props)]
        return objects

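    # Drain _list_objects in batches of 10000 until a short batch signals the
    # end; used by the recursive (delimiter) delete and copy paths below.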
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
        objects = []
        while True:
            marker = objects[-1] if objects else None
            limit = 10000
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
            objects.extend(l)
            if not l or len(l) < limit:
                break
        return objects

    def _list_object_permissions(self, user, account, container, prefix, shared, public):
        allowed = []
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            allowed = self.permissions.access_list_paths(user, path)
            if not allowed:
                raise NotAllowedError
        else:
            allowed = set()
            if shared:
                allowed.update(self.permissions.access_list_shared(path))
            if public:
                allowed.update(
                    [x[0] for x in self.permissions.public_list(path)])
            allowed = sorted(allowed)
            if not allowed:
                return []
        return allowed

    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
        """Return a list of object (name, version_id) tuples existing under a container."""

        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
        keys = keys or []
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)

    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
        """Return a list of object metadata dicts existing under a container."""

        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
        keys = keys or []
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
        objects = []
        for p in props:
            if len(p) == 2:
                objects.append({'subdir': p[0]})
            else:
                objects.append({'name': p[0],
                                'bytes': p[self.SIZE + 1],
                                'type': p[self.TYPE + 1],
                                'hash': p[self.HASH + 1],
                                'version': p[self.SERIAL + 1],
                                'version_timestamp': p[self.MTIME + 1],
                                'modified': p[self.MTIME + 1] if until is None else None,
                                'modified_by': p[self.MUSER + 1],
                                'uuid': p[self.UUID + 1],
                                'checksum': p[self.CHECKSUM + 1]})
        return objects

    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths that enforce permissions under a container."""

        logger.debug("list_object_permissions: %s %s %s %s", user,
                     account, container, prefix)
        return self._list_object_permissions(user, account, container, prefix, True, False)

    def list_object_public(self, user, account, container, prefix=''):
        """Return a dict mapping paths to public ids for objects that are public under a container."""

        logger.debug("list_object_public: %s %s %s %s", user,
                     account, container, prefix)
        public = {}
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
            public[path] = p
        return public

    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain."""

        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
                     account, container, name, domain, version)
        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                modified = self._get_version(
                    node)[self.MTIME]  # Overall last modification.
            except NameError:  # Object may be deleted.
                del_props = self.node.version_lookup(
                    node, inf, CLUSTER_DELETED)
                if del_props is None:
                    raise ItemNotExists('Object does not exist')
                modified = del_props[self.MTIME]

        meta = {}
        if include_user_defined:
            meta.update(
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
        meta.update({'name': name,
                     'bytes': props[self.SIZE],
                     'type': props[self.TYPE],
                     'hash': props[self.HASH],
                     'version': props[self.SERIAL],
                     'version_timestamp': props[self.MTIME],
                     'modified': modified,
                     'modified_by': props[self.MUSER],
                     'uuid': props[self.UUID],
                     'checksum': props[self.CHECKSUM]})
        return meta

    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
        """Update the metadata associated with the object for the domain and return the new version."""

        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
                     user, account, container, name, domain, meta, replace)
        self._can_write(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        src_version_id, dest_version_id = self._put_metadata(
            user, node, domain, meta, replace,
            update_statistics_ancestors_depth=1)
        self._apply_versioning(account, container, src_version_id,
                               update_statistics_ancestors_depth=1)
        return dest_version_id

    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions,
        along with a dictionary containing the permissions."""

        logger.debug("get_object_permissions: %s %s %s %s", user,
                     account, container, name)
        allowed = 'write'
        permissions_path = self._get_permissions_path(account, container, name)
        if user != account:
            if self.permissions.access_check(permissions_path, self.WRITE, user):
                allowed = 'write'
            elif self.permissions.access_check(permissions_path, self.READ, user):
                allowed = 'read'
            else:
                raise NotAllowedError
        self._lookup_object(account, container, name)
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))

    def update_object_permissions(self, user, account, container, name, permissions):
        """Update the permissions associated with the object."""

        logger.debug("update_object_permissions: %s %s %s %s %s",
                     user, account, container, name, permissions)
        if user != account:
            raise NotAllowedError
        path = self._lookup_object(account, container, name)[0]
        self._check_permissions(path, permissions)
        self.permissions.access_set(path, permissions)
        self._report_sharing_change(user, account, path, {'members':
                                    self.permissions.access_members(path)})

    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable."""

        logger.debug(
            "get_object_public: %s %s %s %s", user, account, container, name)
        self._can_read(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        p = self.permissions.public_get(path)
        return p

    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        logger.debug("update_object_public: %s %s %s %s %s", user,
                     account, container, name, public)
        self._can_write(user, account, container, name)
        path = self._lookup_object(account, container, name)[0]
        if not public:
            self.permissions.public_unset(path)
        else:
            self.permissions.public_set(
                path, self.public_url_security, self.public_url_alphabet
            )

    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes."""

        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
                     account, container, name, version)
        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]

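    # Shared write path for object creation, copy and move: it creates a new
    # version for the target node, copies/merges metadata, applies the
    # container versioning policy, enforces account/container quotas when no
    # external quotaholder is in use, and reports the resulting size delta.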
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
        if permissions is not None and user != account:
            raise NotAllowedError
        self._can_write(user, account, container, name)
        if permissions is not None:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(
            account, container)

        path, node = self._put_object_node(
            container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy,
            update_statistics_ancestors_depth=1)

        # Handle meta.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace_meta)

        del_size = self._apply_versioning(account, container, pre_version_id,
                                          update_statistics_ancestors_depth=1)
        size_delta = size - del_size
        if size_delta > 0:
            # Check account quota.
            if not self.using_external_quotaholder:
                account_quota = long(
                    self._get_policy(account_node, is_account_policy=True
                    )['quota']
                )
                account_usage = self._get_statistics(account_node, compute=True)[1]
                if (account_quota > 0 and account_usage > account_quota):
                    raise QuotaError(
                        'Account quota exceeded: limit: %s, usage: %s' % (
                            account_quota, account_usage
                        )
                    )

            # Check container quota.
            container_quota = long(
                self._get_policy(container_node, is_account_policy=False
                )['quota']
            )
            container_usage = self._get_statistics(container_node)[1]
            if (container_quota > 0 and container_usage > container_quota):
                # This must be executed in a transaction, so the version is
                # never created if it fails.
                raise QuotaError(
                    'Container quota exceeded: limit: %s, usage: %s' % (
                        container_quota, container_usage
                    )
                )

        self._report_size_change(user, account, size_delta,
                                 {'action': 'object update', 'path': path,
                                  'versions': ','.join([str(dest_version_id)])})
        if permissions is not None:
            self.permissions.access_set(path, permissions)
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})

        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id

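    # If any of the supplied block hashes are not yet in the store, an
    # IndexError is raised with the missing (hex) hashes attached on its
    # .data attribute, so callers can tell which blocks still need uploading.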
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta=None, replace_meta=False, permissions=None):
        """Create/update an object with the specified size and partial hashes."""

        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
                     account, container, name, size, type, hashmap, checksum)
        meta = meta or {}
        if size == 0:  # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        map = HashMap(self.block_size, self.hash_algorithm)
        map.extend([binascii.unhexlify(x) for x in hashmap])
        missing = self.store.block_search(map)
        if missing:
            ie = IndexError()
            ie.data = [binascii.hexlify(x) for x in missing]
            raise ie

        hash = map.hash()
        hexlified = binascii.hexlify(hash)
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, hexlified, checksum, domain, meta, replace_meta, permissions)
        self.store.map_put(hash, map)
        return dest_version_id, hexlified

    def update_object_checksum(self, user, account, container, name, version, checksum):
        """Update an object's checksum."""

        logger.debug("update_object_checksum: %s %s %s %s %s %s",
                     user, account, container, name, version, checksum)
        # Update objects with greater version and same hashmap and size (fix metadata updates).
        self._can_write(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        versions = self.node.node_get_versions(node)
        for x in versions:
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
                self.node.version_put_property(
                    x[self.SERIAL], 'checksum', checksum)

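    # Copy and move share one implementation: the destination gets a new
    # version that points at the source data (a copy gets a new uuid), and
    # when a delimiter is given the operation recurses over every object
    # under the source prefix.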
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta=None, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
        dest_meta = dest_meta or {}
        dest_version_ids = []
        self._can_read(user, src_account, src_container, src_name)
        path, node = self._lookup_object(src_account, src_container, src_name)
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_version(
            node, src_version)  # Check to see if source exists.
        src_version_id = props[self.SERIAL]
        hash = props[self.HASH]
        size = props[self.SIZE]
        is_copy = not is_move and (src_account, src_container, src_name) != (
            dest_account, dest_container, dest_name)  # New uuid.
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
            self._delete_object(user, src_account, src_container, src_name)

        if delimiter:
            prefix = src_name + \
                delimiter if not src_name.endswith(delimiter) else src_name
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
            src_names.sort(key=lambda x: x[2])  # order by nodes
            paths = [elem[0] for elem in src_names]
            nodes = [elem[2] for elem in src_names]
            # TODO: Will do another fetch of the properties in duplicate version...
            props = self._get_versions(nodes)  # Check to see if source exists.

            for prop, path, node in zip(props, paths, nodes):
                src_version_id = prop[self.SERIAL]
                hash = prop[self.HASH]
                vtype = prop[self.TYPE]
                size = prop[self.SIZE]
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
                    delimiter) else dest_name
                vdest_name = path.replace(prefix, dest_prefix, 1)
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
                    self._delete_object(user, src_account, src_container, path)
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids

    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, src_version=None, delimiter=None):
        """Copy an object's data and metadata."""

        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
        meta = meta or {}
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
        return dest_version_id

    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, delimiter=None):
        """Move an object's data and metadata."""

        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
        meta = meta or {}
        if user != src_account:
            raise NotAllowedError
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
        return dest_version_id

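    # A plain delete writes a new zero-size version in CLUSTER_DELETED, so
    # history is kept subject to the versioning policy; passing `until`
    # instead purges versions and their blocks up to that timestamp.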
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
1045
        if user != account:
1046
            raise NotAllowedError
1047

    
1048
        if until is not None:
1049
            path = '/'.join((account, container, name))
1050
            node = self.node.node_lookup(path)
1051
            if node is None:
1052
                return
1053
            hashes = []
1054
            size = 0
1055
            serials = []
1056
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1057
                                           update_statistics_ancestors_depth=1)
1058
            hashes += h
1059
            size += s
1060
            serials += v
1061
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1062
                                           update_statistics_ancestors_depth=1)
1063
            hashes += h
1064
            if not self.free_versioning:
1065
                size += s
1066
            serials += v
1067
            for h in hashes:
1068
                self.store.map_delete(h)
1069
            self.node.node_purge(node, until, CLUSTER_DELETED,
1070
                                 update_statistics_ancestors_depth=1)
1071
            try:
1072
                props = self._get_version(node)
1073
            except NameError:
1074
                self.permissions.access_clear(path)
1075
            self._report_size_change(
1076
                user, account, -size, {
1077
                    'action': 'object purge',
1078
                    'path': path,
1079
                    'versions': ','.join(str(i) for i in serials)
1080
                }
1081
            )
1082
            return
1083

    
1084
        path, node = self._lookup_object(account, container, name)
1085
        src_version_id, dest_version_id = self._put_version_duplicate(
1086
            user, node, size=0, type='', hash=None, checksum='',
1087
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1088
        del_size = self._apply_versioning(account, container, src_version_id,
1089
                                          update_statistics_ancestors_depth=1)
1090
        self._report_size_change(user, account, -del_size,
1091
                                 {'action': 'object delete', 'path': path,
1092
                                  'versions': ','.join([str(dest_version_id)])})
1093
        self._report_object_change(
1094
            user, account, path, details={'action': 'object delete'})
1095
        self.permissions.access_clear(path)
1096

    
1097
        if delimiter:
1098
            prefix = name + delimiter if not name.endswith(delimiter) else name
1099
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1100
            paths = []
1101
            for t in src_names:
1102
                path = '/'.join((account, container, t[0]))
1103
                node = t[2]
1104
                src_version_id, dest_version_id = self._put_version_duplicate(
1105
                    user, node, size=0, type='', hash=None, checksum='',
1106
                    cluster=CLUSTER_DELETED,
1107
                    update_statistics_ancestors_depth=1)
1108
                del_size = self._apply_versioning(
1109
                    account, container, src_version_id,
1110
                    update_statistics_ancestors_depth=1)
1111
                self._report_size_change(user, account, -del_size,
1112
                                         {'action': 'object delete',
1113
                                          'path': path,
1114
                                          'versions': ','.join([str(dest_version_id)])})
1115
                self._report_object_change(
1116
                    user, account, path, details={'action': 'object delete'})
1117
                paths.append(path)
1118
            self.permissions.access_clear_bulk(paths)
1119

    
    def delete_object(self, user, account, container, name, until=None,
                      prefix='', delimiter=None):
        """Delete/purge an object."""

        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
                     account, container, name, until, prefix, delimiter)
        self._delete_object(user, account, container, name, until, delimiter)
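    # Usage sketch (hypothetical caller; `backend`, the account and the object
    # name below are illustrative, not taken from this module). A plain call
    # soft-deletes the latest version, while `until` purges history up to the
    # given timestamp:
    #
    #   backend.delete_object('alice', 'alice', 'pithos', 'docs/report.txt')
    #   backend.delete_object('alice', 'alice', 'pithos', 'docs/report.txt',
    #                         until=1325376000)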

    
    def list_versions(self, user, account, container, name):
        """Return a list of all (version, version_timestamp) tuples for an object."""

        logger.debug(
            "list_versions: %s %s %s %s", user, account, container, name)
        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self.node.node_get_versions(node)
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions
                if x[self.CLUSTER] != CLUSTER_DELETED]
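    # For illustration only: the returned value is a list of [serial, mtime]
    # pairs, e.g. something like [[42, 1325376000.0], [57, 1325462400.0]]
    # (invented numbers); versions in CLUSTER_DELETED are filtered out above.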

    
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given."""

        logger.debug("get_uuid: %s %s", user, uuid)
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
        if info is None:
            raise NameError
        path, serial = info
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given."""

        logger.debug("get_public: %s %s", user, public)
        path = self.permissions.public_path(public)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)

    
    def get_block(self, hash):
        """Return a block's data."""

        logger.debug("get_block: %s", hash)
        block = self.store.block_get(binascii.unhexlify(hash))
        if not block:
            raise ItemNotExists('Block does not exist')
        return block

    def put_block(self, data):
        """Store a block and return the hash."""

        logger.debug("put_block: %s", len(data))
        return binascii.hexlify(self.store.block_put(data))

    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash."""

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        if offset == 0 and len(data) == self.block_size:
            return self.put_block(data)
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
        return binascii.hexlify(h)
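    # Usage sketch for the block API (hypothetical caller; `backend`, `data`
    # and `patch` are illustrative). Block hashes travel as hex strings:
    #
    #   h = backend.put_block(data)              # store a block, get its hex hash
    #   same = backend.get_block(h)              # fetch it back by hex hash
    #   h2 = backend.update_block(h, patch, 16)  # rewrite part of a known block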

    
    # Path functions.

    def _generate_uuid(self):
        return str(uuidlib.uuid4())

    def _put_object_node(self, path, parent, name):
        path = '/'.join((path, name))
        node = self.node.node_lookup(path)
        if node is None:
            node = self.node.node_create(parent, path)
        return path, node

    def _put_path(self, user, parent, path,
                  update_statistics_ancestors_depth=None):
        node = self.node.node_create(parent, path)
        self.node.version_create(node, None, 0, '', None, user,
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
                                 update_statistics_ancestors_depth)
        return node

    def _lookup_account(self, account, create=True):
        for_update = True if create else False
        node = self.node.node_lookup(account, for_update=for_update)
        if node is None and create:
            node = self._put_path(
                account, self.ROOTNODE, account,
                update_statistics_ancestors_depth=-1)  # User is account.
        return account, node

    def _lookup_container(self, account, container):
        for_update = True if self.lock_container_path else False
        path = '/'.join((account, container))
        node = self.node.node_lookup(path, for_update)
        if node is None:
            raise ItemNotExists('Container does not exist')
        return path, node

    def _lookup_object(self, account, container, name):
        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            raise ItemNotExists('Object does not exist')
        return path, node

    def _lookup_objects(self, paths):
        nodes = self.node.node_lookup_bulk(paths)
        return paths, nodes
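    # Note on the path convention used by the lookups above: node paths are
    # plain '/'-joined strings, i.e. 'account', 'account/container' and
    # 'account/container/object-name'; the object part may itself contain '/'.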

    
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given."""

        before = until if until is not None else inf
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if props is None and until is not None:
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
        if props is None:
            raise ItemNotExists('Path does not exist')
        return props

    def _get_statistics(self, node, until=None, compute=False):
        """Return count, sum of size and latest timestamp of everything under node."""

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        elif compute:
            stats = self.node.statistics_latest(node,
                                                except_cluster=CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        if stats is None:
            stats = (0, 0, 0)
        return stats
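    # As the docstring above states, statistics unpack as a
    # (count, size, mtime) triple; for example, _report_size_change() below
    # reads index [1] of this triple as the account's total size.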

    
    def _get_version(self, node, version=None):
        if version is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is None:
                raise ItemNotExists('Object does not exist')
        else:
            try:
                version = int(version)
            except ValueError:
                raise VersionNotExists('Version does not exist')
            props = self.node.version_get_properties(version)
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                raise VersionNotExists('Version does not exist')
        return props

    def _get_versions(self, nodes):
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)

    
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
                               type=None, hash=None, checksum=None,
                               cluster=CLUSTER_NORMAL, is_copy=False,
                               update_statistics_ancestors_depth=None):
        """Create a new version of the node."""

        props = self.node.version_lookup(
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
        if props is not None:
            src_version_id = props[self.SERIAL]
            src_hash = props[self.HASH]
            src_size = props[self.SIZE]
            src_type = props[self.TYPE]
            src_checksum = props[self.CHECKSUM]
        else:
            src_version_id = None
            src_hash = None
            src_size = 0
            src_type = ''
            src_checksum = ''
        if size is None:  # Set metadata.
            hash = src_hash  # This way hash can be set to None (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum
        uuid = (self._generate_uuid()
                if (is_copy or src_version_id is None) else props[self.UUID])

        if src_node is None:
            pre_version_id = src_version_id
        else:
            pre_version_id = None
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                pre_version_id = props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
                                        update_statistics_ancestors_depth)

        dest_version_id, mtime = self.node.version_create(
            node, hash, size, type, src_version_id, user, uuid, checksum,
            cluster, update_statistics_ancestors_depth)

        self.node.attribute_unset_is_latest(node, dest_version_id)

        return pre_version_id, dest_version_id
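    # Rough reading of _put_version_duplicate (commentary, not authoritative):
    # properties are copied from src_node (or from node itself), the node's
    # current latest version is re-clustered into CLUSTER_HISTORY, and a fresh
    # version row is created in the requested cluster. E.g. the delete path
    # above calls it as:
    #
    #   pre, dest = self._put_version_duplicate(
    #       user, node, size=0, type='', hash=None, checksum='',
    #       cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
    #
    # where `pre` is the superseded version id and `dest` the new one.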

    
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
                                node, meta, replace=False):
        if src_version_id is not None:
            self.node.attribute_copy(src_version_id, dest_version_id)
        if not replace:
            self.node.attribute_del(dest_version_id, domain, (
                k for k, v in meta.iteritems() if v == ''))
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems() if v != ''))
        else:
            self.node.attribute_del(dest_version_id, domain)
            self.node.attribute_set(dest_version_id, domain, node, (
                (k, v) for k, v in meta.iteritems()))

    def _put_metadata(self, user, node, domain, meta, replace=False,
                      update_statistics_ancestors_depth=None):
        """Create a new version and store metadata."""

        src_version_id, dest_version_id = self._put_version_duplicate(
            user, node,
            update_statistics_ancestors_depth=update_statistics_ancestors_depth)
        self._put_metadata_duplicate(
            src_version_id, dest_version_id, domain, node, meta, replace)
        return src_version_id, dest_version_id
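    # In other words: with replace=False, keys mapped to '' are deleted and the
    # rest are upserted; with replace=True the whole domain is reset to `meta`.
    # A hypothetical call (the 'pithos' domain and the keys are illustrative):
    #
    #   self._put_metadata(user, node, 'pithos',
    #                      {'colour': 'blue', 'obsolete-key': ''})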

    
    def _list_limits(self, listing, marker, limit):
        start = 0
        if marker:
            try:
                start = listing.index(marker) + 1
            except ValueError:
                pass
        if not limit or limit > 10000:
            limit = 10000
        return start, limit
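    # Marker-based paging in a nutshell: for listing ['a', 'b', 'c'] and
    # marker 'b' this returns start=2; an unknown marker falls back to the
    # beginning, and limit is capped at 10000 (0 or None also become 10000).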

    
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
                                marker=None, limit=10000, virtual=True,
                                domain=None, keys=None, until=None,
                                size_range=None, allowed=None,
                                all_props=False):
        keys = keys or []
        allowed = allowed or []
        cont_prefix = path + '/'
        prefix = cont_prefix + prefix
        start = cont_prefix + marker if marker else None
        before = until if until is not None else inf
        filterq = keys if domain else []
        sizeq = size_range

        objects, prefixes = self.node.latest_version_list(
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
            allowed, domain, filterq, sizeq, all_props)
        objects.extend([(p, None) for p in prefixes] if virtual else [])
        objects.sort(key=lambda x: x[0])
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
        return objects

    
    # Reporting functions.

    def _report_size_change(self, user, account, size, details=None):
        details = details or {}

        if size == 0:
            return

        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node, compute=True)[1]
        details.update({'user': user, 'total': total})
        logger.debug(
            "_report_size_change: %s %s %s %s", user, account, size, details)
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
                              account, QUEUE_INSTANCE_ID, 'diskspace',
                              float(size), details))

        if not self.using_external_quotaholder:
            return

        try:
            name = details['path'] if 'path' in details else ''
            serial = self.astakosclient.issue_one_commission(
                token=self.service_token,
                holder=account,
                source=DEFAULT_SOURCE,
                provisions={'pithos.diskspace': size},
                name=name
                )
        except BaseException, e:
            raise QuotaError(e)
        else:
            self.serials.append(serial)
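    # Commentary on the flow above: every non-zero size change is pushed to the
    # message queue and, when an external quotaholder is configured, also
    # issued as an Astakos commission against 'pithos.diskspace'; the returned
    # serial is collected in self.serials so the caller can resolve the
    # commission later (the accept/reject step lives outside this method).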

    
    def _report_object_change(self, user, account, path, details=None):
        details = details or {}
        details.update({'user': user})
        logger.debug("_report_object_change: %s %s %s %s", user,
                     account, path, details)
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                              account, QUEUE_INSTANCE_ID, 'object', path,
                              details))

    def _report_sharing_change(self, user, account, path, details=None):
        logger.debug("_report_sharing_change: %s %s %s %s",
                     user, account, path, details)
        details = details or {}
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
                              details))

    
    # Policy functions.

    def _check_policy(self, policy, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        for k in policy.keys():
            if policy[k] == '':
                policy[k] = default_policy.get(k)
        for k, v in policy.iteritems():
            if k == 'quota':
                q = int(v)  # May raise ValueError.
                if q < 0:
                    raise ValueError
            elif k == 'versioning':
                if v not in ['auto', 'none']:
                    raise ValueError
            else:
                raise ValueError

    def _put_policy(self, node, policy, replace, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        if replace:
            for k, v in default_policy.iteritems():
                if k not in policy:
                    policy[k] = v
        self.node.policy_set(node, policy)

    def _get_policy(self, node, is_account_policy=True):
        default_policy = self.default_account_policy \
            if is_account_policy else self.default_container_policy
        policy = default_policy.copy()
        policy.update(self.node.policy_get(node))
        return policy
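    # Policies are plain dicts with exactly the two keys validated above, e.g.
    # {'quota': '1073741824', 'versioning': 'auto'} (values illustrative);
    # empty values fall back to the corresponding defaults before validation.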

    
    def _apply_versioning(self, account, container, version_id,
                          update_statistics_ancestors_depth=None):
        """Delete the provided version if such is the policy.
           Return size of object removed.
        """

        if version_id is None:
            return 0
        path, node = self._lookup_container(account, container)
        versioning = self._get_policy(
            node, is_account_policy=False)['versioning']
        if versioning != 'auto':
            hash, size = self.node.version_remove(
                version_id, update_statistics_ancestors_depth)
            self.store.map_delete(hash)
            return size
        elif self.free_versioning:
            return self.node.version_get_properties(
                version_id, keys=('size',))[0]
        return 0

    
    # Access control functions.

    def _check_groups(self, groups):
        # raise ValueError('Bad characters in groups')
        pass

    def _check_permissions(self, path, permissions):
        # raise ValueError('Bad characters in permissions')
        pass

    def _get_formatted_paths(self, paths):
        formatted = []
        for p in paths:
            node = self.node.node_lookup(p)
            props = None
            if node is not None:
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                if props[self.TYPE].split(';', 1)[0].strip() in (
                        'application/directory', 'application/folder'):
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
                formatted.append((p, self.MATCH_EXACT))
        return formatted

    def _get_permissions_path(self, account, container, name):
        path = '/'.join((account, container, name))
        permission_paths = self.permissions.access_inherit(path)
        permission_paths.sort()
        permission_paths.reverse()
        for p in permission_paths:
            if p == path:
                return p
            else:
                if p.count('/') < 2:
                    continue
                node = self.node.node_lookup(p)
                props = None
                if node is not None:
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                if props is not None:
                    if props[self.TYPE].split(';', 1)[0].strip() in (
                            'application/directory', 'application/folder'):
                        return p
        return None
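    # Commentary: _get_permissions_path falls back to the closest sharing
    # ancestor. For a hypothetical 'alice/photos/2012/trip/img.jpg',
    # access_inherit() may also return folder prefixes such as
    # 'alice/photos/2012/'; candidates are walked deepest-first and the first
    # one that is either the exact path or a directory-typed object wins.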

    
    def _can_read(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        if self.permissions.public_get(path) is not None:
            return True
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if (not self.permissions.access_check(path, self.READ, user) and
                not self.permissions.access_check(path, self.WRITE, user)):
            raise NotAllowedError

    def _can_write(self, user, account, container, name):
        if user == account:
            return True
        path = '/'.join((account, container, name))
        path = self._get_permissions_path(account, container, name)
        if not path:
            raise NotAllowedError
        if not self.permissions.access_check(path, self.WRITE, user):
            raise NotAllowedError

    def _allowed_accounts(self, user):
        allow = set()
        for path in self.permissions.access_list_paths(user):
            allow.add(path.split('/', 1)[0])
        return sorted(allow)

    def _allowed_containers(self, user, account):
        allow = set()
        for path in self.permissions.access_list_paths(user, account):
            allow.add(path.split('/', 2)[1])
        return sorted(allow)

    
    # Domain functions

    def get_domain_objects(self, domain, user=None):
        allowed_paths = self.permissions.access_list_paths(
            user, include_owned=user is not None, include_containers=False)
        if not allowed_paths:
            return []
        obj_list = self.node.domain_object_list(
            domain, allowed_paths, CLUSTER_NORMAL)
        return [(path,
                 self._build_metadata(props, user_defined_meta),
                 self.permissions.access_get(path)) for
                path, props, user_defined_meta in obj_list]

    
    # util functions

    def _build_metadata(self, props, user_defined=None,
                        include_user_defined=True):
        meta = {'bytes': props[self.SIZE],
                'type': props[self.TYPE],
                'hash': props[self.HASH],
                'version': props[self.SERIAL],
                'version_timestamp': props[self.MTIME],
                'modified_by': props[self.MUSER],
                'uuid': props[self.UUID],
                'checksum': props[self.CHECKSUM]}
        if include_user_defined and user_defined is not None:
            meta.update(user_defined)
        return meta
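    # A hypothetical _build_metadata result (every value invented for
    # illustration; the extra 'colour' key would come from user_defined):
    #
    #   {'bytes': 1024, 'type': 'text/plain', 'hash': 'af3e...', 'version': 42,
    #    'version_timestamp': 1325376000.0, 'modified_by': 'alice',
    #    'uuid': '7f9a0b52-...', 'checksum': '', 'colour': 'blue'}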

    
    def _has_read_access(self, user, path):
        try:
            account, container, object = path.split('/', 2)
        except ValueError:
            raise ValueError('Invalid object path')

        assert isinstance(user, basestring), "Invalid user"

        try:
            self._can_read(user, account, container, object)
        except NotAllowedError:
            return False
        else:
            return True