Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 7ee27246

History | View | Annotate | Download (67.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
try:
41
    from astakosclient import AstakosClient
42
except ImportError:
43
    AstakosClient = None
44

    
45
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
46
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
47
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
48
                  ContainerNotEmpty, ItemNotExists, VersionNotExists)
49

    
50

    
51
class DisabledAstakosClient(object):
52
    def __init__(self, *args, **kwargs):
53
        self.args = args
54
        self.kwargs = kwargs
55

    
56
    def __getattr__(self, name):
57
        m = ("AstakosClient has been disabled, "
58
             "yet an attempt to access it was made")
59
        raise AssertionError(m)
60

    
61

    
62
# Stripped-down version of the HashMap class found in tools.
63

    
64
class HashMap(list):
65

    
66
    def __init__(self, blocksize, blockhash):
67
        super(HashMap, self).__init__()
68
        self.blocksize = blocksize
69
        self.blockhash = blockhash
70

    
71
    def _hash_raw(self, v):
72
        h = hashlib.new(self.blockhash)
73
        h.update(v)
74
        return h.digest()
75

    
76
    def hash(self):
77
        if len(self) == 0:
78
            return self._hash_raw('')
79
        if len(self) == 1:
80
            return self.__getitem__(0)
81

    
82
        h = list(self)
83
        s = 2
84
        while s < len(h):
85
            s = s * 2
86
        h += [('\x00' * len(h[0]))] * (s - len(h))
87
        while len(h) > 1:
88
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
89
        return h[0]
90

    
91
# Default modules and settings.
92
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
93
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
94
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
95
DEFAULT_BLOCK_PATH = 'data/'
96
DEFAULT_BLOCK_UMASK = 0o022
97
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
98
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
99
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
100
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
101
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
102
                               'abcdefghijklmnopqrstuvwxyz'
103
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
104
DEFAULT_PUBLIC_URL_SECURITY = 16
105

    
106
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
107
QUEUE_CLIENT_ID = 'pithos'
108
QUEUE_INSTANCE_ID = '1'
109

    
110
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
111

    
112
inf = float('inf')
113

    
114
ULTIMATE_ANSWER = 42
115

    
116
DEFAULT_SOURCE = 'system'
117

    
118
logger = logging.getLogger(__name__)
119

    
120

    
121
def backend_method(func=None, autocommit=1):
122
    if func is None:
123
        def fn(func):
124
            return backend_method(func, autocommit)
125
        return fn
126

    
127
    if not autocommit:
128
        return func
129

    
130
    def fn(self, *args, **kw):
131
        self.wrapper.execute()
132
        serials = []
133
        self.serials = serials
134
        self.messages = []
135

    
136
        try:
137
            ret = func(self, *args, **kw)
138
            for m in self.messages:
139
                self.queue.send(*m)
140
            if self.serials:
141
                self.commission_serials.insert_many(self.serials)
142

    
143
                # commit to ensure that the serials are registered
144
                # even if accept commission fails
145
                self.wrapper.commit()
146
                self.wrapper.execute()
147

    
148
                r = self.astakosclient.resolve_commissions(
149
                            token=self.service_token,
150
                            accept_serials=self.serials,
151
                            reject_serials=[])
152
                self.commission_serials.delete_many(r['accepted'])
153

    
154
            self.wrapper.commit()
155
            return ret
156
        except:
157
            if self.serials:
158
                self.astakosclient.resolve_commissions(
159
                    token=self.service_token,
160
                    accept_serials=[],
161
                    reject_serials=self.serials)
162
            self.wrapper.rollback()
163
            raise
164
    return fn
165

    
166

    
167
class ModularBackend(BaseBackend):
168
    """A modular backend.
169

170
    Uses modules for SQL functions and storage.
171
    """
172

    
173
    def __init__(self, db_module=None, db_connection=None,
174
                 block_module=None, block_path=None, block_umask=None,
175
                 queue_module=None, queue_hosts=None, queue_exchange=None,
176
                 astakos_url=None, service_token=None,
177
                 astakosclient_poolsize=None,
178
                 free_versioning=True, block_params=None,
179
                 public_url_security=None,
180
                 public_url_alphabet=None,
181
                 account_quota_policy=None,
182
                 container_quota_policy=None,
183
                 container_versioning_policy=None):
184
        db_module = db_module or DEFAULT_DB_MODULE
185
        db_connection = db_connection or DEFAULT_DB_CONNECTION
186
        block_module = block_module or DEFAULT_BLOCK_MODULE
187
        block_path = block_path or DEFAULT_BLOCK_PATH
188
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
189
        block_params = block_params or DEFAULT_BLOCK_PARAMS
190
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
191
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
192
        container_quota_policy = container_quota_policy \
193
            or DEFAULT_CONTAINER_QUOTA
194
        container_versioning_policy = container_versioning_policy \
195
            or DEFAULT_CONTAINER_VERSIONING
196

    
197
        self.default_account_policy = {'quota': account_quota_policy}
198
        self.default_container_policy = {
199
            'quota': container_quota_policy,
200
            'versioning': container_versioning_policy
201
        }
202
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
203
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
204

    
205
        self.public_url_security = public_url_security or DEFAULT_PUBLIC_URL_SECURITY
206
        self.public_url_alphabet = public_url_alphabet or DEFAULT_PUBLIC_URL_ALPHABET
207

    
208
        self.hash_algorithm = 'sha256'
209
        self.block_size = 4 * 1024 * 1024  # 4MB
210
        self.free_versioning = free_versioning
211

    
212
        def load_module(m):
213
            __import__(m)
214
            return sys.modules[m]
215

    
216
        self.db_module = load_module(db_module)
217
        self.wrapper = self.db_module.DBWrapper(db_connection)
218
        params = {'wrapper': self.wrapper}
219
        self.permissions = self.db_module.Permissions(**params)
220
        self.config = self.db_module.Config(**params)
221
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
222
        for x in ['READ', 'WRITE']:
223
            setattr(self, x, getattr(self.db_module, x))
224
        self.node = self.db_module.Node(**params)
225
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
226
            setattr(self, x, getattr(self.db_module, x))
227

    
228
        self.block_module = load_module(block_module)
229
        self.block_params = block_params
230
        params = {'path': block_path,
231
                  'block_size': self.block_size,
232
                  'hash_algorithm': self.hash_algorithm,
233
                  'umask': block_umask}
234
        params.update(self.block_params)
235
        self.store = self.block_module.Store(**params)
236

    
237
        if queue_module and queue_hosts:
238
            self.queue_module = load_module(queue_module)
239
            params = {'hosts': queue_hosts,
240
                      'exchange': queue_exchange,
241
                      'client_id': QUEUE_CLIENT_ID}
242
            self.queue = self.queue_module.Queue(**params)
243
        else:
244
            class NoQueue:
245
                def send(self, *args):
246
                    pass
247

    
248
                def close(self):
249
                    pass
250

    
251
            self.queue = NoQueue()
252

    
253
        self.astakos_url = astakos_url
254
        self.service_token = service_token
255

    
256
        if not astakos_url or not AstakosClient:
257
            self.astakosclient = DisabledAstakosClient(
258
                astakos_url,
259
                use_pool=True,
260
                pool_size=astakosclient_poolsize)
261
        else:
262
            self.astakosclient = AstakosClient(
263
                astakos_url,
264
                use_pool=True,
265
                pool_size=astakosclient_poolsize)
266

    
267
        self.serials = []
268
        self.messages = []
269

    
270
    def close(self):
271
        self.wrapper.close()
272
        self.queue.close()
273

    
274
    @property
275
    def using_external_quotaholder(self):
276
        return not isinstance(self.astakosclient, DisabledAstakosClient)
277

    
278
    @backend_method
279
    def list_accounts(self, user, marker=None, limit=10000):
280
        """Return a list of accounts the user can access."""
281

    
282
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
283
        allowed = self._allowed_accounts(user)
284
        start, limit = self._list_limits(allowed, marker, limit)
285
        return allowed[start:start + limit]
286

    
287
    @backend_method
288
    def get_account_meta(
289
            self, user, account, domain, until=None, include_user_defined=True,
290
            external_quota=None):
291
        """Return a dictionary with the account metadata for the domain."""
292

    
293
        logger.debug(
294
            "get_account_meta: %s %s %s %s", user, account, domain, until)
295
        path, node = self._lookup_account(account, user == account)
296
        if user != account:
297
            if until or node is None or account not in self._allowed_accounts(user):
298
                raise NotAllowedError
299
        try:
300
            props = self._get_properties(node, until)
301
            mtime = props[self.MTIME]
302
        except NameError:
303
            props = None
304
            mtime = until
305
        count, bytes, tstamp = self._get_statistics(node, until)
306
        tstamp = max(tstamp, mtime)
307
        if until is None:
308
            modified = tstamp
309
        else:
310
            modified = self._get_statistics(
311
                node)[2]  # Overall last modification.
312
            modified = max(modified, mtime)
313

    
314
        if user != account:
315
            meta = {'name': account}
316
        else:
317
            meta = {}
318
            if props is not None and include_user_defined:
319
                meta.update(
320
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
321
            if until is not None:
322
                meta.update({'until_timestamp': tstamp})
323
            meta.update({'name': account, 'count': count, 'bytes': bytes})
324
            if self.using_external_quotaholder:
325
                external_quota = external_quota or {}
326
                meta['bytes'] = external_quota.get('usage', 0)
327
        meta.update({'modified': modified})
328
        return meta
329

    
330
    @backend_method
331
    def update_account_meta(self, user, account, domain, meta, replace=False):
332
        """Update the metadata associated with the account for the domain."""
333

    
334
        logger.debug("update_account_meta: %s %s %s %s %s", user,
335
                     account, domain, meta, replace)
336
        if user != account:
337
            raise NotAllowedError
338
        path, node = self._lookup_account(account, True)
339
        self._put_metadata(user, node, domain, meta, replace)
340

    
341
    @backend_method
342
    def get_account_groups(self, user, account):
343
        """Return a dictionary with the user groups defined for this account."""
344

    
345
        logger.debug("get_account_groups: %s %s", user, account)
346
        if user != account:
347
            if account not in self._allowed_accounts(user):
348
                raise NotAllowedError
349
            return {}
350
        self._lookup_account(account, True)
351
        return self.permissions.group_dict(account)
352

    
353
    @backend_method
354
    def update_account_groups(self, user, account, groups, replace=False):
355
        """Update the groups associated with the account."""
356

    
357
        logger.debug("update_account_groups: %s %s %s %s", user,
358
                     account, groups, replace)
359
        if user != account:
360
            raise NotAllowedError
361
        self._lookup_account(account, True)
362
        self._check_groups(groups)
363
        if replace:
364
            self.permissions.group_destroy(account)
365
        for k, v in groups.iteritems():
366
            if not replace:  # If not already deleted.
367
                self.permissions.group_delete(account, k)
368
            if v:
369
                self.permissions.group_addmany(account, k, v)
370

    
371
    @backend_method
372
    def get_account_policy(self, user, account, external_quota=None):
373
        """Return a dictionary with the account policy."""
374

    
375
        logger.debug("get_account_policy: %s %s", user, account)
376
        if user != account:
377
            if account not in self._allowed_accounts(user):
378
                raise NotAllowedError
379
            return {}
380
        path, node = self._lookup_account(account, True)
381
        policy = self._get_policy(node, is_account_policy=True)
382
        if self.using_external_quotaholder:
383
            external_quota = external_quota or {}
384
            policy['quota'] = external_quota.get('limit', 0)
385
        return policy
386

    
387
    @backend_method
388
    def update_account_policy(self, user, account, policy, replace=False):
389
        """Update the policy associated with the account."""
390

    
391
        logger.debug("update_account_policy: %s %s %s %s", user,
392
                     account, policy, replace)
393
        if user != account:
394
            raise NotAllowedError
395
        path, node = self._lookup_account(account, True)
396
        self._check_policy(policy, is_account_policy=True)
397
        self._put_policy(node, policy, replace, is_account_policy=True)
398

    
399
    @backend_method
400
    def put_account(self, user, account, policy=None):
401
        """Create a new account with the given name."""
402

    
403
        logger.debug("put_account: %s %s %s", user, account, policy)
404
        policy = policy or {}
405
        if user != account:
406
            raise NotAllowedError
407
        node = self.node.node_lookup(account)
408
        if node is not None:
409
            raise AccountExists('Account already exists')
410
        if policy:
411
            self._check_policy(policy, is_account_policy=True)
412
        node = self._put_path(user, self.ROOTNODE, account)
413
        self._put_policy(node, policy, True, is_account_policy=True)
414

    
415
    @backend_method
416
    def delete_account(self, user, account):
417
        """Delete the account with the given name."""
418

    
419
        logger.debug("delete_account: %s %s", user, account)
420
        if user != account:
421
            raise NotAllowedError
422
        node = self.node.node_lookup(account)
423
        if node is None:
424
            return
425
        if not self.node.node_remove(node):
426
            raise AccountNotEmpty('Account is not empty')
427
        self.permissions.group_destroy(account)
428

    
429
    @backend_method
430
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
431
        """Return a list of containers existing under an account."""
432

    
433
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
434
                     account, marker, limit, shared, until, public)
435
        if user != account:
436
            if until or account not in self._allowed_accounts(user):
437
                raise NotAllowedError
438
            allowed = self._allowed_containers(user, account)
439
            start, limit = self._list_limits(allowed, marker, limit)
440
            return allowed[start:start + limit]
441
        if shared or public:
442
            allowed = set()
443
            if shared:
444
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
445
            if public:
446
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
447
            allowed = sorted(allowed)
448
            start, limit = self._list_limits(allowed, marker, limit)
449
            return allowed[start:start + limit]
450
        node = self.node.node_lookup(account)
451
        containers = [x[0] for x in self._list_object_properties(
452
            node, account, '', '/', marker, limit, False, None, [], until)]
453
        start, limit = self._list_limits(
454
            [x[0] for x in containers], marker, limit)
455
        return containers[start:start + limit]
456

    
457
    @backend_method
458
    def list_container_meta(self, user, account, container, domain, until=None):
459
        """Return a list with all the container's object meta keys for the domain."""
460

    
461
        logger.debug("list_container_meta: %s %s %s %s %s", user,
462
                     account, container, domain, until)
463
        allowed = []
464
        if user != account:
465
            if until:
466
                raise NotAllowedError
467
            allowed = self.permissions.access_list_paths(
468
                user, '/'.join((account, container)))
469
            if not allowed:
470
                raise NotAllowedError
471
        path, node = self._lookup_container(account, container)
472
        before = until if until is not None else inf
473
        allowed = self._get_formatted_paths(allowed)
474
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
475

    
476
    @backend_method
477
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
478
        """Return a dictionary with the container metadata for the domain."""
479

    
480
        logger.debug("get_container_meta: %s %s %s %s %s", user,
481
                     account, container, domain, until)
482
        if user != account:
483
            if until or container not in self._allowed_containers(user, account):
484
                raise NotAllowedError
485
        path, node = self._lookup_container(account, container)
486
        props = self._get_properties(node, until)
487
        mtime = props[self.MTIME]
488
        count, bytes, tstamp = self._get_statistics(node, until)
489
        tstamp = max(tstamp, mtime)
490
        if until is None:
491
            modified = tstamp
492
        else:
493
            modified = self._get_statistics(
494
                node)[2]  # Overall last modification.
495
            modified = max(modified, mtime)
496

    
497
        if user != account:
498
            meta = {'name': container}
499
        else:
500
            meta = {}
501
            if include_user_defined:
502
                meta.update(
503
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
504
            if until is not None:
505
                meta.update({'until_timestamp': tstamp})
506
            meta.update({'name': container, 'count': count, 'bytes': bytes})
507
        meta.update({'modified': modified})
508
        return meta
509

    
510
    @backend_method
511
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
512
        """Update the metadata associated with the container for the domain."""
513

    
514
        logger.debug("update_container_meta: %s %s %s %s %s %s",
515
                     user, account, container, domain, meta, replace)
516
        if user != account:
517
            raise NotAllowedError
518
        path, node = self._lookup_container(account, container)
519
        src_version_id, dest_version_id = self._put_metadata(
520
            user, node, domain, meta, replace)
521
        if src_version_id is not None:
522
            versioning = self._get_policy(
523
                node, is_account_policy=False)['versioning']
524
            if versioning != 'auto':
525
                self.node.version_remove(src_version_id)
526

    
527
    @backend_method
528
    def get_container_policy(self, user, account, container):
529
        """Return a dictionary with the container policy."""
530

    
531
        logger.debug(
532
            "get_container_policy: %s %s %s", user, account, container)
533
        if user != account:
534
            if container not in self._allowed_containers(user, account):
535
                raise NotAllowedError
536
            return {}
537
        path, node = self._lookup_container(account, container)
538
        return self._get_policy(node, is_account_policy=False)
539

    
540
    @backend_method
541
    def update_container_policy(self, user, account, container, policy, replace=False):
542
        """Update the policy associated with the container."""
543

    
544
        logger.debug("update_container_policy: %s %s %s %s %s",
545
                     user, account, container, policy, replace)
546
        if user != account:
547
            raise NotAllowedError
548
        path, node = self._lookup_container(account, container)
549
        self._check_policy(policy, is_account_policy=False)
550
        self._put_policy(node, policy, replace, is_account_policy=False)
551

    
552
    @backend_method
553
    def put_container(self, user, account, container, policy=None):
554
        """Create a new container with the given name."""
555

    
556
        logger.debug(
557
            "put_container: %s %s %s %s", user, account, container, policy)
558
        policy = policy or {}
559
        if user != account:
560
            raise NotAllowedError
561
        try:
562
            path, node = self._lookup_container(account, container)
563
        except NameError:
564
            pass
565
        else:
566
            raise ContainerExists('Container already exists')
567
        if policy:
568
            self._check_policy(policy, is_account_policy=False)
569
        path = '/'.join((account, container))
570
        node = self._put_path(
571
            user, self._lookup_account(account, True)[1], path)
572
        self._put_policy(node, policy, True, is_account_policy=False)
573

    
574
    @backend_method
575
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
576
        """Delete/purge the container with the given name."""
577

    
578
        logger.debug("delete_container: %s %s %s %s %s %s", user,
579
                     account, container, until, prefix, delimiter)
580
        if user != account:
581
            raise NotAllowedError
582
        path, node = self._lookup_container(account, container)
583

    
584
        if until is not None:
585
            hashes, size, serials = self.node.node_purge_children(
586
                node, until, CLUSTER_HISTORY)
587
            for h in hashes:
588
                self.store.map_delete(h)
589
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
590
            if not self.free_versioning:
591
                self._report_size_change(
592
                    user, account, -size, {
593
                        'action':'container purge',
594
                        'path': path,
595
                        'versions': ','.join(str(i) for i in serials)
596
                    }
597
                )
598
            return
599

    
600
        if not delimiter:
601
            if self._get_statistics(node)[0] > 0:
602
                raise ContainerNotEmpty('Container is not empty')
603
            hashes, size, serials = self.node.node_purge_children(
604
                node, inf, CLUSTER_HISTORY)
605
            for h in hashes:
606
                self.store.map_delete(h)
607
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
608
            self.node.node_remove(node)
609
            if not self.free_versioning:
610
                self._report_size_change(
611
                    user, account, -size, {
612
                        'action':'container purge',
613
                        'path': path,
614
                        'versions': ','.join(str(i) for i in serials)
615
                    }
616
                )
617
        else:
618
            # remove only contents
619
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
620
            paths = []
621
            for t in src_names:
622
                path = '/'.join((account, container, t[0]))
623
                node = t[2]
624
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
625
                del_size = self._apply_versioning(
626
                    account, container, src_version_id)
627
                self._report_size_change(
628
                        user, account, -del_size, {
629
                                'action': 'object delete',
630
                                'path': path,
631
                        'versions': ','.join([str(dest_version_id)])
632
                     }
633
                )
634
                self._report_object_change(
635
                    user, account, path, details={'action': 'object delete'})
636
                paths.append(path)
637
            self.permissions.access_clear_bulk(paths)
638

    
639
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
640
        if user != account and until:
641
            raise NotAllowedError
642
        if shared and public:
643
            # get shared first
644
            shared_paths = self._list_object_permissions(
645
                user, account, container, prefix, shared=True, public=False)
646
            objects = set()
647
            if shared_paths:
648
                path, node = self._lookup_container(account, container)
649
                shared_paths = self._get_formatted_paths(shared_paths)
650
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared_paths, all_props))
651

    
652
            # get public
653
            objects |= set(self._list_public_object_properties(
654
                user, account, container, prefix, all_props))
655
            objects = list(objects)
656

    
657
            objects.sort(key=lambda x: x[0])
658
            start, limit = self._list_limits(
659
                [x[0] for x in objects], marker, limit)
660
            return objects[start:start + limit]
661
        elif public:
662
            objects = self._list_public_object_properties(
663
                user, account, container, prefix, all_props)
664
            start, limit = self._list_limits(
665
                [x[0] for x in objects], marker, limit)
666
            return objects[start:start + limit]
667

    
668
        allowed = self._list_object_permissions(
669
            user, account, container, prefix, shared, public)
670
        if shared and not allowed:
671
            return []
672
        path, node = self._lookup_container(account, container)
673
        allowed = self._get_formatted_paths(allowed)
674
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
675
        start, limit = self._list_limits(
676
            [x[0] for x in objects], marker, limit)
677
        return objects[start:start + limit]
678

    
679
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
680
        public = self._list_object_permissions(
681
            user, account, container, prefix, shared=False, public=True)
682
        paths, nodes = self._lookup_objects(public)
683
        path = '/'.join((account, container))
684
        cont_prefix = path + '/'
685
        paths = [x[len(cont_prefix):] for x in paths]
686
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
687
        objects = [(path,) + props for path, props in zip(paths, props)]
688
        return objects
689

    
690
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
691
        objects = []
692
        while True:
693
            marker = objects[-1] if objects else None
694
            limit = 10000
695
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
696
            objects.extend(l)
697
            if not l or len(l) < limit:
698
                break
699
        return objects
700

    
701
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
702
        allowed = []
703
        path = '/'.join((account, container, prefix)).rstrip('/')
704
        if user != account:
705
            allowed = self.permissions.access_list_paths(user, path)
706
            if not allowed:
707
                raise NotAllowedError
708
        else:
709
            allowed = set()
710
            if shared:
711
                allowed.update(self.permissions.access_list_shared(path))
712
            if public:
713
                allowed.update(
714
                    [x[0] for x in self.permissions.public_list(path)])
715
            allowed = sorted(allowed)
716
            if not allowed:
717
                return []
718
        return allowed
719

    
720
    @backend_method
721
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
722
        """Return a list of object (name, version_id) tuples existing under a container."""
723

    
724
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
725
        keys = keys or []
726
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
727

    
728
    @backend_method
729
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None, public=False):
730
        """Return a list of object metadata dicts existing under a container."""
731

    
732
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
733
        keys = keys or []
734
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
735
        objects = []
736
        for p in props:
737
            if len(p) == 2:
738
                objects.append({'subdir': p[0]})
739
            else:
740
                objects.append({'name': p[0],
741
                                'bytes': p[self.SIZE + 1],
742
                                'type': p[self.TYPE + 1],
743
                                'hash': p[self.HASH + 1],
744
                                'version': p[self.SERIAL + 1],
745
                                'version_timestamp': p[self.MTIME + 1],
746
                                'modified': p[self.MTIME + 1] if until is None else None,
747
                                'modified_by': p[self.MUSER + 1],
748
                                'uuid': p[self.UUID + 1],
749
                                'checksum': p[self.CHECKSUM + 1]})
750
        return objects
751

    
752
    @backend_method
753
    def list_object_permissions(self, user, account, container, prefix=''):
754
        """Return a list of paths that enforce permissions under a container."""
755

    
756
        logger.debug("list_object_permissions: %s %s %s %s", user,
757
                     account, container, prefix)
758
        return self._list_object_permissions(user, account, container, prefix, True, False)
759

    
760
    @backend_method
761
    def list_object_public(self, user, account, container, prefix=''):
762
        """Return a dict mapping paths to public ids for objects that are public under a container."""
763

    
764
        logger.debug("list_object_public: %s %s %s %s", user,
765
                     account, container, prefix)
766
        public = {}
767
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
768
            public[path] = p
769
        return public
770

    
771
    @backend_method
772
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
773
        """Return a dictionary with the object metadata for the domain."""
774

    
775
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
776
                     account, container, name, domain, version)
777
        self._can_read(user, account, container, name)
778
        path, node = self._lookup_object(account, container, name)
779
        props = self._get_version(node, version)
780
        if version is None:
781
            modified = props[self.MTIME]
782
        else:
783
            try:
784
                modified = self._get_version(
785
                    node)[self.MTIME]  # Overall last modification.
786
            except NameError:  # Object may be deleted.
787
                del_props = self.node.version_lookup(
788
                    node, inf, CLUSTER_DELETED)
789
                if del_props is None:
790
                    raise ItemNotExists('Object does not exist')
791
                modified = del_props[self.MTIME]
792

    
793
        meta = {}
794
        if include_user_defined:
795
            meta.update(
796
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
797
        meta.update({'name': name,
798
                     'bytes': props[self.SIZE],
799
                     'type': props[self.TYPE],
800
                     'hash': props[self.HASH],
801
                     'version': props[self.SERIAL],
802
                     'version_timestamp': props[self.MTIME],
803
                     'modified': modified,
804
                     'modified_by': props[self.MUSER],
805
                     'uuid': props[self.UUID],
806
                     'checksum': props[self.CHECKSUM]})
807
        return meta
808

    
809
    @backend_method
810
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
811
        """Update the metadata associated with the object for the domain and return the new version."""
812

    
813
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
814
                     user, account, container, name, domain, meta, replace)
815
        self._can_write(user, account, container, name)
816
        path, node = self._lookup_object(account, container, name)
817
        src_version_id, dest_version_id = self._put_metadata(
818
            user, node, domain, meta, replace)
819
        self._apply_versioning(account, container, src_version_id)
820
        return dest_version_id
821

    
822
    @backend_method
823
    def get_object_permissions(self, user, account, container, name):
824
        """Return the action allowed on the object, the path
825
        from which the object gets its permissions from,
826
        along with a dictionary containing the permissions."""
827

    
828
        logger.debug("get_object_permissions: %s %s %s %s", user,
829
                     account, container, name)
830
        allowed = 'write'
831
        permissions_path = self._get_permissions_path(account, container, name)
832
        if user != account:
833
            if self.permissions.access_check(permissions_path, self.WRITE, user):
834
                allowed = 'write'
835
            elif self.permissions.access_check(permissions_path, self.READ, user):
836
                allowed = 'read'
837
            else:
838
                raise NotAllowedError
839
        self._lookup_object(account, container, name)
840
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
841

    
842
    @backend_method
843
    def update_object_permissions(self, user, account, container, name, permissions):
844
        """Update the permissions associated with the object."""
845

    
846
        logger.debug("update_object_permissions: %s %s %s %s %s",
847
                     user, account, container, name, permissions)
848
        if user != account:
849
            raise NotAllowedError
850
        path = self._lookup_object(account, container, name)[0]
851
        self._check_permissions(path, permissions)
852
        self.permissions.access_set(path, permissions)
853
        self._report_sharing_change(user, account, path, {'members':
854
                                    self.permissions.access_members(path)})
855

    
856
    @backend_method
857
    def get_object_public(self, user, account, container, name):
858
        """Return the public id of the object if applicable."""
859

    
860
        logger.debug(
861
            "get_object_public: %s %s %s %s", user, account, container, name)
862
        self._can_read(user, account, container, name)
863
        path = self._lookup_object(account, container, name)[0]
864
        p = self.permissions.public_get(path)
865
        return p
866

    
867
    @backend_method
868
    def update_object_public(self, user, account, container, name, public):
869
        """Update the public status of the object."""
870

    
871
        logger.debug("update_object_public: %s %s %s %s %s", user,
872
                     account, container, name, public)
873
        self._can_write(user, account, container, name)
874
        path = self._lookup_object(account, container, name)[0]
875
        if not public:
876
            self.permissions.public_unset(path)
877
        else:
878
            self.permissions.public_set(
879
                path, self.public_url_security, self.public_url_alphabet
880
            )
881

    
882
    @backend_method
883
    def get_object_hashmap(self, user, account, container, name, version=None):
884
        """Return the object's size and a list with partial hashes."""
885

    
886
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
887
                     account, container, name, version)
888
        self._can_read(user, account, container, name)
889
        path, node = self._lookup_object(account, container, name)
890
        props = self._get_version(node, version)
891
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
892
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
893

    
894
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
895
        if permissions is not None and user != account:
896
            raise NotAllowedError
897
        self._can_write(user, account, container, name)
898
        if permissions is not None:
899
            path = '/'.join((account, container, name))
900
            self._check_permissions(path, permissions)
901

    
902
        account_path, account_node = self._lookup_account(account, True)
903
        container_path, container_node = self._lookup_container(
904
            account, container)
905

    
906
        path, node = self._put_object_node(
907
            container_path, container_node, name)
908
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
909

    
910
        # Handle meta.
911
        if src_version_id is None:
912
            src_version_id = pre_version_id
913
        self._put_metadata_duplicate(
914
            src_version_id, dest_version_id, domain, meta, replace_meta)
915

    
916
        del_size = self._apply_versioning(account, container, pre_version_id)
917
        size_delta = size - del_size
918
        if size_delta > 0:
919
            # Check account quota.
920
            if not self.using_external_quotaholder:
921
                account_quota = long(
922
                    self._get_policy(account_node, is_account_policy=True
923
                    )['quota']
924
                )
925
                account_usage = self._get_statistics(account_node)[1]
926
                if (account_quota > 0 and account_usage > account_quota):
927
                    raise QuotaError(
928
                        'Account quota exceeded: limit: %s, usage: %s' % (
929
                            account_quota, account_usage
930
                        )
931
                    )
932

    
933
            # Check container quota.
934
            container_quota = long(
935
                self._get_policy(container_node, is_account_policy=False
936
                )['quota']
937
            )
938
            container_usage = self._get_statistics(container_node)[1]
939
            if (container_quota > 0 and container_usage > container_quota):
940
                # This must be executed in a transaction, so the version is
941
                # never created if it fails.
942
                raise QuotaError(
943
                    'Container quota exceeded: limit: %s, usage: %s' % (
944
                        container_quota, container_usage
945
                    )
946
                )
947

    
948
        self._report_size_change(user, account, size_delta,
949
                                 {'action': 'object update', 'path': path,
950
                                  'versions': ','.join([str(dest_version_id)])})
951
        if permissions is not None:
952
            self.permissions.access_set(path, permissions)
953
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
954

    
955
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
956
        return dest_version_id
957

    
958
    @backend_method
959
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta=None, replace_meta=False, permissions=None):
960
        """Create/update an object with the specified size and partial hashes."""
961

    
962
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
963
                     account, container, name, size, type, hashmap, checksum)
964
        meta = meta or {}
965
        if size == 0:  # No such thing as an empty hashmap.
966
            hashmap = [self.put_block('')]
967
        map = HashMap(self.block_size, self.hash_algorithm)
968
        map.extend([binascii.unhexlify(x) for x in hashmap])
969
        missing = self.store.block_search(map)
970
        if missing:
971
            ie = IndexError()
972
            ie.data = [binascii.hexlify(x) for x in missing]
973
            raise ie
974

    
975
        hash = map.hash()
976
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
977
        self.store.map_put(hash, map)
978
        return dest_version_id
979

    
980
    @backend_method
981
    def update_object_checksum(self, user, account, container, name, version, checksum):
982
        """Update an object's checksum."""
983

    
984
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
985
                     user, account, container, name, version, checksum)
986
        # Update objects with greater version and same hashmap and size (fix metadata updates).
987
        self._can_write(user, account, container, name)
988
        path, node = self._lookup_object(account, container, name)
989
        props = self._get_version(node, version)
990
        versions = self.node.node_get_versions(node)
991
        for x in versions:
992
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
993
                self.node.version_put_property(
994
                    x[self.SERIAL], 'checksum', checksum)
995

    
996
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta=None, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
997
        dest_meta = dest_meta or {}
998
        dest_version_ids = []
999
        self._can_read(user, src_account, src_container, src_name)
1000
        path, node = self._lookup_object(src_account, src_container, src_name)
1001
        # TODO: Will do another fetch of the properties in duplicate version...
1002
        props = self._get_version(
1003
            node, src_version)  # Check to see if source exists.
1004
        src_version_id = props[self.SERIAL]
1005
        hash = props[self.HASH]
1006
        size = props[self.SIZE]
1007
        is_copy = not is_move and (src_account, src_container, src_name) != (
1008
            dest_account, dest_container, dest_name)  # New uuid.
1009
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
1010
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
1011
            self._delete_object(user, src_account, src_container, src_name)
1012

    
1013
        if delimiter:
1014
            prefix = src_name + \
1015
                delimiter if not src_name.endswith(delimiter) else src_name
1016
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1017
            src_names.sort(key=lambda x: x[2])  # order by nodes
1018
            paths = [elem[0] for elem in src_names]
1019
            nodes = [elem[2] for elem in src_names]
1020
            # TODO: Will do another fetch of the properties in duplicate version...
1021
            props = self._get_versions(nodes)  # Check to see if source exists.
1022

    
1023
            for prop, path, node in zip(props, paths, nodes):
1024
                src_version_id = prop[self.SERIAL]
1025
                hash = prop[self.HASH]
1026
                vtype = prop[self.TYPE]
1027
                size = prop[self.SIZE]
1028
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1029
                    delimiter) else dest_name
1030
                vdest_name = path.replace(prefix, dest_prefix, 1)
1031
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
1032
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
1033
                    self._delete_object(user, src_account, src_container, path)
1034
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
1035

    
1036
    @backend_method
1037
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, src_version=None, delimiter=None):
1038
        """Copy an object's data and metadata."""
1039

    
1040
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
1041
        meta = meta or {}
1042
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
1043
        return dest_version_id
1044

    
1045
    @backend_method
1046
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta=None, replace_meta=False, permissions=None, delimiter=None):
1047
        """Move an object's data and metadata."""
1048

    
1049
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
1050
        meta = meta or {}
1051
        if user != src_account:
1052
            raise NotAllowedError
1053
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
1054
        return dest_version_id
1055

    
1056
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
1057
        if user != account:
1058
            raise NotAllowedError
1059

    
1060
        if until is not None:
1061
            path = '/'.join((account, container, name))
1062
            node = self.node.node_lookup(path)
1063
            if node is None:
1064
                return
1065
            hashes = []
1066
            size = 0
1067
            serials = []
1068
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
1069
            hashes += h
1070
            size += s
1071
            serials += v
1072
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
1073
            hashes += h
1074
            if not self.free_versioning:
1075
                size += s
1076
            serials += v
1077
            for h in hashes:
1078
                self.store.map_delete(h)
1079
            self.node.node_purge(node, until, CLUSTER_DELETED)
1080
            try:
1081
                props = self._get_version(node)
1082
            except NameError:
1083
                self.permissions.access_clear(path)
1084
            self._report_size_change(
1085
                user, account, -size, {
1086
                    'action': 'object purge',
1087
                    'path': path,
1088
                    'versions': ','.join(str(i) for i in serials)
1089
                }
1090
            )
1091
            return
1092

    
1093
        path, node = self._lookup_object(account, container, name)
1094
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1095
        del_size = self._apply_versioning(account, container, src_version_id)
1096
        self._report_size_change(user, account, -del_size,
1097
                                 {'action': 'object delete', 'path': path,
1098
                                  'versions': ','.join([str(dest_version_id)])})
1099
        self._report_object_change(
1100
            user, account, path, details={'action': 'object delete'})
1101
        self.permissions.access_clear(path)
1102

    
1103
        if delimiter:
1104
            prefix = name + delimiter if not name.endswith(delimiter) else name
1105
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
1106
            paths = []
1107
            for t in src_names:
1108
                path = '/'.join((account, container, t[0]))
1109
                node = t[2]
1110
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
1111
                del_size = self._apply_versioning(
1112
                    account, container, src_version_id)
1113
                self._report_size_change(user, account, -del_size,
1114
                                         {'action': 'object delete',
1115
                                          'path': path,
1116
                                          'versions': ','.join([str(dest_version_id)])})
1117
                self._report_object_change(
1118
                    user, account, path, details={'action': 'object delete'})
1119
                paths.append(path)
1120
            self.permissions.access_clear_bulk(paths)
1121

    
1122
    @backend_method
1123
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
1124
        """Delete/purge an object."""
1125

    
1126
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
1127
                     account, container, name, until, prefix, delimiter)
1128
        self._delete_object(user, account, container, name, until, delimiter)
1129

    
1130
    @backend_method
1131
    def list_versions(self, user, account, container, name):
1132
        """Return a list of all (version, version_timestamp) tuples for an object."""
1133

    
1134
        logger.debug(
1135
            "list_versions: %s %s %s %s", user, account, container, name)
1136
        self._can_read(user, account, container, name)
1137
        path, node = self._lookup_object(account, container, name)
1138
        versions = self.node.node_get_versions(node)
1139
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1140

    
1141
    @backend_method
1142
    def get_uuid(self, user, uuid):
1143
        """Return the (account, container, name) for the UUID given."""
1144

    
1145
        logger.debug("get_uuid: %s %s", user, uuid)
1146
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1147
        if info is None:
1148
            raise NameError
1149
        path, serial = info
1150
        account, container, name = path.split('/', 2)
1151
        self._can_read(user, account, container, name)
1152
        return (account, container, name)
1153

    
1154
    @backend_method
1155
    def get_public(self, user, public):
1156
        """Return the (account, container, name) for the public id given."""
1157

    
1158
        logger.debug("get_public: %s %s", user, public)
1159
        path = self.permissions.public_path(public)
1160
        if path is None:
1161
            raise NameError
1162
        account, container, name = path.split('/', 2)
1163
        self._can_read(user, account, container, name)
1164
        return (account, container, name)
1165

    
1166
    @backend_method(autocommit=0)
1167
    def get_block(self, hash):
1168
        """Return a block's data."""
1169

    
1170
        logger.debug("get_block: %s", hash)
1171
        block = self.store.block_get(binascii.unhexlify(hash))
1172
        if not block:
1173
            raise ItemNotExists('Block does not exist')
1174
        return block
1175

    
1176
    @backend_method(autocommit=0)
1177
    def put_block(self, data):
1178
        """Store a block and return the hash."""
1179

    
1180
        logger.debug("put_block: %s", len(data))
1181
        return binascii.hexlify(self.store.block_put(data))
1182

    
1183
    @backend_method(autocommit=0)
1184
    def update_block(self, hash, data, offset=0):
1185
        """Update a known block and return the hash."""
1186

    
1187
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1188
        if offset == 0 and len(data) == self.block_size:
1189
            return self.put_block(data)
1190
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1191
        return binascii.hexlify(h)
1192

    
1193
    # Path functions.
1194

    
1195
    def _generate_uuid(self):
1196
        return str(uuidlib.uuid4())
1197

    
1198
    def _put_object_node(self, path, parent, name):
1199
        path = '/'.join((path, name))
1200
        node = self.node.node_lookup(path)
1201
        if node is None:
1202
            node = self.node.node_create(parent, path)
1203
        return path, node
1204

    
1205
    def _put_path(self, user, parent, path):
1206
        node = self.node.node_create(parent, path)
1207
        self.node.version_create(node, None, 0, '', None, user,
1208
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1209
        return node
1210

    
1211
    def _lookup_account(self, account, create=True):
1212
        node = self.node.node_lookup(account)
1213
        if node is None and create:
1214
            node = self._put_path(
1215
                account, self.ROOTNODE, account)  # User is account.
1216
        return account, node
1217

    
1218
    def _lookup_container(self, account, container):
1219
        path = '/'.join((account, container))
1220
        node = self.node.node_lookup(path)
1221
        if node is None:
1222
            raise ItemNotExists('Container does not exist')
1223
        return path, node
1224

    
1225
    def _lookup_object(self, account, container, name):
1226
        path = '/'.join((account, container, name))
1227
        node = self.node.node_lookup(path)
1228
        if node is None:
1229
            raise ItemNotExists('Object does not exist')
1230
        return path, node
1231

    
1232
    def _lookup_objects(self, paths):
1233
        nodes = self.node.node_lookup_bulk(paths)
1234
        return paths, nodes
1235

    
1236
    def _get_properties(self, node, until=None):
1237
        """Return properties until the timestamp given."""
1238

    
1239
        before = until if until is not None else inf
1240
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1241
        if props is None and until is not None:
1242
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1243
        if props is None:
1244
            raise ItemNotExists('Path does not exist')
1245
        return props
1246

    
1247
    def _get_statistics(self, node, until=None):
1248
        """Return count, sum of size and latest timestamp of everything under node."""
1249

    
1250
        if until is None:
1251
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1252
        else:
1253
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1254
        if stats is None:
1255
            stats = (0, 0, 0)
1256
        return stats
1257

    
1258
    def _get_version(self, node, version=None):
1259
        if version is None:
1260
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1261
            if props is None:
1262
                raise ItemNotExists('Object does not exist')
1263
        else:
1264
            try:
1265
                version = int(version)
1266
            except ValueError:
1267
                raise VersionNotExists('Version does not exist')
1268
            props = self.node.version_get_properties(version)
1269
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1270
                raise VersionNotExists('Version does not exist')
1271
        return props
1272

    
1273
    def _get_versions(self, nodes):
1274
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1275

    
1276
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1277
        """Create a new version of the node."""
1278

    
1279
        props = self.node.version_lookup(
1280
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1281
        if props is not None:
1282
            src_version_id = props[self.SERIAL]
1283
            src_hash = props[self.HASH]
1284
            src_size = props[self.SIZE]
1285
            src_type = props[self.TYPE]
1286
            src_checksum = props[self.CHECKSUM]
1287
        else:
1288
            src_version_id = None
1289
            src_hash = None
1290
            src_size = 0
1291
            src_type = ''
1292
            src_checksum = ''
1293
        if size is None:  # Set metadata.
1294
            hash = src_hash  # This way hash can be set to None (account or container).
1295
            size = src_size
1296
        if type is None:
1297
            type = src_type
1298
        if checksum is None:
1299
            checksum = src_checksum
1300
        uuid = self._generate_uuid(
1301
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1302

    
1303
        if src_node is None:
1304
            pre_version_id = src_version_id
1305
        else:
1306
            pre_version_id = None
1307
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1308
            if props is not None:
1309
                pre_version_id = props[self.SERIAL]
1310
        if pre_version_id is not None:
1311
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1312

    
1313
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1314
        return pre_version_id, dest_version_id
1315

    
1316
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1317
        if src_version_id is not None:
1318
            self.node.attribute_copy(src_version_id, dest_version_id)
1319
        if not replace:
1320
            self.node.attribute_del(dest_version_id, domain, (
1321
                k for k, v in meta.iteritems() if v == ''))
1322
            self.node.attribute_set(dest_version_id, domain, (
1323
                (k, v) for k, v in meta.iteritems() if v != ''))
1324
        else:
1325
            self.node.attribute_del(dest_version_id, domain)
1326
            self.node.attribute_set(dest_version_id, domain, ((
1327
                k, v) for k, v in meta.iteritems()))
1328

    
1329
    def _put_metadata(self, user, node, domain, meta, replace=False):
1330
        """Create a new version and store metadata."""
1331

    
1332
        src_version_id, dest_version_id = self._put_version_duplicate(
1333
            user, node)
1334
        self._put_metadata_duplicate(
1335
            src_version_id, dest_version_id, domain, meta, replace)
1336
        return src_version_id, dest_version_id
1337

    
1338
    def _list_limits(self, listing, marker, limit):
1339
        start = 0
1340
        if marker:
1341
            try:
1342
                start = listing.index(marker) + 1
1343
            except ValueError:
1344
                pass
1345
        if not limit or limit > 10000:
1346
            limit = 10000
1347
        return start, limit
1348

    
1349
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, until=None, size_range=None, allowed=None, all_props=False):
1350
        keys = keys or []
1351
        allowed = allowed or []
1352
        cont_prefix = path + '/'
1353
        prefix = cont_prefix + prefix
1354
        start = cont_prefix + marker if marker else None
1355
        before = until if until is not None else inf
1356
        filterq = keys if domain else []
1357
        sizeq = size_range
1358

    
1359
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1360
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1361
        objects.sort(key=lambda x: x[0])
1362
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1363
        return objects
1364

    
1365
    # Reporting functions.
1366

    
1367
    def _report_size_change(self, user, account, size, details=None):
1368
        details = details or {}
1369

    
1370
        if size == 0:
1371
            return
1372

    
1373
        account_node = self._lookup_account(account, True)[1]
1374
        total = self._get_statistics(account_node)[1]
1375
        details.update({'user': user, 'total': total})
1376
        logger.debug(
1377
            "_report_size_change: %s %s %s %s", user, account, size, details)
1378
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1379
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1380
                              float(size), details))
1381

    
1382
        if not self.using_external_quotaholder:
1383
            return
1384

    
1385
        try:
1386
            name = details['path'] if 'path' in details else ''
1387
            serial = self.astakosclient.issue_one_commission(
1388
                token=self.service_token,
1389
                holder=account,
1390
                source=DEFAULT_SOURCE,
1391
                provisions={'pithos.diskspace': size},
1392
                name=name
1393
                )
1394
        except BaseException, e:
1395
            raise QuotaError(e)
1396
        else:
1397
            self.serials.append(serial)
1398

    
1399
    def _report_object_change(self, user, account, path, details=None):
1400
        details = details or {}
1401
        details.update({'user': user})
1402
        logger.debug("_report_object_change: %s %s %s %s", user,
1403
                     account, path, details)
1404
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1405
                              account, QUEUE_INSTANCE_ID, 'object', path, details))
1406

    
1407
    def _report_sharing_change(self, user, account, path, details=None):
1408
        logger.debug("_report_permissions_change: %s %s %s %s",
1409
                     user, account, path, details)
1410
        details = details or {}
1411
        details.update({'user': user})
1412
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1413
                              account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1414

    
1415
    # Policy functions.
1416

    
1417
    def _check_policy(self, policy, is_account_policy=True):
1418
        default_policy = self.default_account_policy \
1419
            if is_account_policy else self.default_container_policy
1420
        for k in policy.keys():
1421
            if policy[k] == '':
1422
                policy[k] = default_policy.get(k)
1423
        for k, v in policy.iteritems():
1424
            if k == 'quota':
1425
                q = int(v)  # May raise ValueError.
1426
                if q < 0:
1427
                    raise ValueError
1428
            elif k == 'versioning':
1429
                if v not in ['auto', 'none']:
1430
                    raise ValueError
1431
            else:
1432
                raise ValueError
1433

    
1434
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1435
        default_policy = self.default_account_policy \
1436
            if is_account_policy else self.default_container_policy
1437
        if replace:
1438
            for k, v in default_policy.iteritems():
1439
                if k not in policy:
1440
                    policy[k] = v
1441
        self.node.policy_set(node, policy)
1442

    
1443
    def _get_policy(self, node, is_account_policy=True):
1444
        default_policy = self.default_account_policy \
1445
            if is_account_policy else self.default_container_policy
1446
        policy = default_policy.copy()
1447
        policy.update(self.node.policy_get(node))
1448
        return policy
1449

    
1450
    def _apply_versioning(self, account, container, version_id):
1451
        """Delete the provided version if such is the policy.
1452
           Return size of object removed.
1453
        """
1454

    
1455
        if version_id is None:
1456
            return 0
1457
        path, node = self._lookup_container(account, container)
1458
        versioning = self._get_policy(
1459
            node, is_account_policy=False)['versioning']
1460
        if versioning != 'auto':
1461
            hash, size = self.node.version_remove(version_id)
1462
            self.store.map_delete(hash)
1463
            return size
1464
        elif self.free_versioning:
1465
            return self.node.version_get_properties(
1466
                version_id, keys=('size',))[0]
1467
        return 0
1468

    
1469
    # Access control functions.
1470

    
1471
    def _check_groups(self, groups):
1472
        # raise ValueError('Bad characters in groups')
1473
        pass
1474

    
1475
    def _check_permissions(self, path, permissions):
1476
        # raise ValueError('Bad characters in permissions')
1477
        pass
1478

    
1479
    def _get_formatted_paths(self, paths):
1480
        formatted = []
1481
        for p in paths:
1482
            node = self.node.node_lookup(p)
1483
            props = None
1484
            if node is not None:
1485
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1486
            if props is not None:
1487
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1488
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1489
                formatted.append((p, self.MATCH_EXACT))
1490
        return formatted
1491

    
1492
    def _get_permissions_path(self, account, container, name):
1493
        path = '/'.join((account, container, name))
1494
        permission_paths = self.permissions.access_inherit(path)
1495
        permission_paths.sort()
1496
        permission_paths.reverse()
1497
        for p in permission_paths:
1498
            if p == path:
1499
                return p
1500
            else:
1501
                if p.count('/') < 2:
1502
                    continue
1503
                node = self.node.node_lookup(p)
1504
                props = None
1505
                if node is not None:
1506
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1507
                if props is not None:
1508
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1509
                        return p
1510
        return None
1511

    
1512
    def _can_read(self, user, account, container, name):
1513
        if user == account:
1514
            return True
1515
        path = '/'.join((account, container, name))
1516
        if self.permissions.public_get(path) is not None:
1517
            return True
1518
        path = self._get_permissions_path(account, container, name)
1519
        if not path:
1520
            raise NotAllowedError
1521
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1522
            raise NotAllowedError
1523

    
1524
    def _can_write(self, user, account, container, name):
1525
        if user == account:
1526
            return True
1527
        path = '/'.join((account, container, name))
1528
        path = self._get_permissions_path(account, container, name)
1529
        if not path:
1530
            raise NotAllowedError
1531
        if not self.permissions.access_check(path, self.WRITE, user):
1532
            raise NotAllowedError
1533

    
1534
    def _allowed_accounts(self, user):
1535
        allow = set()
1536
        for path in self.permissions.access_list_paths(user):
1537
            allow.add(path.split('/', 1)[0])
1538
        return sorted(allow)
1539

    
1540
    def _allowed_containers(self, user, account):
1541
        allow = set()
1542
        for path in self.permissions.access_list_paths(user, account):
1543
            allow.add(path.split('/', 2)[1])
1544
        return sorted(allow)
1545

    
1546
    # Domain functions
1547

    
1548
    @backend_method
1549
    def get_domain_objects(self, domain, user=None):
1550
        obj_list = self.node.domain_object_list(domain, CLUSTER_NORMAL)
1551
        if user != None:
1552
            obj_list = [t for t in obj_list \
1553
                if self._has_read_access(user, t[0])]
1554
        return [(path,
1555
                 self._build_metadata(props, user_defined_meta),
1556
                 self.permissions.access_get(path)) \
1557
            for path, props, user_defined_meta in obj_list]
1558

    
1559
    # util functions
1560

    
1561
    def _build_metadata(self, props, user_defined=None,
1562
                        include_user_defined=True):
1563
        meta = {'bytes': props[self.SIZE],
1564
                'type': props[self.TYPE],
1565
                'hash': props[self.HASH],
1566
                'version': props[self.SERIAL],
1567
                'version_timestamp': props[self.MTIME],
1568
                'modified_by': props[self.MUSER],
1569
                'uuid': props[self.UUID],
1570
                'checksum': props[self.CHECKSUM]}
1571
        if include_user_defined and user_defined != None:
1572
            meta.update(user_defined)
1573
        return meta
1574

    
1575
    def _has_read_access(self, user, path):
1576
        try:
1577
            account, container, object = path.split('/', 2)
1578
        except ValueError:
1579
            raise ValueError('Invalid object path')
1580

    
1581
        assert isinstance(user, basestring), "Invalid user"
1582

    
1583
        try:
1584
            self._can_read(user, account, container, object)
1585
        except NotAllowedError:
1586
            return False
1587
        else:
1588
            return True