Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 16b0ed4a

History | View | Annotate | Download (73.1 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42

    
43
try:
44
    from astakosclient import AstakosClient
45
except ImportError:
46
    AstakosClient = None
47

    
48
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
49
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
50
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
51
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
52
                  InvalidHash)
53

    
54

    
55
class DisabledAstakosClient(object):
56
    def __init__(self, *args, **kwargs):
57
        self.args = args
58
        self.kwargs = kwargs
59

    
60
    def __getattr__(self, name):
61
        m = ("AstakosClient has been disabled, "
62
             "yet an attempt to access it was made")
63
        raise AssertionError(m)
64

    
65

    
66
# Stripped-down version of the HashMap class found in tools.
67

    
68
class HashMap(list):
69

    
70
    def __init__(self, blocksize, blockhash):
71
        super(HashMap, self).__init__()
72
        self.blocksize = blocksize
73
        self.blockhash = blockhash
74

    
75
    def _hash_raw(self, v):
76
        h = hashlib.new(self.blockhash)
77
        h.update(v)
78
        return h.digest()
79

    
80
    def hash(self):
81
        if len(self) == 0:
82
            return self._hash_raw('')
83
        if len(self) == 1:
84
            return self.__getitem__(0)
85

    
86
        h = list(self)
87
        s = 2
88
        while s < len(h):
89
            s = s * 2
90
        h += [('\x00' * len(h[0]))] * (s - len(h))
91
        while len(h) > 1:
92
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
93
        return h[0]
94

    
95
# Default modules and settings.
96
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
97
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
98
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
99
DEFAULT_BLOCK_PATH = 'data/'
100
DEFAULT_BLOCK_UMASK = 0o022
101
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
102
DEFAULT_HASH_ALGORITHM = 'sha256'
103
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
104
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
105
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
106
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
107
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
108
                               'abcdefghijklmnopqrstuvwxyz'
109
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
110
DEFAULT_PUBLIC_URL_SECURITY = 16
111

    
112
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
113
QUEUE_CLIENT_ID = 'pithos'
114
QUEUE_INSTANCE_ID = '1'
115

    
116
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
117

    
118
inf = float('inf')
119

    
120
ULTIMATE_ANSWER = 42
121

    
122
DEFAULT_SOURCE = 'system'
123

    
124
logger = logging.getLogger(__name__)
125

    
126

    
127
def debug_method(func):
128
    @wraps(func)
129
    def wrapper(self, *args, **kw):
130
        try:
131
            result = func(self, *args, **kw)
132
            return result
133
        except:
134
            result = format_exc()
135
            raise
136
        finally:
137
            all_args = map(repr, args)
138
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
139
            logger.debug(">>> %s(%s) <<< %s" % (
140
                func.__name__, ', '.join(all_args).rstrip(', '), result))
141
    return wrapper
142

    
143

    
144
class ModularBackend(BaseBackend):
145
    """A modular backend.
146

147
    Uses modules for SQL functions and storage.
148
    """
149

    
150
    def __init__(self, db_module=None, db_connection=None,
151
                 block_module=None, block_path=None, block_umask=None,
152
                 block_size=None, hash_algorithm=None,
153
                 queue_module=None, queue_hosts=None, queue_exchange=None,
154
                 astakos_url=None, service_token=None,
155
                 astakosclient_poolsize=None,
156
                 free_versioning=True, block_params=None,
157
                 public_url_security=None,
158
                 public_url_alphabet=None,
159
                 account_quota_policy=None,
160
                 container_quota_policy=None,
161
                 container_versioning_policy=None):
162
        db_module = db_module or DEFAULT_DB_MODULE
163
        db_connection = db_connection or DEFAULT_DB_CONNECTION
164
        block_module = block_module or DEFAULT_BLOCK_MODULE
165
        block_path = block_path or DEFAULT_BLOCK_PATH
166
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
167
        block_params = block_params or DEFAULT_BLOCK_PARAMS
168
        block_size = block_size or DEFAULT_BLOCK_SIZE
169
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
170
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
171
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
172
        container_quota_policy = container_quota_policy \
173
            or DEFAULT_CONTAINER_QUOTA
174
        container_versioning_policy = container_versioning_policy \
175
            or DEFAULT_CONTAINER_VERSIONING
176

    
177
        self.default_account_policy = {'quota': account_quota_policy}
178
        self.default_container_policy = {
179
            'quota': container_quota_policy,
180
            'versioning': container_versioning_policy
181
        }
182
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
183
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
184

    
185
        self.public_url_security = (public_url_security or
186
                                    DEFAULT_PUBLIC_URL_SECURITY)
187
        self.public_url_alphabet = (public_url_alphabet or
188
                                    DEFAULT_PUBLIC_URL_ALPHABET)
189

    
190
        self.hash_algorithm = hash_algorithm
191
        self.block_size = block_size
192
        self.free_versioning = free_versioning
193

    
194
        def load_module(m):
195
            __import__(m)
196
            return sys.modules[m]
197

    
198
        self.db_module = load_module(db_module)
199
        self.wrapper = self.db_module.DBWrapper(db_connection)
200
        params = {'wrapper': self.wrapper}
201
        self.permissions = self.db_module.Permissions(**params)
202
        self.config = self.db_module.Config(**params)
203
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
204
        for x in ['READ', 'WRITE']:
205
            setattr(self, x, getattr(self.db_module, x))
206
        self.node = self.db_module.Node(**params)
207
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
208
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
209
                  'MATCH_PREFIX', 'MATCH_EXACT']:
210
            setattr(self, x, getattr(self.db_module, x))
211

    
212
        self.ALLOWED =  ['read','write']
213

    
214
        self.block_module = load_module(block_module)
215
        self.block_params = block_params
216
        params = {'path': block_path,
217
                  'block_size': self.block_size,
218
                  'hash_algorithm': self.hash_algorithm,
219
                  'umask': block_umask}
220
        params.update(self.block_params)
221
        self.store = self.block_module.Store(**params)
222

    
223
        if queue_module and queue_hosts:
224
            self.queue_module = load_module(queue_module)
225
            params = {'hosts': queue_hosts,
226
                      'exchange': queue_exchange,
227
                      'client_id': QUEUE_CLIENT_ID}
228
            self.queue = self.queue_module.Queue(**params)
229
        else:
230
            class NoQueue:
231
                def send(self, *args):
232
                    pass
233

    
234
                def close(self):
235
                    pass
236

    
237
            self.queue = NoQueue()
238

    
239
        self.astakos_url = astakos_url
240
        self.service_token = service_token
241

    
242
        if not astakos_url or not AstakosClient:
243
            self.astakosclient = DisabledAstakosClient(
244
                astakos_url,
245
                use_pool=True,
246
                pool_size=astakosclient_poolsize)
247
        else:
248
            self.astakosclient = AstakosClient(
249
                astakos_url,
250
                use_pool=True,
251
                pool_size=astakosclient_poolsize)
252

    
253
        self.serials = []
254
        self.messages = []
255

    
256
        self._move_object = partial(self._copy_object, is_move=True)
257

    
258
        self.lock_container_path = False
259

    
260
    def pre_exec(self, lock_container_path=False):
261
        self.lock_container_path = lock_container_path
262
        self.wrapper.execute()
263

    
264
    def post_exec(self, success_status=True):
265
        if success_status:
266
            # send messages produced
267
            for m in self.messages:
268
                self.queue.send(*m)
269

    
270
            # register serials
271
            if self.serials:
272
                self.commission_serials.insert_many(
273
                    self.serials)
274

    
275
                # commit to ensure that the serials are registered
276
                # even if resolve commission fails
277
                self.wrapper.commit()
278

    
279
                # start new transaction
280
                self.wrapper.execute()
281

    
282
                r = self.astakosclient.resolve_commissions(
283
                    token=self.service_token,
284
                    accept_serials=self.serials,
285
                    reject_serials=[])
286
                self.commission_serials.delete_many(
287
                    r['accepted'])
288

    
289
            self.wrapper.commit()
290
        else:
291
            if self.serials:
292
                self.astakosclient.resolve_commissions(
293
                    token=self.service_token,
294
                    accept_serials=[],
295
                    reject_serials=self.serials)
296
            self.wrapper.rollback()
297

    
298
    def close(self):
299
        self.wrapper.close()
300
        self.queue.close()
301

    
302
    @property
303
    def using_external_quotaholder(self):
304
        return not isinstance(self.astakosclient, DisabledAstakosClient)
305

    
306
    @debug_method
307
    def list_accounts(self, user, marker=None, limit=10000):
308
        """Return a list of accounts the user can access."""
309

    
310
        allowed = self._allowed_accounts(user)
311
        start, limit = self._list_limits(allowed, marker, limit)
312
        return allowed[start:start + limit]
313

    
314
    @debug_method
315
    def get_account_meta(
316
            self, user, account, domain, until=None, include_user_defined=True,
317
            external_quota=None):
318
        """Return a dictionary with the account metadata for the domain."""
319

    
320
        path, node = self._lookup_account(account, user == account)
321
        if user != account:
322
            if until or (node is None) or (account not
323
                                           in self._allowed_accounts(user)):
324
                raise NotAllowedError
325
        try:
326
            props = self._get_properties(node, until)
327
            mtime = props[self.MTIME]
328
        except NameError:
329
            props = None
330
            mtime = until
331
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
332
        tstamp = max(tstamp, mtime)
333
        if until is None:
334
            modified = tstamp
335
        else:
336
            modified = self._get_statistics(
337
                node, compute=True)[2]  # Overall last modification.
338
            modified = max(modified, mtime)
339

    
340
        if user != account:
341
            meta = {'name': account}
342
        else:
343
            meta = {}
344
            if props is not None and include_user_defined:
345
                meta.update(
346
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
347
            if until is not None:
348
                meta.update({'until_timestamp': tstamp})
349
            meta.update({'name': account, 'count': count, 'bytes': bytes})
350
            if self.using_external_quotaholder:
351
                external_quota = external_quota or {}
352
                meta['bytes'] = external_quota.get('usage', 0)
353
        meta.update({'modified': modified})
354
        return meta
355

    
356
    @debug_method
357
    def update_account_meta(self, user, account, domain, meta, replace=False):
358
        """Update the metadata associated with the account for the domain."""
359

    
360
        if user != account:
361
            raise NotAllowedError
362
        path, node = self._lookup_account(account, True)
363
        self._put_metadata(user, node, domain, meta, replace,
364
                           update_statistics_ancestors_depth=-1)
365

    
366
    @debug_method
367
    def get_account_groups(self, user, account):
368
        """Return a dictionary with the user groups defined for the account."""
369

    
370
        if user != account:
371
            if account not in self._allowed_accounts(user):
372
                raise NotAllowedError
373
            return {}
374
        self._lookup_account(account, True)
375
        return self.permissions.group_dict(account)
376

    
377
    @debug_method
378
    def update_account_groups(self, user, account, groups, replace=False):
379
        """Update the groups associated with the account."""
380

    
381
        if user != account:
382
            raise NotAllowedError
383
        self._lookup_account(account, True)
384
        self._check_groups(groups)
385
        if replace:
386
            self.permissions.group_destroy(account)
387
        for k, v in groups.iteritems():
388
            if not replace:  # If not already deleted.
389
                self.permissions.group_delete(account, k)
390
            if v:
391
                self.permissions.group_addmany(account, k, v)
392

    
393
    @debug_method
394
    def get_account_policy(self, user, account, external_quota=None):
395
        """Return a dictionary with the account policy."""
396

    
397
        if user != account:
398
            if account not in self._allowed_accounts(user):
399
                raise NotAllowedError
400
            return {}
401
        path, node = self._lookup_account(account, True)
402
        policy = self._get_policy(node, is_account_policy=True)
403
        if self.using_external_quotaholder:
404
            external_quota = external_quota or {}
405
            policy['quota'] = external_quota.get('limit', 0)
406
        return policy
407

    
408
    @debug_method
409
    def update_account_policy(self, user, account, policy, replace=False):
410
        """Update the policy associated with the account."""
411

    
412
        if user != account:
413
            raise NotAllowedError
414
        path, node = self._lookup_account(account, True)
415
        self._check_policy(policy, is_account_policy=True)
416
        self._put_policy(node, policy, replace, is_account_policy=True)
417

    
418
    @debug_method
419
    def put_account(self, user, account, policy=None):
420
        """Create a new account with the given name."""
421

    
422
        policy = policy or {}
423
        if user != account:
424
            raise NotAllowedError
425
        node = self.node.node_lookup(account)
426
        if node is not None:
427
            raise AccountExists('Account already exists')
428
        if policy:
429
            self._check_policy(policy, is_account_policy=True)
430
        node = self._put_path(user, self.ROOTNODE, account,
431
                              update_statistics_ancestors_depth=-1)
432
        self._put_policy(node, policy, True, is_account_policy=True)
433

    
434
    @debug_method
435
    def delete_account(self, user, account):
436
        """Delete the account with the given name."""
437

    
438
        if user != account:
439
            raise NotAllowedError
440
        node = self.node.node_lookup(account)
441
        if node is None:
442
            return
443
        if not self.node.node_remove(node,
444
                                     update_statistics_ancestors_depth=-1):
445
            raise AccountNotEmpty('Account is not empty')
446
        self.permissions.group_destroy(account)
447

    
448
    @debug_method
449
    def list_containers(self, user, account, marker=None, limit=10000,
450
                        shared=False, until=None, public=False):
451
        """Return a list of containers existing under an account."""
452

    
453
        if user != account:
454
            if until or account not in self._allowed_accounts(user):
455
                raise NotAllowedError
456
            allowed = self._allowed_containers(user, account)
457
            start, limit = self._list_limits(allowed, marker, limit)
458
            return allowed[start:start + limit]
459
        if shared or public:
460
            allowed = set()
461
            if shared:
462
                allowed.update([x.split('/', 2)[1] for x in
463
                               self.permissions.access_list_shared(account)])
464
            if public:
465
                allowed.update([x[0].split('/', 2)[1] for x in
466
                               self.permissions.public_list(account)])
467
            allowed = sorted(allowed)
468
            start, limit = self._list_limits(allowed, marker, limit)
469
            return allowed[start:start + limit]
470
        node = self.node.node_lookup(account)
471
        containers = [x[0] for x in self._list_object_properties(
472
            node, account, '', '/', marker, limit, False, None, [], until)]
473
        start, limit = self._list_limits(
474
            [x[0] for x in containers], marker, limit)
475
        return containers[start:start + limit]
476

    
477
    @debug_method
478
    def list_container_meta(self, user, account, container, domain,
479
                            until=None):
480
        """Return a list of the container's object meta keys for a domain."""
481

    
482
        allowed = []
483
        if user != account:
484
            if until:
485
                raise NotAllowedError
486
            allowed = self.permissions.access_list_paths(
487
                user, '/'.join((account, container)))
488
            if not allowed:
489
                raise NotAllowedError
490
        path, node = self._lookup_container(account, container)
491
        before = until if until is not None else inf
492
        allowed = self._get_formatted_paths(allowed)
493
        return self.node.latest_attribute_keys(node, domain, before,
494
                                               CLUSTER_DELETED, allowed)
495

    
496
    @debug_method
497
    def get_container_meta(self, user, account, container, domain, until=None,
498
                           include_user_defined=True):
499
        """Return a dictionary with the container metadata for the domain."""
500

    
501
        if user != account:
502
            if until or container not in self._allowed_containers(user,
503
                                                                  account):
504
                raise NotAllowedError
505
        path, node = self._lookup_container(account, container)
506
        props = self._get_properties(node, until)
507
        mtime = props[self.MTIME]
508
        count, bytes, tstamp = self._get_statistics(node, until)
509
        tstamp = max(tstamp, mtime)
510
        if until is None:
511
            modified = tstamp
512
        else:
513
            modified = self._get_statistics(
514
                node)[2]  # Overall last modification.
515
            modified = max(modified, mtime)
516

    
517
        if user != account:
518
            meta = {'name': container}
519
        else:
520
            meta = {}
521
            if include_user_defined:
522
                meta.update(
523
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
524
            if until is not None:
525
                meta.update({'until_timestamp': tstamp})
526
            meta.update({'name': container, 'count': count, 'bytes': bytes})
527
        meta.update({'modified': modified})
528
        return meta
529

    
530
    @debug_method
531
    def update_container_meta(self, user, account, container, domain, meta,
532
                              replace=False):
533
        """Update the metadata associated with the container for the domain."""
534

    
535
        if user != account:
536
            raise NotAllowedError
537
        path, node = self._lookup_container(account, container)
538
        src_version_id, dest_version_id = self._put_metadata(
539
            user, node, domain, meta, replace,
540
            update_statistics_ancestors_depth=0)
541
        if src_version_id is not None:
542
            versioning = self._get_policy(
543
                node, is_account_policy=False)['versioning']
544
            if versioning != 'auto':
545
                self.node.version_remove(src_version_id,
546
                                         update_statistics_ancestors_depth=0)
547

    
548
    @debug_method
549
    def get_container_policy(self, user, account, container):
550
        """Return a dictionary with the container policy."""
551

    
552
        if user != account:
553
            if container not in self._allowed_containers(user, account):
554
                raise NotAllowedError
555
            return {}
556
        path, node = self._lookup_container(account, container)
557
        return self._get_policy(node, is_account_policy=False)
558

    
559
    @debug_method
560
    def update_container_policy(self, user, account, container, policy,
561
                                replace=False):
562
        """Update the policy associated with the container."""
563

    
564
        if user != account:
565
            raise NotAllowedError
566
        path, node = self._lookup_container(account, container)
567
        self._check_policy(policy, is_account_policy=False)
568
        self._put_policy(node, policy, replace, is_account_policy=False)
569

    
570
    @debug_method
571
    def put_container(self, user, account, container, policy=None):
572
        """Create a new container with the given name."""
573

    
574
        policy = policy or {}
575
        if user != account:
576
            raise NotAllowedError
577
        try:
578
            path, node = self._lookup_container(account, container)
579
        except NameError:
580
            pass
581
        else:
582
            raise ContainerExists('Container already exists')
583
        if policy:
584
            self._check_policy(policy, is_account_policy=False)
585
        path = '/'.join((account, container))
586
        node = self._put_path(
587
            user, self._lookup_account(account, True)[1], path,
588
            update_statistics_ancestors_depth=-1)
589
        self._put_policy(node, policy, True, is_account_policy=False)
590

    
591
    @debug_method
592
    def delete_container(self, user, account, container, until=None, prefix='',
593
                         delimiter=None):
594
        """Delete/purge the container with the given name."""
595

    
596
        if user != account:
597
            raise NotAllowedError
598
        path, node = self._lookup_container(account, container)
599

    
600
        if until is not None:
601
            hashes, size, serials = self.node.node_purge_children(
602
                node, until, CLUSTER_HISTORY,
603
                update_statistics_ancestors_depth=0)
604
            for h in hashes:
605
                self.store.map_delete(h)
606
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
607
                                          update_statistics_ancestors_depth=0)
608
            if not self.free_versioning:
609
                self._report_size_change(
610
                    user, account, -size, {
611
                        'action': 'container purge',
612
                        'path': path,
613
                        'versions': ','.join(str(i) for i in serials)
614
                    }
615
                )
616
            return
617

    
618
        if not delimiter:
619
            if self._get_statistics(node)[0] > 0:
620
                raise ContainerNotEmpty('Container is not empty')
621
            hashes, size, serials = self.node.node_purge_children(
622
                node, inf, CLUSTER_HISTORY,
623
                update_statistics_ancestors_depth=0)
624
            for h in hashes:
625
                self.store.map_delete(h)
626
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
627
                                          update_statistics_ancestors_depth=0)
628
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
629
            if not self.free_versioning:
630
                self._report_size_change(
631
                    user, account, -size, {
632
                        'action': 'container purge',
633
                        'path': path,
634
                        'versions': ','.join(str(i) for i in serials)
635
                    }
636
                )
637
        else:
638
            # remove only contents
639
            src_names = self._list_objects_no_limit(
640
                user, account, container, prefix='', delimiter=None,
641
                virtual=False, domain=None, keys=[], shared=False, until=None,
642
                size_range=None, all_props=True, public=False)
643
            paths = []
644
            for t in src_names:
645
                path = '/'.join((account, container, t[0]))
646
                node = t[2]
647
                if not self._exists(node):
648
                    continue
649
                src_version_id, dest_version_id = self._put_version_duplicate(
650
                    user, node, size=0, type='', hash=None, checksum='',
651
                    cluster=CLUSTER_DELETED,
652
                    update_statistics_ancestors_depth=1)
653
                del_size = self._apply_versioning(
654
                    account, container, src_version_id,
655
                    update_statistics_ancestors_depth=1)
656
                self._report_size_change(
657
                    user, account, -del_size, {
658
                        'action': 'object delete',
659
                        'path': path,
660
                        'versions': ','.join([str(dest_version_id)])})
661
                self._report_object_change(
662
                    user, account, path, details={'action': 'object delete'})
663
                paths.append(path)
664
            self.permissions.access_clear_bulk(paths)
665

    
666
    def _list_objects(self, user, account, container, prefix, delimiter,
667
                      marker, limit, virtual, domain, keys, shared, until,
668
                      size_range, all_props, public):
669
        if user != account and until:
670
            raise NotAllowedError
671
        if shared and public:
672
            # get shared first
673
            shared_paths = self._list_object_permissions(
674
                user, account, container, prefix, shared=True, public=False)
675
            objects = set()
676
            if shared_paths:
677
                path, node = self._lookup_container(account, container)
678
                shared_paths = self._get_formatted_paths(shared_paths)
679
                objects |= set(self._list_object_properties(
680
                    node, path, prefix, delimiter, marker, limit, virtual,
681
                    domain, keys, until, size_range, shared_paths, all_props))
682

    
683
            # get public
684
            objects |= set(self._list_public_object_properties(
685
                user, account, container, prefix, all_props))
686
            objects = list(objects)
687

    
688
            objects.sort(key=lambda x: x[0])
689
            start, limit = self._list_limits(
690
                [x[0] for x in objects], marker, limit)
691
            return objects[start:start + limit]
692
        elif public:
693
            objects = self._list_public_object_properties(
694
                user, account, container, prefix, all_props)
695
            start, limit = self._list_limits(
696
                [x[0] for x in objects], marker, limit)
697
            return objects[start:start + limit]
698

    
699
        allowed = self._list_object_permissions(
700
            user, account, container, prefix, shared, public)
701
        if shared and not allowed:
702
            return []
703
        path, node = self._lookup_container(account, container)
704
        allowed = self._get_formatted_paths(allowed)
705
        objects = self._list_object_properties(
706
            node, path, prefix, delimiter, marker, limit, virtual, domain,
707
            keys, until, size_range, allowed, all_props)
708
        start, limit = self._list_limits(
709
            [x[0] for x in objects], marker, limit)
710
        return objects[start:start + limit]
711

    
712
    def _list_public_object_properties(self, user, account, container, prefix,
713
                                       all_props):
714
        public = self._list_object_permissions(
715
            user, account, container, prefix, shared=False, public=True)
716
        paths, nodes = self._lookup_objects(public)
717
        path = '/'.join((account, container))
718
        cont_prefix = path + '/'
719
        paths = [x[len(cont_prefix):] for x in paths]
720
        objects = [(p,) + props for p, props in
721
                   zip(paths, self.node.version_lookup_bulk(
722
                       nodes, all_props=all_props))]
723
        return objects
724

    
725
    def _list_objects_no_limit(self, user, account, container, prefix,
726
                               delimiter, virtual, domain, keys, shared, until,
727
                               size_range, all_props, public):
728
        objects = []
729
        while True:
730
            marker = objects[-1] if objects else None
731
            limit = 10000
732
            l = self._list_objects(
733
                user, account, container, prefix, delimiter, marker, limit,
734
                virtual, domain, keys, shared, until, size_range, all_props,
735
                public)
736
            objects.extend(l)
737
            if not l or len(l) < limit:
738
                break
739
        return objects
740

    
741
    def _list_object_permissions(self, user, account, container, prefix,
742
                                 shared, public):
743
        allowed = []
744
        path = '/'.join((account, container, prefix)).rstrip('/')
745
        if user != account:
746
            allowed = self.permissions.access_list_paths(user, path)
747
            if not allowed:
748
                raise NotAllowedError
749
        else:
750
            allowed = set()
751
            if shared:
752
                allowed.update(self.permissions.access_list_shared(path))
753
            if public:
754
                allowed.update(
755
                    [x[0] for x in self.permissions.public_list(path)])
756
            allowed = sorted(allowed)
757
            if not allowed:
758
                return []
759
        return allowed
760

    
761
    @debug_method
762
    def list_objects(self, user, account, container, prefix='', delimiter=None,
763
                     marker=None, limit=10000, virtual=True, domain=None,
764
                     keys=None, shared=False, until=None, size_range=None,
765
                     public=False):
766
        """List (object name, object version_id) under a container."""
767

    
768
        keys = keys or []
769
        return self._list_objects(
770
            user, account, container, prefix, delimiter, marker, limit,
771
            virtual, domain, keys, shared, until, size_range, False, public)
772

    
773
    @debug_method
774
    def list_object_meta(self, user, account, container, prefix='',
775
                         delimiter=None, marker=None, limit=10000,
776
                         virtual=True, domain=None, keys=None, shared=False,
777
                         until=None, size_range=None, public=False):
778
        """Return a list of metadata dicts of objects under a container."""
779

    
780
        keys = keys or []
781
        props = self._list_objects(
782
            user, account, container, prefix, delimiter, marker, limit,
783
            virtual, domain, keys, shared, until, size_range, True, public)
784
        objects = []
785
        for p in props:
786
            if len(p) == 2:
787
                objects.append({'subdir': p[0]})
788
            else:
789
                objects.append({
790
                    'name': p[0],
791
                    'bytes': p[self.SIZE + 1],
792
                    'type': p[self.TYPE + 1],
793
                    'hash': p[self.HASH + 1],
794
                    'version': p[self.SERIAL + 1],
795
                    'version_timestamp': p[self.MTIME + 1],
796
                    'modified': p[self.MTIME + 1] if until is None else None,
797
                    'modified_by': p[self.MUSER + 1],
798
                    'uuid': p[self.UUID + 1],
799
                    'checksum': p[self.CHECKSUM + 1]})
800
        return objects
801

    
802
    @debug_method
803
    def list_object_permissions(self, user, account, container, prefix=''):
804
        """Return a list of paths enforce permissions under a container."""
805

    
806
        return self._list_object_permissions(user, account, container, prefix,
807
                                             True, False)
808

    
809
    @debug_method
810
    def list_object_public(self, user, account, container, prefix=''):
811
        """Return a mapping of object paths to public ids under a container."""
812

    
813
        public = {}
814
        for path, p in self.permissions.public_list('/'.join((account,
815
                                                              container,
816
                                                              prefix))):
817
            public[path] = p
818
        return public
819

    
820
    @debug_method
821
    def get_object_meta(self, user, account, container, name, domain,
822
                        version=None, include_user_defined=True):
823
        """Return a dictionary with the object metadata for the domain."""
824

    
825
        self._can_read(user, account, container, name)
826
        path, node = self._lookup_object(account, container, name)
827
        props = self._get_version(node, version)
828
        if version is None:
829
            modified = props[self.MTIME]
830
        else:
831
            try:
832
                modified = self._get_version(
833
                    node)[self.MTIME]  # Overall last modification.
834
            except NameError:  # Object may be deleted.
835
                del_props = self.node.version_lookup(
836
                    node, inf, CLUSTER_DELETED)
837
                if del_props is None:
838
                    raise ItemNotExists('Object does not exist')
839
                modified = del_props[self.MTIME]
840

    
841
        meta = {}
842
        if include_user_defined:
843
            meta.update(
844
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
845
        meta.update({'name': name,
846
                     'bytes': props[self.SIZE],
847
                     'type': props[self.TYPE],
848
                     'hash': props[self.HASH],
849
                     'version': props[self.SERIAL],
850
                     'version_timestamp': props[self.MTIME],
851
                     'modified': modified,
852
                     'modified_by': props[self.MUSER],
853
                     'uuid': props[self.UUID],
854
                     'checksum': props[self.CHECKSUM]})
855
        return meta
856

    
857
    @debug_method
858
    def update_object_meta(self, user, account, container, name, domain, meta,
859
                           replace=False):
860
        """Update object metadata for a domain and return the new version."""
861

    
862
        self._can_write(user, account, container, name)
863

    
864
        path, node = self._lookup_object(account, container, name,
865
                                         lock_container=True)
866
        src_version_id, dest_version_id = self._put_metadata(
867
            user, node, domain, meta, replace,
868
            update_statistics_ancestors_depth=1)
869
        self._apply_versioning(account, container, src_version_id,
870
                               update_statistics_ancestors_depth=1)
871
        return dest_version_id
872
    
873
    @debug_method
874
    def get_object_permissions_bulk(self, user, account, container, names):
875
        """Return the action allowed on the object, the path
876
        from which the object gets its permissions from,
877
        along with a dictionary containing the permissions."""
878

    
879
        permissions_path = self._get_permissions_path_bulk(account,
880
                container,names)
881
        object_permissions = {}
882
        access_objects = self.permissions.access_check_bulk(permissions_path,
883
                user)
884
        group_parents = access_objects['group_parents']
885
        nobject_permissions = {}
886
        for path in permissions_path:
887
            allowed = 1
888
            name = path.split('/')[-1]
889
            if user != account:
890
                try:
891
                    allowed = access_objects[path]
892
                except KeyError:
893
                    raise NotAllowedError
894
            access_dict, allowed = \
895
                self.permissions.access_get_for_bulk(access_objects[path])
896
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
897
                                         access_dict)
898
        self._lookup_objects(permissions_path)
899
        return nobject_permissions
900

    
901

    
902
    @debug_method
903
    def get_object_permissions(self, user, account, container, name):
904
        """Return the action allowed on the object, the path
905
        from which the object gets its permissions from,
906
        along with a dictionary containing the permissions."""
907

    
908
        allowed = 'write'
909
        permissions_path = self._get_permissions_path(account, container, name)
910
        if user != account:
911
            if self.permissions.access_check(permissions_path, self.WRITE,
912
                                             user):
913
                allowed = 'write'
914
            elif self.permissions.access_check(permissions_path, self.READ,
915
                                               user):
916
                allowed = 'read'
917
            else:
918
                raise NotAllowedError
919
        self._lookup_object(account, container, name)
920
        return (allowed,
921
                permissions_path,
922
                self.permissions.access_get(permissions_path))
923

    
924
    @debug_method
925
    def update_object_permissions(self, user, account, container, name,
926
                                  permissions):
927
        """Update the permissions associated with the object."""
928

    
929
        if user != account:
930
            raise NotAllowedError
931
        path = self._lookup_object(account, container, name,
932
                                   lock_container=True)[0]
933
        self._check_permissions(path, permissions)
934
        self.permissions.access_set(path, permissions)
935
        self._report_sharing_change(user, account, path, {'members':
936
                                    self.permissions.access_members(path)})
937

    
938
    @debug_method
939
    def get_object_public(self, user, account, container, name):
940
        """Return the public id of the object if applicable."""
941

    
942
        self._can_read(user, account, container, name)
943
        path = self._lookup_object(account, container, name)[0]
944
        p = self.permissions.public_get(path)
945
        return p
946

    
947
    @debug_method
948
    def update_object_public(self, user, account, container, name, public):
949
        """Update the public status of the object."""
950

    
951
        self._can_write(user, account, container, name)
952
        path = self._lookup_object(account, container, name,
953
                                   lock_container=True)[0]
954
        if not public:
955
            self.permissions.public_unset(path)
956
        else:
957
            self.permissions.public_set(
958
                path, self.public_url_security, self.public_url_alphabet)
959

    
960
    @debug_method
961
    def get_object_hashmap(self, user, account, container, name, version=None):
962
        """Return the object's size and a list with partial hashes."""
963

    
964
        self._can_read(user, account, container, name)
965
        path, node = self._lookup_object(account, container, name)
966
        props = self._get_version(node, version)
967
        if props[self.HASH] is None:
968
            return 0, ()
969
        hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
970
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
971

    
972
    def _update_object_hash(self, user, account, container, name, size, type,
973
                            hash, checksum, domain, meta, replace_meta,
974
                            permissions, src_node=None, src_version_id=None,
975
                            is_copy=False, report_size_change=True):
976
        if permissions is not None and user != account:
977
            raise NotAllowedError
978
        self._can_write(user, account, container, name)
979
        if permissions is not None:
980
            path = '/'.join((account, container, name))
981
            self._check_permissions(path, permissions)
982

    
983
        account_path, account_node = self._lookup_account(account, True)
984
        container_path, container_node = self._lookup_container(
985
            account, container)
986

    
987
        path, node = self._put_object_node(
988
            container_path, container_node, name)
989
        pre_version_id, dest_version_id = self._put_version_duplicate(
990
            user, node, src_node=src_node, size=size, type=type, hash=hash,
991
            checksum=checksum, is_copy=is_copy,
992
            update_statistics_ancestors_depth=1)
993

    
994
        # Handle meta.
995
        if src_version_id is None:
996
            src_version_id = pre_version_id
997
        self._put_metadata_duplicate(
998
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
999

    
1000
        del_size = self._apply_versioning(account, container, pre_version_id,
1001
                                          update_statistics_ancestors_depth=1)
1002
        size_delta = size - del_size
1003
        if size_delta > 0:
1004
            # Check account quota.
1005
            if not self.using_external_quotaholder:
1006
                account_quota = long(self._get_policy(
1007
                    account_node, is_account_policy=True)['quota'])
1008
                account_usage = self._get_statistics(account_node,
1009
                                                     compute=True)[1]
1010
                if (account_quota > 0 and account_usage > account_quota):
1011
                    raise QuotaError(
1012
                        'Account quota exceeded: limit: %s, usage: %s' % (
1013
                            account_quota, account_usage))
1014

    
1015
            # Check container quota.
1016
            container_quota = long(self._get_policy(
1017
                container_node, is_account_policy=False)['quota'])
1018
            container_usage = self._get_statistics(container_node)[1]
1019
            if (container_quota > 0 and container_usage > container_quota):
1020
                # This must be executed in a transaction, so the version is
1021
                # never created if it fails.
1022
                raise QuotaError(
1023
                    'Container quota exceeded: limit: %s, usage: %s' % (
1024
                        container_quota, container_usage
1025
                    )
1026
                )
1027

    
1028
        if report_size_change:
1029
            self._report_size_change(
1030
                user, account, size_delta,
1031
                {'action': 'object update', 'path': path,
1032
                 'versions': ','.join([str(dest_version_id)])})
1033
        if permissions is not None:
1034
            self.permissions.access_set(path, permissions)
1035
            self._report_sharing_change(
1036
                user, account, path,
1037
                {'members': self.permissions.access_members(path)})
1038

    
1039
        self._report_object_change(
1040
            user, account, path,
1041
            details={'version': dest_version_id, 'action': 'object update'})
1042
        return dest_version_id
1043

    
1044
    @debug_method
1045
    def update_object_hashmap(self, user, account, container, name, size, type,
1046
                              hashmap, checksum, domain, meta=None,
1047
                              replace_meta=False, permissions=None):
1048
        """Create/update an object's hashmap and return the new version."""
1049

    
1050
        meta = meta or {}
1051
        if size == 0:  # No such thing as an empty hashmap.
1052
            hashmap = [self.put_block('')]
1053
        map = HashMap(self.block_size, self.hash_algorithm)
1054
        map.extend([self._unhexlify_hash(x) for x in hashmap])
1055
        missing = self.store.block_search(map)
1056
        if missing:
1057
            ie = IndexError()
1058
            ie.data = [binascii.hexlify(x) for x in missing]
1059
            raise ie
1060

    
1061
        hash = map.hash()
1062
        hexlified = binascii.hexlify(hash)
1063
        dest_version_id = self._update_object_hash(
1064
            user, account, container, name, size, type, hexlified, checksum,
1065
            domain, meta, replace_meta, permissions)
1066
        self.store.map_put(hash, map)
1067
        return dest_version_id, hexlified
1068

    
1069
    @debug_method
1070
    def update_object_checksum(self, user, account, container, name, version,
1071
                               checksum):
1072
        """Update an object's checksum."""
1073

    
1074
        # Update objects with greater version and same hashmap
1075
        # and size (fix metadata updates).
1076
        self._can_write(user, account, container, name)
1077
        path, node = self._lookup_object(account, container, name,
1078
                                         lock_container=True)
1079
        props = self._get_version(node, version)
1080
        versions = self.node.node_get_versions(node)
1081
        for x in versions:
1082
            if (x[self.SERIAL] >= int(version) and
1083
                x[self.HASH] == props[self.HASH] and
1084
                    x[self.SIZE] == props[self.SIZE]):
1085
                self.node.version_put_property(
1086
                    x[self.SERIAL], 'checksum', checksum)
1087

    
1088
    def _copy_object(self, user, src_account, src_container, src_name,
1089
                     dest_account, dest_container, dest_name, type,
1090
                     dest_domain=None, dest_meta=None, replace_meta=False,
1091
                     permissions=None, src_version=None, is_move=False,
1092
                     delimiter=None):
1093

    
1094
        report_size_change = not is_move
1095
        dest_meta = dest_meta or {}
1096
        dest_version_ids = []
1097
        self._can_read(user, src_account, src_container, src_name)
1098

    
1099
        src_container_path = '/'.join((src_account, src_container))
1100
        dest_container_path = '/'.join((dest_account, dest_container))
1101
        # Lock container paths in alphabetical order
1102
        if src_container_path < dest_container_path:
1103
            self._lookup_container(src_account, src_container)
1104
            self._lookup_container(dest_account, dest_container)
1105
        else:
1106
            self._lookup_container(dest_account, dest_container)
1107
            self._lookup_container(src_account, src_container)
1108

    
1109
        path, node = self._lookup_object(src_account, src_container, src_name)
1110
        # TODO: Will do another fetch of the properties in duplicate version...
1111
        props = self._get_version(
1112
            node, src_version)  # Check to see if source exists.
1113
        src_version_id = props[self.SERIAL]
1114
        hash = props[self.HASH]
1115
        size = props[self.SIZE]
1116
        is_copy = not is_move and (src_account, src_container, src_name) != (
1117
            dest_account, dest_container, dest_name)  # New uuid.
1118
        dest_version_ids.append(self._update_object_hash(
1119
            user, dest_account, dest_container, dest_name, size, type, hash,
1120
            None, dest_domain, dest_meta, replace_meta, permissions,
1121
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1122
            report_size_change=report_size_change))
1123
        if is_move and ((src_account, src_container, src_name) !=
1124
                        (dest_account, dest_container, dest_name)):
1125
            self._delete_object(user, src_account, src_container, src_name,
1126
                                report_size_change=report_size_change)
1127

    
1128
        if delimiter:
1129
            prefix = (src_name + delimiter if not
1130
                      src_name.endswith(delimiter) else src_name)
1131
            src_names = self._list_objects_no_limit(
1132
                user, src_account, src_container, prefix, delimiter=None,
1133
                virtual=False, domain=None, keys=[], shared=False, until=None,
1134
                size_range=None, all_props=True, public=False)
1135
            src_names.sort(key=lambda x: x[2])  # order by nodes
1136
            paths = [elem[0] for elem in src_names]
1137
            nodes = [elem[2] for elem in src_names]
1138
            # TODO: Will do another fetch of the properties
1139
            # in duplicate version...
1140
            props = self._get_versions(nodes)  # Check to see if source exists.
1141

    
1142
            for prop, path, node in zip(props, paths, nodes):
1143
                src_version_id = prop[self.SERIAL]
1144
                hash = prop[self.HASH]
1145
                vtype = prop[self.TYPE]
1146
                size = prop[self.SIZE]
1147
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1148
                    delimiter) else dest_name
1149
                vdest_name = path.replace(prefix, dest_prefix, 1)
1150
                dest_version_ids.append(self._update_object_hash(
1151
                    user, dest_account, dest_container, vdest_name, size,
1152
                    vtype, hash, None, dest_domain, meta={},
1153
                    replace_meta=False, permissions=None, src_node=node,
1154
                    src_version_id=src_version_id, is_copy=is_copy))
1155
                if is_move and ((src_account, src_container, src_name) !=
1156
                                (dest_account, dest_container, dest_name)):
1157
                    self._delete_object(user, src_account, src_container, path)
1158
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
1159
                dest_version_ids)
1160

    
1161
    @debug_method
1162
    def copy_object(self, user, src_account, src_container, src_name,
1163
                    dest_account, dest_container, dest_name, type, domain,
1164
                    meta=None, replace_meta=False, permissions=None,
1165
                    src_version=None, delimiter=None):
1166
        """Copy an object's data and metadata."""
1167

    
1168
        meta = meta or {}
1169
        dest_version_id = self._copy_object(
1170
            user, src_account, src_container, src_name, dest_account,
1171
            dest_container, dest_name, type, domain, meta, replace_meta,
1172
            permissions, src_version, False, delimiter)
1173
        return dest_version_id
1174

    
1175
    @debug_method
1176
    def move_object(self, user, src_account, src_container, src_name,
1177
                    dest_account, dest_container, dest_name, type, domain,
1178
                    meta=None, replace_meta=False, permissions=None,
1179
                    delimiter=None):
1180
        """Move an object's data and metadata."""
1181

    
1182
        meta = meta or {}
1183
        if user != src_account:
1184
            raise NotAllowedError
1185
        dest_version_id = self._move_object(
1186
            user, src_account, src_container, src_name, dest_account,
1187
            dest_container, dest_name, type, domain, meta, replace_meta,
1188
            permissions, None, delimiter=delimiter)
1189
        return dest_version_id
1190

    
1191
    def _delete_object(self, user, account, container, name, until=None,
1192
                       delimiter=None, report_size_change=True):
1193
        if user != account:
1194
            raise NotAllowedError
1195

    
1196
        # lookup object and lock container path also
1197
        path, node = self._lookup_object(account, container, name,
1198
                                         lock_container=True)
1199

    
1200
        if until is not None:
1201
            if node is None:
1202
                return
1203
            hashes = []
1204
            size = 0
1205
            serials = []
1206
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1207
                                           update_statistics_ancestors_depth=1)
1208
            hashes += h
1209
            size += s
1210
            serials += v
1211
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1212
                                           update_statistics_ancestors_depth=1)
1213
            hashes += h
1214
            if not self.free_versioning:
1215
                size += s
1216
            serials += v
1217
            for h in hashes:
1218
                self.store.map_delete(h)
1219
            self.node.node_purge(node, until, CLUSTER_DELETED,
1220
                                 update_statistics_ancestors_depth=1)
1221
            try:
1222
                self._get_version(node)
1223
            except NameError:
1224
                self.permissions.access_clear(path)
1225
            self._report_size_change(
1226
                user, account, -size, {
1227
                    'action': 'object purge',
1228
                    'path': path,
1229
                    'versions': ','.join(str(i) for i in serials)
1230
                }
1231
            )
1232
            return
1233

    
1234
        if not self._exists(node):
1235
            raise ItemNotExists('Object is deleted.')
1236
        src_version_id, dest_version_id = self._put_version_duplicate(
1237
            user, node, size=0, type='', hash=None, checksum='',
1238
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1239
        del_size = self._apply_versioning(account, container, src_version_id,
1240
                                          update_statistics_ancestors_depth=1)
1241
        if report_size_change:
1242
            self._report_size_change(
1243
                user, account, -del_size,
1244
                {'action': 'object delete',
1245
                 'path': path,
1246
                 'versions': ','.join([str(dest_version_id)])})
1247
        self._report_object_change(
1248
            user, account, path, details={'action': 'object delete'})
1249
        self.permissions.access_clear(path)
1250

    
1251
        if delimiter:
1252
            prefix = name + delimiter if not name.endswith(delimiter) else name
1253
            src_names = self._list_objects_no_limit(
1254
                user, account, container, prefix, delimiter=None,
1255
                virtual=False, domain=None, keys=[], shared=False, until=None,
1256
                size_range=None, all_props=True, public=False)
1257
            paths = []
1258
            for t in src_names:
1259
                path = '/'.join((account, container, t[0]))
1260
                node = t[2]
1261
                if not self._exists(node):
1262
                    continue
1263
                src_version_id, dest_version_id = self._put_version_duplicate(
1264
                    user, node, size=0, type='', hash=None, checksum='',
1265
                    cluster=CLUSTER_DELETED,
1266
                    update_statistics_ancestors_depth=1)
1267
                del_size = self._apply_versioning(
1268
                    account, container, src_version_id,
1269
                    update_statistics_ancestors_depth=1)
1270
                if report_size_change:
1271
                    self._report_size_change(
1272
                        user, account, -del_size,
1273
                        {'action': 'object delete',
1274
                         'path': path,
1275
                         'versions': ','.join([str(dest_version_id)])})
1276
                self._report_object_change(
1277
                    user, account, path, details={'action': 'object delete'})
1278
                paths.append(path)
1279
            self.permissions.access_clear_bulk(paths)
1280

    
1281
    @debug_method
1282
    def delete_object(self, user, account, container, name, until=None,
1283
                      prefix='', delimiter=None):
1284
        """Delete/purge an object."""
1285

    
1286
        self._delete_object(user, account, container, name, until, delimiter)
1287

    
1288
    @debug_method
1289
    def list_versions(self, user, account, container, name):
1290
        """Return a list of all object (version, version_timestamp) tuples."""
1291

    
1292
        self._can_read(user, account, container, name)
1293
        path, node = self._lookup_object(account, container, name)
1294
        versions = self.node.node_get_versions(node)
1295
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
1296
                x[self.CLUSTER] != CLUSTER_DELETED]
1297

    
1298
    @debug_method
1299
    def get_uuid(self, user, uuid):
1300
        """Return the (account, container, name) for the UUID given."""
1301

    
1302
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1303
        if info is None:
1304
            raise NameError
1305
        path, serial = info
1306
        account, container, name = path.split('/', 2)
1307
        self._can_read(user, account, container, name)
1308
        return (account, container, name)
1309

    
1310
    @debug_method
1311
    def get_public(self, user, public):
1312
        """Return the (account, container, name) for the public id given."""
1313

    
1314
        path = self.permissions.public_path(public)
1315
        if path is None:
1316
            raise NameError
1317
        account, container, name = path.split('/', 2)
1318
        self._can_read(user, account, container, name)
1319
        return (account, container, name)
1320

    
1321
    def get_block(self, hash):
1322
        """Return a block's data."""
1323

    
1324
        logger.debug("get_block: %s", hash)
1325
        block = self.store.block_get(self._unhexlify_hash(hash))
1326
        if not block:
1327
            raise ItemNotExists('Block does not exist')
1328
        return block
1329

    
1330
    def put_block(self, data):
1331
        """Store a block and return the hash."""
1332

    
1333
        logger.debug("put_block: %s", len(data))
1334
        return binascii.hexlify(self.store.block_put(data))
1335

    
1336
    def update_block(self, hash, data, offset=0):
1337
        """Update a known block and return the hash."""
1338

    
1339
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1340
        if offset == 0 and len(data) == self.block_size:
1341
            return self.put_block(data)
1342
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
1343
        return binascii.hexlify(h)
1344

    
1345
    # Path functions.
1346

    
1347
    def _generate_uuid(self):
1348
        return str(uuidlib.uuid4())
1349

    
1350
    def _put_object_node(self, path, parent, name):
1351
        path = '/'.join((path, name))
1352
        node = self.node.node_lookup(path)
1353
        if node is None:
1354
            node = self.node.node_create(parent, path)
1355
        return path, node
1356

    
1357
    def _put_path(self, user, parent, path,
1358
                  update_statistics_ancestors_depth=None):
1359
        node = self.node.node_create(parent, path)
1360
        self.node.version_create(node, None, 0, '', None, user,
1361
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
1362
                                 update_statistics_ancestors_depth)
1363
        return node
1364

    
1365
    def _lookup_account(self, account, create=True):
1366
        node = self.node.node_lookup(account)
1367
        if node is None and create:
1368
            node = self._put_path(
1369
                account, self.ROOTNODE, account,
1370
                update_statistics_ancestors_depth=-1)  # User is account.
1371
        return account, node
1372

    
1373
    def _lookup_container(self, account, container):
1374
        for_update = True if self.lock_container_path else False
1375
        path = '/'.join((account, container))
1376
        node = self.node.node_lookup(path, for_update)
1377
        if node is None:
1378
            raise ItemNotExists('Container does not exist')
1379
        return path, node
1380

    
1381
    def _lookup_object(self, account, container, name, lock_container=False):
1382
        if lock_container:
1383
            self._lookup_container(account, container)
1384

    
1385
        path = '/'.join((account, container, name))
1386
        node = self.node.node_lookup(path)
1387
        if node is None:
1388
            raise ItemNotExists('Object does not exist')
1389
        return path, node
1390

    
1391
    def _lookup_objects(self, paths):
1392
        nodes = self.node.node_lookup_bulk(paths)
1393
        return paths, nodes
1394

    
1395
    def _get_properties(self, node, until=None):
1396
        """Return properties until the timestamp given."""
1397

    
1398
        before = until if until is not None else inf
1399
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1400
        if props is None and until is not None:
1401
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1402
        if props is None:
1403
            raise ItemNotExists('Path does not exist')
1404
        return props
1405

    
1406
    def _get_statistics(self, node, until=None, compute=False):
1407
        """Return (count, sum of size, timestamp) of everything under node."""
1408

    
1409
        if until is not None:
1410
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1411
        elif compute:
1412
            stats = self.node.statistics_latest(node,
1413
                                                except_cluster=CLUSTER_DELETED)
1414
        else:
1415
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1416
        if stats is None:
1417
            stats = (0, 0, 0)
1418
        return stats
1419

    
1420
    def _get_version(self, node, version=None):
1421
        if version is None:
1422
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1423
            if props is None:
1424
                raise ItemNotExists('Object does not exist')
1425
        else:
1426
            try:
1427
                version = int(version)
1428
            except ValueError:
1429
                raise VersionNotExists('Version does not exist')
1430
            props = self.node.version_get_properties(version, node=node)
1431
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1432
                raise VersionNotExists('Version does not exist')
1433
        return props
1434

    
1435
    def _get_versions(self, nodes):
1436
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1437

    
1438
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
1439
                               type=None, hash=None, checksum=None,
1440
                               cluster=CLUSTER_NORMAL, is_copy=False,
1441
                               update_statistics_ancestors_depth=None):
1442
        """Create a new version of the node."""
1443

    
1444
        props = self.node.version_lookup(
1445
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1446
        if props is not None:
1447
            src_version_id = props[self.SERIAL]
1448
            src_hash = props[self.HASH]
1449
            src_size = props[self.SIZE]
1450
            src_type = props[self.TYPE]
1451
            src_checksum = props[self.CHECKSUM]
1452
        else:
1453
            src_version_id = None
1454
            src_hash = None
1455
            src_size = 0
1456
            src_type = ''
1457
            src_checksum = ''
1458
        if size is None:  # Set metadata.
1459
            hash = src_hash  # This way hash can be set to None
1460
                             # (account or container).
1461
            size = src_size
1462
        if type is None:
1463
            type = src_type
1464
        if checksum is None:
1465
            checksum = src_checksum
1466
        uuid = self._generate_uuid(
1467
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1468

    
1469
        if src_node is None:
1470
            pre_version_id = src_version_id
1471
        else:
1472
            pre_version_id = None
1473
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1474
            if props is not None:
1475
                pre_version_id = props[self.SERIAL]
1476
        if pre_version_id is not None:
1477
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
1478
                                        update_statistics_ancestors_depth)
1479

    
1480
        dest_version_id, mtime = self.node.version_create(
1481
            node, hash, size, type, src_version_id, user, uuid, checksum,
1482
            cluster, update_statistics_ancestors_depth)
1483

    
1484
        self.node.attribute_unset_is_latest(node, dest_version_id)
1485

    
1486
        return pre_version_id, dest_version_id
1487

    
1488
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
1489
                                node, meta, replace=False):
1490
        if src_version_id is not None:
1491
            self.node.attribute_copy(src_version_id, dest_version_id)
1492
        if not replace:
1493
            self.node.attribute_del(dest_version_id, domain, (
1494
                k for k, v in meta.iteritems() if v == ''))
1495
            self.node.attribute_set(dest_version_id, domain, node, (
1496
                (k, v) for k, v in meta.iteritems() if v != ''))
1497
        else:
1498
            self.node.attribute_del(dest_version_id, domain)
1499
            self.node.attribute_set(dest_version_id, domain, node, ((
1500
                k, v) for k, v in meta.iteritems()))
1501

    
1502
    def _put_metadata(self, user, node, domain, meta, replace=False,
1503
                      update_statistics_ancestors_depth=None):
1504
        """Create a new version and store metadata."""
1505

    
1506
        src_version_id, dest_version_id = self._put_version_duplicate(
1507
            user, node,
1508
            update_statistics_ancestors_depth=
1509
            update_statistics_ancestors_depth)
1510
        self._put_metadata_duplicate(
1511
            src_version_id, dest_version_id, domain, node, meta, replace)
1512
        return src_version_id, dest_version_id
1513

    
1514
    def _list_limits(self, listing, marker, limit):
1515
        start = 0
1516
        if marker:
1517
            try:
1518
                start = listing.index(marker) + 1
1519
            except ValueError:
1520
                pass
1521
        if not limit or limit > 10000:
1522
            limit = 10000
1523
        return start, limit
1524

    
1525
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
1526
                                marker=None, limit=10000, virtual=True,
1527
                                domain=None, keys=None, until=None,
1528
                                size_range=None, allowed=None,
1529
                                all_props=False):
1530
        keys = keys or []
1531
        allowed = allowed or []
1532
        cont_prefix = path + '/'
1533
        prefix = cont_prefix + prefix
1534
        start = cont_prefix + marker if marker else None
1535
        before = until if until is not None else inf
1536
        filterq = keys if domain else []
1537
        sizeq = size_range
1538

    
1539
        objects, prefixes = self.node.latest_version_list(
1540
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
1541
            allowed, domain, filterq, sizeq, all_props)
1542
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1543
        objects.sort(key=lambda x: x[0])
1544
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1545
        return objects
1546

    
1547
    # Reporting functions.
1548

    
1549
    @debug_method
1550
    def _report_size_change(self, user, account, size, details=None):
1551
        details = details or {}
1552

    
1553
        if size == 0:
1554
            return
1555

    
1556
        account_node = self._lookup_account(account, True)[1]
1557
        total = self._get_statistics(account_node, compute=True)[1]
1558
        details.update({'user': user, 'total': total})
1559
        self.messages.append(
1560
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1561
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1562

    
1563
        if not self.using_external_quotaholder:
1564
            return
1565

    
1566
        try:
1567
            name = details['path'] if 'path' in details else ''
1568
            serial = self.astakosclient.issue_one_commission(
1569
                token=self.service_token,
1570
                holder=account,
1571
                source=DEFAULT_SOURCE,
1572
                provisions={'pithos.diskspace': size},
1573
                name=name)
1574
        except BaseException, e:
1575
            raise QuotaError(e)
1576
        else:
1577
            self.serials.append(serial)
1578

    
1579
    @debug_method
1580
    def _report_object_change(self, user, account, path, details=None):
1581
        details = details or {}
1582
        details.update({'user': user})
1583
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1584
                              account, QUEUE_INSTANCE_ID, 'object', path,
1585
                              details))
1586

    
1587
    @debug_method
1588
    def _report_sharing_change(self, user, account, path, details=None):
1589
        details = details or {}
1590
        details.update({'user': user})
1591
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1592
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
1593
                              details))
1594

    
1595
    # Policy functions.
1596

    
1597
    def _check_policy(self, policy, is_account_policy=True):
1598
        default_policy = self.default_account_policy \
1599
            if is_account_policy else self.default_container_policy
1600
        for k in policy.keys():
1601
            if policy[k] == '':
1602
                policy[k] = default_policy.get(k)
1603
        for k, v in policy.iteritems():
1604
            if k == 'quota':
1605
                q = int(v)  # May raise ValueError.
1606
                if q < 0:
1607
                    raise ValueError
1608
            elif k == 'versioning':
1609
                if v not in ['auto', 'none']:
1610
                    raise ValueError
1611
            else:
1612
                raise ValueError
1613

    
1614
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1615
        default_policy = self.default_account_policy \
1616
            if is_account_policy else self.default_container_policy
1617
        if replace:
1618
            for k, v in default_policy.iteritems():
1619
                if k not in policy:
1620
                    policy[k] = v
1621
        self.node.policy_set(node, policy)
1622

    
1623
    def _get_policy(self, node, is_account_policy=True):
1624
        default_policy = self.default_account_policy \
1625
            if is_account_policy else self.default_container_policy
1626
        policy = default_policy.copy()
1627
        policy.update(self.node.policy_get(node))
1628
        return policy
1629

    
1630
    def _apply_versioning(self, account, container, version_id,
1631
                          update_statistics_ancestors_depth=None):
1632
        """Delete the provided version if such is the policy.
1633
           Return size of object removed.
1634
        """
1635

    
1636
        if version_id is None:
1637
            return 0
1638
        path, node = self._lookup_container(account, container)
1639
        versioning = self._get_policy(
1640
            node, is_account_policy=False)['versioning']
1641
        if versioning != 'auto':
1642
            hash, size = self.node.version_remove(
1643
                version_id, update_statistics_ancestors_depth)
1644
            self.store.map_delete(hash)
1645
            return size
1646
        elif self.free_versioning:
1647
            return self.node.version_get_properties(
1648
                version_id, keys=('size',))[0]
1649
        return 0
1650

    
1651
    # Access control functions.
1652

    
1653
    def _check_groups(self, groups):
1654
        # raise ValueError('Bad characters in groups')
1655
        pass
1656

    
1657
    def _check_permissions(self, path, permissions):
1658
        # raise ValueError('Bad characters in permissions')
1659
        pass
1660

    
1661
    def _get_formatted_paths(self, paths):
1662
        formatted = []
1663
        for p in paths:
1664
            node = self.node.node_lookup(p)
1665
            props = None
1666
            if node is not None:
1667
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1668
            if props is not None:
1669
                if props[self.TYPE].split(';', 1)[0].strip() in (
1670
                        'application/directory', 'application/folder'):
1671
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1672
                formatted.append((p, self.MATCH_EXACT))
1673
        return formatted
1674

    
1675
    def _get_formatted_paths(self, paths):
1676
        formatted = []
1677
        if len(paths) == 0 :
1678
            return formatted
1679
        props = self.node.get_props(paths)
1680
        if props:
1681
            for prop in props:
1682
                if prop[1].split(';', 1)[0].strip() in (
1683
                        'application/directory', 'application/folder'):
1684
                    formatted.append((prop[0].rstrip('/') + '/', self.MATCH_PREFIX))
1685
                formatted.append((prop[0], self.MATCH_EXACT))
1686
        return formatted
1687

    
1688
    def _get_permissions_path(self, account, container, name):
1689
        path = '/'.join((account, container, name))
1690
        permission_paths = self.permissions.access_inherit(path)
1691
        permission_paths.sort()
1692
        permission_paths.reverse()
1693
        for p in permission_paths:
1694
            if p == path:
1695
                return p
1696
            else:
1697
                if p.count('/') < 2:
1698
                    continue
1699
                node = self.node.node_lookup(p)
1700
                props = None
1701
                if node is not None:
1702
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1703
                if props is not None:
1704
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1705
                        return p
1706
        return None
1707

    
1708
    def _get_permissions_path_bulk(self, account, container, names):
1709
        formatted_paths = []
1710
        for name in names:
1711
            path = '/'.join((account, container, name))
1712
            formatted_paths.append(path)
1713
        permission_paths = self.permissions.access_inherit_bulk(formatted_paths)
1714
        permission_paths.sort()
1715
        permission_paths.reverse()
1716
        permission_paths_list = []
1717
        lookup_list = []
1718
        for p in permission_paths:
1719
            if p in formatted_paths:
1720
                permission_paths_list.append(p)
1721
            else:
1722
                if p.count('/') < 2:
1723
                    continue
1724
                lookup_list.append(p)
1725

    
1726
        if len(lookup_list) > 0:
1727
            props = self.node.get_props(paths)
1728
            if props:
1729
                for prop in props:
1730
                    if prop[1].split(';', 1)[0].strip() in (
1731
                            'application/directory', 'application/folder'):
1732
                        permission_paths_list.append((prop[0].rstrip('/') + '/',self.MATCH_PREFIX))
1733

    
1734
        if len(permission_paths_list) > 0:
1735
            return permission_paths_list
1736

    
1737
        return None
1738

    
1739
    def _can_read(self, user, account, container, name):
1740
        if user == account:
1741
            return True
1742
        path = '/'.join((account, container, name))
1743
        if self.permissions.public_get(path) is not None:
1744
            return True
1745
        path = self._get_permissions_path(account, container, name)
1746
        if not path:
1747
            raise NotAllowedError
1748
        if (not self.permissions.access_check(path, self.READ, user) and not
1749
                self.permissions.access_check(path, self.WRITE, user)):
1750
            raise NotAllowedError
1751

    
1752
    def _can_write(self, user, account, container, name):
1753
        if user == account:
1754
            return True
1755
        path = '/'.join((account, container, name))
1756
        path = self._get_permissions_path(account, container, name)
1757
        if not path:
1758
            raise NotAllowedError
1759
        if not self.permissions.access_check(path, self.WRITE, user):
1760
            raise NotAllowedError
1761

    
1762
    def _allowed_accounts(self, user):
1763
        allow = set()
1764
        for path in self.permissions.access_list_paths(user):
1765
            allow.add(path.split('/', 1)[0])
1766
        return sorted(allow)
1767

    
1768
    def _allowed_containers(self, user, account):
1769
        allow = set()
1770
        for path in self.permissions.access_list_paths(user, account):
1771
            allow.add(path.split('/', 2)[1])
1772
        return sorted(allow)
1773

    
1774
    # Domain functions
1775

    
1776
    @debug_method
1777
    def get_domain_objects(self, domain, user=None):
1778
        allowed_paths = self.permissions.access_list_paths(
1779
            user, include_owned=user is not None, include_containers=False)
1780
        if not allowed_paths:
1781
            return []
1782
        obj_list = self.node.domain_object_list(
1783
            domain, allowed_paths, CLUSTER_NORMAL)
1784
        return [(path,
1785
                 self._build_metadata(props, user_defined_meta),
1786
                 self.permissions.access_get(path)) for
1787
                path, props, user_defined_meta in obj_list]
1788

    
1789
    # util functions
1790

    
1791
    def _build_metadata(self, props, user_defined=None,
1792
                        include_user_defined=True):
1793
        meta = {'bytes': props[self.SIZE],
1794
                'type': props[self.TYPE],
1795
                'hash': props[self.HASH],
1796
                'version': props[self.SERIAL],
1797
                'version_timestamp': props[self.MTIME],
1798
                'modified_by': props[self.MUSER],
1799
                'uuid': props[self.UUID],
1800
                'checksum': props[self.CHECKSUM]}
1801
        if include_user_defined and user_defined is not None:
1802
            meta.update(user_defined)
1803
        return meta
1804

    
1805
    def _exists(self, node):
1806
        try:
1807
            self._get_version(node)
1808
        except ItemNotExists:
1809
            return False
1810
        else:
1811
            return True
1812

    
1813
    def _unhexlify_hash(self, hash):
1814
        try:
1815
            return binascii.unhexlify(hash)
1816
        except TypeError:
1817
            raise InvalidHash(hash)