Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 2d5b1b7b

History | View | Annotate | Download (74.9 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import uuid as uuidlib
36
import logging
37
import hashlib
38
import binascii
39

    
40
from functools import wraps, partial
41
from traceback import format_exc
42

    
43
try:
44
    from astakosclient import AstakosClient
45
except ImportError:
46
    AstakosClient = None
47

    
48
from base import (DEFAULT_ACCOUNT_QUOTA, DEFAULT_CONTAINER_QUOTA,
49
                  DEFAULT_CONTAINER_VERSIONING, NotAllowedError, QuotaError,
50
                  BaseBackend, AccountExists, ContainerExists, AccountNotEmpty,
51
                  ContainerNotEmpty, ItemNotExists, VersionNotExists,
52
                  InvalidHash)
53

    
54

    
55
class DisabledAstakosClient(object):
56
    def __init__(self, *args, **kwargs):
57
        self.args = args
58
        self.kwargs = kwargs
59

    
60
    def __getattr__(self, name):
61
        m = ("AstakosClient has been disabled, "
62
             "yet an attempt to access it was made")
63
        raise AssertionError(m)
64

    
65

    
66
# Stripped-down version of the HashMap class found in tools.
67

    
68
class HashMap(list):
69

    
70
    def __init__(self, blocksize, blockhash):
71
        super(HashMap, self).__init__()
72
        self.blocksize = blocksize
73
        self.blockhash = blockhash
74

    
75
    def _hash_raw(self, v):
76
        h = hashlib.new(self.blockhash)
77
        h.update(v)
78
        return h.digest()
79

    
80
    def hash(self):
81
        if len(self) == 0:
82
            return self._hash_raw('')
83
        if len(self) == 1:
84
            return self.__getitem__(0)
85

    
86
        h = list(self)
87
        s = 2
88
        while s < len(h):
89
            s = s * 2
90
        h += [('\x00' * len(h[0]))] * (s - len(h))
91
        while len(h) > 1:
92
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
93
        return h[0]
94

    
95
# Default modules and settings.
96
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
97
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
98
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
99
DEFAULT_BLOCK_PATH = 'data/'
100
DEFAULT_BLOCK_UMASK = 0o022
101
DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4MB
102
DEFAULT_HASH_ALGORITHM = 'sha256'
103
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
104
DEFAULT_BLOCK_PARAMS = {'mappool': None, 'blockpool': None}
105
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
106
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
107
DEFAULT_PUBLIC_URL_ALPHABET = ('0123456789'
108
                               'abcdefghijklmnopqrstuvwxyz'
109
                               'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
110
DEFAULT_PUBLIC_URL_SECURITY = 16
111

    
112
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
113
QUEUE_CLIENT_ID = 'pithos'
114
QUEUE_INSTANCE_ID = '1'
115

    
116
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
117

    
118
inf = float('inf')
119

    
120
ULTIMATE_ANSWER = 42
121

    
122
DEFAULT_SOURCE = 'system'
123

    
124
logger = logging.getLogger(__name__)
125

    
126

    
127
def debug_method(func):
128
    @wraps(func)
129
    def wrapper(self, *args, **kw):
130
        try:
131
            result = func(self, *args, **kw)
132
            return result
133
        except:
134
            result = format_exc()
135
            raise
136
        finally:
137
            all_args = map(repr, args)
138
            map(all_args.append, ('%s=%s' % (k, v) for k, v in kw.iteritems()))
139
            logger.debug(">>> %s(%s) <<< %s" % (
140
                func.__name__, ', '.join(all_args).rstrip(', '), result))
141
    return wrapper
142

    
143

    
144
class ModularBackend(BaseBackend):
145
    """A modular backend.
146

147
    Uses modules for SQL functions and storage.
148
    """
149

    
150
    def __init__(self, db_module=None, db_connection=None,
151
                 block_module=None, block_path=None, block_umask=None,
152
                 block_size=None, hash_algorithm=None,
153
                 queue_module=None, queue_hosts=None, queue_exchange=None,
154
                 astakos_url=None, service_token=None,
155
                 astakosclient_poolsize=None,
156
                 free_versioning=True, block_params=None,
157
                 public_url_security=None,
158
                 public_url_alphabet=None,
159
                 account_quota_policy=None,
160
                 container_quota_policy=None,
161
                 container_versioning_policy=None):
162
        db_module = db_module or DEFAULT_DB_MODULE
163
        db_connection = db_connection or DEFAULT_DB_CONNECTION
164
        block_module = block_module or DEFAULT_BLOCK_MODULE
165
        block_path = block_path or DEFAULT_BLOCK_PATH
166
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
167
        block_params = block_params or DEFAULT_BLOCK_PARAMS
168
        block_size = block_size or DEFAULT_BLOCK_SIZE
169
        hash_algorithm = hash_algorithm or DEFAULT_HASH_ALGORITHM
170
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
171
        account_quota_policy = account_quota_policy or DEFAULT_ACCOUNT_QUOTA
172
        container_quota_policy = container_quota_policy \
173
            or DEFAULT_CONTAINER_QUOTA
174
        container_versioning_policy = container_versioning_policy \
175
            or DEFAULT_CONTAINER_VERSIONING
176

    
177
        self.default_account_policy = {'quota': account_quota_policy}
178
        self.default_container_policy = {
179
            'quota': container_quota_policy,
180
            'versioning': container_versioning_policy
181
        }
182
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
183
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
184

    
185
        self.public_url_security = (public_url_security or
186
                                    DEFAULT_PUBLIC_URL_SECURITY)
187
        self.public_url_alphabet = (public_url_alphabet or
188
                                    DEFAULT_PUBLIC_URL_ALPHABET)
189

    
190
        self.hash_algorithm = hash_algorithm
191
        self.block_size = block_size
192
        self.free_versioning = free_versioning
193

    
194
        h = hashlib.new(hash_algorithm)
195
        h.update('')
196
        self.emptyhash = h.hexdigest()
197

    
198
        def load_module(m):
199
            __import__(m)
200
            return sys.modules[m]
201

    
202
        self.db_module = load_module(db_module)
203
        self.wrapper = self.db_module.DBWrapper(db_connection)
204
        params = {'wrapper': self.wrapper}
205
        self.permissions = self.db_module.Permissions(**params)
206
        self.config = self.db_module.Config(**params)
207
        self.commission_serials = self.db_module.QuotaholderSerial(**params)
208
        for x in ['READ', 'WRITE']:
209
            setattr(self, x, getattr(self.db_module, x))
210
        self.node = self.db_module.Node(**params)
211
        for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
212
                  'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
213
                  'MATCH_PREFIX', 'MATCH_EXACT']:
214
            setattr(self, x, getattr(self.db_module, x))
215

    
216
        self.ALLOWED =  ['read','write']
217

    
218
        self.block_module = load_module(block_module)
219
        self.block_params = block_params
220
        params = {'path': block_path,
221
                  'block_size': self.block_size,
222
                  'hash_algorithm': self.hash_algorithm,
223
                  'umask': block_umask}
224
        params.update(self.block_params)
225
        self.store = self.block_module.Store(**params)
226

    
227
        if queue_module and queue_hosts:
228
            self.queue_module = load_module(queue_module)
229
            params = {'hosts': queue_hosts,
230
                      'exchange': queue_exchange,
231
                      'client_id': QUEUE_CLIENT_ID}
232
            self.queue = self.queue_module.Queue(**params)
233
        else:
234
            class NoQueue:
235
                def send(self, *args):
236
                    pass
237

    
238
                def close(self):
239
                    pass
240

    
241
            self.queue = NoQueue()
242

    
243
        self.astakos_url = astakos_url
244
        self.service_token = service_token
245

    
246
        if not astakos_url or not AstakosClient:
247
            self.astakosclient = DisabledAstakosClient(
248
                astakos_url,
249
                use_pool=True,
250
                pool_size=astakosclient_poolsize)
251
        else:
252
            self.astakosclient = AstakosClient(
253
                astakos_url,
254
                use_pool=True,
255
                pool_size=astakosclient_poolsize)
256

    
257
        self.serials = []
258
        self.messages = []
259

    
260
        self._move_object = partial(self._copy_object, is_move=True)
261

    
262
        self.lock_container_path = False
263

    
264
    def pre_exec(self, lock_container_path=False):
265
        self.lock_container_path = lock_container_path
266
        self.wrapper.execute()
267

    
268
    def post_exec(self, success_status=True):
269
        if success_status:
270
            # send messages produced
271
            for m in self.messages:
272
                self.queue.send(*m)
273

    
274
            # register serials
275
            if self.serials:
276
                self.commission_serials.insert_many(
277
                    self.serials)
278

    
279
                # commit to ensure that the serials are registered
280
                # even if resolve commission fails
281
                self.wrapper.commit()
282

    
283
                # start new transaction
284
                self.wrapper.execute()
285

    
286
                r = self.astakosclient.resolve_commissions(
287
                    token=self.service_token,
288
                    accept_serials=self.serials,
289
                    reject_serials=[])
290
                self.commission_serials.delete_many(
291
                    r['accepted'])
292

    
293
            self.wrapper.commit()
294
        else:
295
            if self.serials:
296
                self.astakosclient.resolve_commissions(
297
                    token=self.service_token,
298
                    accept_serials=[],
299
                    reject_serials=self.serials)
300
            self.wrapper.rollback()
301

    
302
    def close(self):
303
        self.wrapper.close()
304
        self.queue.close()
305

    
306
    @property
307
    def using_external_quotaholder(self):
308
        return not isinstance(self.astakosclient, DisabledAstakosClient)
309

    
310
    @debug_method
311
    def list_accounts(self, user, marker=None, limit=10000):
312
        """Return a list of accounts the user can access."""
313

    
314
        allowed = self._allowed_accounts(user)
315
        start, limit = self._list_limits(allowed, marker, limit)
316
        return allowed[start:start + limit]
317

    
318
    @debug_method
319
    def get_account_meta(
320
            self, user, account, domain, until=None, include_user_defined=True,
321
            external_quota=None):
322
        """Return a dictionary with the account metadata for the domain."""
323

    
324
        path, node = self._lookup_account(account, user == account)
325
        if user != account:
326
            if until or (node is None) or (account not
327
                                           in self._allowed_accounts(user)):
328
                raise NotAllowedError
329
        try:
330
            props = self._get_properties(node, until)
331
            mtime = props[self.MTIME]
332
        except NameError:
333
            props = None
334
            mtime = until
335
        count, bytes, tstamp = self._get_statistics(node, until, compute=True)
336
        tstamp = max(tstamp, mtime)
337
        if until is None:
338
            modified = tstamp
339
        else:
340
            modified = self._get_statistics(
341
                node, compute=True)[2]  # Overall last modification.
342
            modified = max(modified, mtime)
343

    
344
        if user != account:
345
            meta = {'name': account}
346
        else:
347
            meta = {}
348
            if props is not None and include_user_defined:
349
                meta.update(
350
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
351
            if until is not None:
352
                meta.update({'until_timestamp': tstamp})
353
            meta.update({'name': account, 'count': count, 'bytes': bytes})
354
            if self.using_external_quotaholder:
355
                external_quota = external_quota or {}
356
                meta['bytes'] = external_quota.get('usage', 0)
357
        meta.update({'modified': modified})
358
        return meta
359

    
360
    @debug_method
361
    def update_account_meta(self, user, account, domain, meta, replace=False):
362
        """Update the metadata associated with the account for the domain."""
363

    
364
        if user != account:
365
            raise NotAllowedError
366
        path, node = self._lookup_account(account, True)
367
        self._put_metadata(user, node, domain, meta, replace,
368
                           update_statistics_ancestors_depth=-1)
369

    
370
    @debug_method
371
    def get_account_groups(self, user, account):
372
        """Return a dictionary with the user groups defined for the account."""
373

    
374
        if user != account:
375
            if account not in self._allowed_accounts(user):
376
                raise NotAllowedError
377
            return {}
378
        self._lookup_account(account, True)
379
        return self.permissions.group_dict(account)
380

    
381
    @debug_method
382
    def update_account_groups(self, user, account, groups, replace=False):
383
        """Update the groups associated with the account."""
384

    
385
        if user != account:
386
            raise NotAllowedError
387
        self._lookup_account(account, True)
388
        self._check_groups(groups)
389
        if replace:
390
            self.permissions.group_destroy(account)
391
        for k, v in groups.iteritems():
392
            if not replace:  # If not already deleted.
393
                self.permissions.group_delete(account, k)
394
            if v:
395
                self.permissions.group_addmany(account, k, v)
396

    
397
    @debug_method
398
    def get_account_policy(self, user, account, external_quota=None):
399
        """Return a dictionary with the account policy."""
400

    
401
        if user != account:
402
            if account not in self._allowed_accounts(user):
403
                raise NotAllowedError
404
            return {}
405
        path, node = self._lookup_account(account, True)
406
        policy = self._get_policy(node, is_account_policy=True)
407
        if self.using_external_quotaholder:
408
            external_quota = external_quota or {}
409
            policy['quota'] = external_quota.get('limit', 0)
410
        return policy
411

    
412
    @debug_method
413
    def update_account_policy(self, user, account, policy, replace=False):
414
        """Update the policy associated with the account."""
415

    
416
        if user != account:
417
            raise NotAllowedError
418
        path, node = self._lookup_account(account, True)
419
        self._check_policy(policy, is_account_policy=True)
420
        self._put_policy(node, policy, replace, is_account_policy=True)
421

    
422
    @debug_method
423
    def put_account(self, user, account, policy=None):
424
        """Create a new account with the given name."""
425

    
426
        policy = policy or {}
427
        if user != account:
428
            raise NotAllowedError
429
        node = self.node.node_lookup(account)
430
        if node is not None:
431
            raise AccountExists('Account already exists')
432
        if policy:
433
            self._check_policy(policy, is_account_policy=True)
434
        node = self._put_path(user, self.ROOTNODE, account,
435
                              update_statistics_ancestors_depth=-1)
436
        self._put_policy(node, policy, True, is_account_policy=True)
437

    
438
    @debug_method
439
    def delete_account(self, user, account):
440
        """Delete the account with the given name."""
441

    
442
        if user != account:
443
            raise NotAllowedError
444
        node = self.node.node_lookup(account)
445
        if node is None:
446
            return
447
        if not self.node.node_remove(node,
448
                                     update_statistics_ancestors_depth=-1):
449
            raise AccountNotEmpty('Account is not empty')
450
        self.permissions.group_destroy(account)
451

    
452
    @debug_method
453
    def list_containers(self, user, account, marker=None, limit=10000,
454
                        shared=False, until=None, public=False):
455
        """Return a list of containers existing under an account."""
456

    
457
        if user != account:
458
            if until or account not in self._allowed_accounts(user):
459
                raise NotAllowedError
460
            allowed = self._allowed_containers(user, account)
461
            start, limit = self._list_limits(allowed, marker, limit)
462
            return allowed[start:start + limit]
463
        if shared or public:
464
            allowed = set()
465
            if shared:
466
                allowed.update([x.split('/', 2)[1] for x in
467
                               self.permissions.access_list_shared(account)])
468
            if public:
469
                allowed.update([x[0].split('/', 2)[1] for x in
470
                               self.permissions.public_list(account)])
471
            allowed = sorted(allowed)
472
            start, limit = self._list_limits(allowed, marker, limit)
473
            return allowed[start:start + limit]
474
        node = self.node.node_lookup(account)
475
        containers = [x[0] for x in self._list_object_properties(
476
            node, account, '', '/', marker, limit, False, None, [], until)]
477
        start, limit = self._list_limits(
478
            [x[0] for x in containers], marker, limit)
479
        return containers[start:start + limit]
480

    
481
    @debug_method
482
    def list_container_meta(self, user, account, container, domain,
483
                            until=None):
484
        """Return a list of the container's object meta keys for a domain."""
485

    
486
        allowed = []
487
        if user != account:
488
            if until:
489
                raise NotAllowedError
490
            allowed = self.permissions.access_list_paths(
491
                user, '/'.join((account, container)))
492
            if not allowed:
493
                raise NotAllowedError
494
        path, node = self._lookup_container(account, container)
495
        before = until if until is not None else inf
496
        allowed = self._get_formatted_paths(allowed)
497
        return self.node.latest_attribute_keys(node, domain, before,
498
                                               CLUSTER_DELETED, allowed)
499

    
500
    @debug_method
501
    def get_container_meta(self, user, account, container, domain, until=None,
502
                           include_user_defined=True):
503
        """Return a dictionary with the container metadata for the domain."""
504

    
505
        if user != account:
506
            if until or container not in self._allowed_containers(user,
507
                                                                  account):
508
                raise NotAllowedError
509
        path, node = self._lookup_container(account, container)
510
        props = self._get_properties(node, until)
511
        mtime = props[self.MTIME]
512
        count, bytes, tstamp = self._get_statistics(node, until)
513
        tstamp = max(tstamp, mtime)
514
        if until is None:
515
            modified = tstamp
516
        else:
517
            modified = self._get_statistics(
518
                node)[2]  # Overall last modification.
519
            modified = max(modified, mtime)
520

    
521
        if user != account:
522
            meta = {'name': container}
523
        else:
524
            meta = {}
525
            if include_user_defined:
526
                meta.update(
527
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
528
            if until is not None:
529
                meta.update({'until_timestamp': tstamp})
530
            meta.update({'name': container, 'count': count, 'bytes': bytes})
531
        meta.update({'modified': modified})
532
        return meta
533

    
534
    @debug_method
535
    def update_container_meta(self, user, account, container, domain, meta,
536
                              replace=False):
537
        """Update the metadata associated with the container for the domain."""
538

    
539
        if user != account:
540
            raise NotAllowedError
541
        path, node = self._lookup_container(account, container)
542
        src_version_id, dest_version_id = self._put_metadata(
543
            user, node, domain, meta, replace,
544
            update_statistics_ancestors_depth=0)
545
        if src_version_id is not None:
546
            versioning = self._get_policy(
547
                node, is_account_policy=False)['versioning']
548
            if versioning != 'auto':
549
                self.node.version_remove(src_version_id,
550
                                         update_statistics_ancestors_depth=0)
551

    
552
    @debug_method
553
    def get_container_policy(self, user, account, container):
554
        """Return a dictionary with the container policy."""
555

    
556
        if user != account:
557
            if container not in self._allowed_containers(user, account):
558
                raise NotAllowedError
559
            return {}
560
        path, node = self._lookup_container(account, container)
561
        return self._get_policy(node, is_account_policy=False)
562

    
563
    @debug_method
564
    def update_container_policy(self, user, account, container, policy,
565
                                replace=False):
566
        """Update the policy associated with the container."""
567

    
568
        if user != account:
569
            raise NotAllowedError
570
        path, node = self._lookup_container(account, container)
571
        self._check_policy(policy, is_account_policy=False)
572
        self._put_policy(node, policy, replace, is_account_policy=False)
573

    
574
    @debug_method
575
    def put_container(self, user, account, container, policy=None):
576
        """Create a new container with the given name."""
577

    
578
        policy = policy or {}
579
        if user != account:
580
            raise NotAllowedError
581
        try:
582
            path, node = self._lookup_container(account, container)
583
        except NameError:
584
            pass
585
        else:
586
            raise ContainerExists('Container already exists')
587
        if policy:
588
            self._check_policy(policy, is_account_policy=False)
589
        path = '/'.join((account, container))
590
        node = self._put_path(
591
            user, self._lookup_account(account, True)[1], path,
592
            update_statistics_ancestors_depth=-1)
593
        self._put_policy(node, policy, True, is_account_policy=False)
594

    
595
    @debug_method
596
    def delete_container(self, user, account, container, until=None, prefix='',
597
                         delimiter=None):
598
        """Delete/purge the container with the given name."""
599

    
600
        if user != account:
601
            raise NotAllowedError
602
        path, node = self._lookup_container(account, container)
603

    
604
        if until is not None:
605
            hashes, size, serials = self.node.node_purge_children(
606
                node, until, CLUSTER_HISTORY,
607
                update_statistics_ancestors_depth=0)
608
            for h in hashes:
609
                self.store.map_delete(h)
610
            self.node.node_purge_children(node, until, CLUSTER_DELETED,
611
                                          update_statistics_ancestors_depth=0)
612
            if not self.free_versioning:
613
                self._report_size_change(
614
                    user, account, -size, {
615
                        'action': 'container purge',
616
                        'path': path,
617
                        'versions': ','.join(str(i) for i in serials)
618
                    }
619
                )
620
            return
621

    
622
        if not delimiter:
623
            if self._get_statistics(node)[0] > 0:
624
                raise ContainerNotEmpty('Container is not empty')
625
            hashes, size, serials = self.node.node_purge_children(
626
                node, inf, CLUSTER_HISTORY,
627
                update_statistics_ancestors_depth=0)
628
            for h in hashes:
629
                self.store.map_delete(h)
630
            self.node.node_purge_children(node, inf, CLUSTER_DELETED,
631
                                          update_statistics_ancestors_depth=0)
632
            self.node.node_remove(node, update_statistics_ancestors_depth=0)
633
            if not self.free_versioning:
634
                self._report_size_change(
635
                    user, account, -size, {
636
                        'action': 'container purge',
637
                        'path': path,
638
                        'versions': ','.join(str(i) for i in serials)
639
                    }
640
                )
641
        else:
642
            # remove only contents
643
            src_names = self._list_objects_no_limit(
644
                user, account, container, prefix='', delimiter=None,
645
                virtual=False, domain=None, keys=[], shared=False, until=None,
646
                size_range=None, all_props=True, public=False)
647
            paths = []
648
            for t in src_names:
649
                path = '/'.join((account, container, t[0]))
650
                node = t[2]
651
                if not self._exists(node):
652
                    continue
653
                src_version_id, dest_version_id = self._put_version_duplicate(
654
                    user, node, size=0, type='', hash=None, checksum='',
655
                    cluster=CLUSTER_DELETED,
656
                    update_statistics_ancestors_depth=1)
657
                del_size = self._apply_versioning(
658
                    account, container, src_version_id,
659
                    update_statistics_ancestors_depth=1)
660
                self._report_size_change(
661
                    user, account, -del_size, {
662
                        'action': 'object delete',
663
                        'path': path,
664
                        'versions': ','.join([str(dest_version_id)])})
665
                self._report_object_change(
666
                    user, account, path, details={'action': 'object delete'})
667
                paths.append(path)
668
            self.permissions.access_clear_bulk(paths)
669

    
670
    def _list_objects(self, user, account, container, prefix, delimiter,
671
                      marker, limit, virtual, domain, keys, shared, until,
672
                      size_range, all_props, public):
673
        if user != account and until:
674
            raise NotAllowedError
675
        if shared and public:
676
            # get shared first
677
            shared_paths = self._list_object_permissions(
678
                user, account, container, prefix, shared=True, public=False)
679
            objects = set()
680
            if shared_paths:
681
                path, node = self._lookup_container(account, container)
682
                shared_paths = self._get_formatted_paths(shared_paths)
683
                objects |= set(self._list_object_properties(
684
                    node, path, prefix, delimiter, marker, limit, virtual,
685
                    domain, keys, until, size_range, shared_paths, all_props))
686

    
687
            # get public
688
            objects |= set(self._list_public_object_properties(
689
                user, account, container, prefix, all_props))
690
            objects = list(objects)
691

    
692
            objects.sort(key=lambda x: x[0])
693
            start, limit = self._list_limits(
694
                [x[0] for x in objects], marker, limit)
695
            return objects[start:start + limit]
696
        elif public:
697
            objects = self._list_public_object_properties(
698
                user, account, container, prefix, all_props)
699
            start, limit = self._list_limits(
700
                [x[0] for x in objects], marker, limit)
701
            return objects[start:start + limit]
702

    
703
        allowed = self._list_object_permissions(
704
            user, account, container, prefix, shared, public)
705
        if shared and not allowed:
706
            return []
707
        path, node = self._lookup_container(account, container)
708
        allowed = self._get_formatted_paths(allowed)
709
        objects = self._list_object_properties(
710
            node, path, prefix, delimiter, marker, limit, virtual, domain,
711
            keys, until, size_range, allowed, all_props)
712
        start, limit = self._list_limits(
713
            [x[0] for x in objects], marker, limit)
714
        return objects[start:start + limit]
715

    
716
    def _list_public_object_properties(self, user, account, container, prefix,
717
                                       all_props):
718
        public = self._list_object_permissions(
719
            user, account, container, prefix, shared=False, public=True)
720
        paths, nodes = self._lookup_objects(public)
721
        path = '/'.join((account, container))
722
        cont_prefix = path + '/'
723
        paths = [x[len(cont_prefix):] for x in paths]
724
        objects = [(p,) + props for p, props in
725
                   zip(paths, self.node.version_lookup_bulk(
726
                       nodes, all_props=all_props))]
727
        return objects
728

    
729
    def _list_objects_no_limit(self, user, account, container, prefix,
730
                               delimiter, virtual, domain, keys, shared, until,
731
                               size_range, all_props, public):
732
        objects = []
733
        while True:
734
            marker = objects[-1] if objects else None
735
            limit = 10000
736
            l = self._list_objects(
737
                user, account, container, prefix, delimiter, marker, limit,
738
                virtual, domain, keys, shared, until, size_range, all_props,
739
                public)
740
            objects.extend(l)
741
            if not l or len(l) < limit:
742
                break
743
        return objects
744

    
745
    def _list_object_permissions(self, user, account, container, prefix,
746
                                 shared, public):
747
        allowed = []
748
        path = '/'.join((account, container, prefix)).rstrip('/')
749
        if user != account:
750
            allowed = self.permissions.access_list_paths(user, path)
751
            if not allowed:
752
                raise NotAllowedError
753
        else:
754
            allowed = set()
755
            if shared:
756
                allowed.update(self.permissions.access_list_shared(path))
757
            if public:
758
                allowed.update(
759
                    [x[0] for x in self.permissions.public_list(path)])
760
            allowed = sorted(allowed)
761
            if not allowed:
762
                return []
763
        return allowed
764

    
765
    @debug_method
766
    def list_objects(self, user, account, container, prefix='', delimiter=None,
767
                     marker=None, limit=10000, virtual=True, domain=None,
768
                     keys=None, shared=False, until=None, size_range=None,
769
                     public=False):
770
        """List (object name, object version_id) under a container."""
771

    
772
        keys = keys or []
773
        return self._list_objects(
774
            user, account, container, prefix, delimiter, marker, limit,
775
            virtual, domain, keys, shared, until, size_range, False, public)
776

    
777
    @debug_method
778
    def list_object_meta(self, user, account, container, prefix='',
779
                         delimiter=None, marker=None, limit=10000,
780
                         virtual=True, domain=None, keys=None, shared=False,
781
                         until=None, size_range=None, public=False):
782
        """Return a list of metadata dicts of objects under a container."""
783

    
784
        keys = keys or []
785
        props = self._list_objects(
786
            user, account, container, prefix, delimiter, marker, limit,
787
            virtual, domain, keys, shared, until, size_range, True, public)
788
        objects = []
789
        for p in props:
790
            if len(p) == 2:
791
                objects.append({'subdir': p[0]})
792
            else:
793
                objects.append({
794
                    'name': p[0],
795
                    'bytes': p[self.SIZE + 1],
796
                    'type': p[self.TYPE + 1],
797
                    'hash': p[self.HASH + 1],
798
                    'version': p[self.SERIAL + 1],
799
                    'version_timestamp': p[self.MTIME + 1],
800
                    'modified': p[self.MTIME + 1] if until is None else None,
801
                    'modified_by': p[self.MUSER + 1],
802
                    'uuid': p[self.UUID + 1],
803
                    'checksum': p[self.CHECKSUM + 1]})
804
        return objects
805

    
806
    @debug_method
807
    def list_object_permissions(self, user, account, container, prefix=''):
808
        """Return a list of paths enforce permissions under a container."""
809

    
810
        return self._list_object_permissions(user, account, container, prefix,
811
                                             True, False)
812

    
813
    @debug_method
814
    def list_object_public(self, user, account, container, prefix=''):
815
        """Return a mapping of object paths to public ids under a container."""
816

    
817
        public = {}
818
        for path, p in self.permissions.public_list('/'.join((account,
819
                                                              container,
820
                                                              prefix))):
821
            public[path] = p
822
        return public
823

    
824
    @debug_method
825
    def get_object_meta(self, user, account, container, name, domain,
826
                        version=None, include_user_defined=True):
827
        """Return a dictionary with the object metadata for the domain."""
828

    
829
        self._can_read(user, account, container, name)
830
        path, node = self._lookup_object(account, container, name)
831
        props = self._get_version(node, version)
832
        if version is None:
833
            modified = props[self.MTIME]
834
        else:
835
            try:
836
                modified = self._get_version(
837
                    node)[self.MTIME]  # Overall last modification.
838
            except NameError:  # Object may be deleted.
839
                del_props = self.node.version_lookup(
840
                    node, inf, CLUSTER_DELETED)
841
                if del_props is None:
842
                    raise ItemNotExists('Object does not exist')
843
                modified = del_props[self.MTIME]
844

    
845
        meta = {}
846
        if include_user_defined:
847
            meta.update(
848
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
849
        meta.update({'name': name,
850
                     'bytes': props[self.SIZE],
851
                     'type': props[self.TYPE],
852
                     'hash': props[self.HASH],
853
                     'version': props[self.SERIAL],
854
                     'version_timestamp': props[self.MTIME],
855
                     'modified': modified,
856
                     'modified_by': props[self.MUSER],
857
                     'uuid': props[self.UUID],
858
                     'checksum': props[self.CHECKSUM]})
859
        return meta
860

    
861
    @debug_method
862
    def update_object_meta(self, user, account, container, name, domain, meta,
863
                           replace=False):
864
        """Update object metadata for a domain and return the new version."""
865

    
866
        self._can_write(user, account, container, name)
867

    
868
        path, node = self._lookup_object(account, container, name,
869
                                         lock_container=True)
870
        src_version_id, dest_version_id = self._put_metadata(
871
            user, node, domain, meta, replace,
872
            update_statistics_ancestors_depth=1)
873
        self._apply_versioning(account, container, src_version_id,
874
                               update_statistics_ancestors_depth=1)
875
        return dest_version_id
876
    
877
    @debug_method
878
    def get_object_permissions_bulk(self, user, account, container, names):
879
        """Return the action allowed on the object, the path
880
        from which the object gets its permissions from,
881
        along with a dictionary containing the permissions."""
882

    
883
        permissions_path = self._get_permissions_path_bulk(account,
884
                container,names)
885
        access_objects = self.permissions.access_check_bulk(permissions_path,
886
                user)
887
        #group_parents = access_objects['group_parents']
888
        nobject_permissions = {}
889
        for path in permissions_path:
890
            allowed = 1
891
            name = path.split('/')[-1]
892
            if user != account:
893
                try:
894
                    allowed = access_objects[path]
895
                except KeyError:
896
                    raise NotAllowedError
897
            access_dict, allowed = \
898
                self.permissions.access_get_for_bulk(access_objects[path])
899
            nobject_permissions[name] = (self.ALLOWED[allowed], path,
900
                                         access_dict)
901
        self._lookup_objects(permissions_path)
902
        return nobject_permissions
903

    
904

    
905
    @debug_method
906
    def get_object_permissions(self, user, account, container, name):
907
        """Return the action allowed on the object, the path
908
        from which the object gets its permissions from,
909
        along with a dictionary containing the permissions."""
910

    
911
        allowed = 'write'
912
        permissions_path = self._get_permissions_path(account, container, name)
913
        if user != account:
914
            if self.permissions.access_check(permissions_path, self.WRITE,
915
                                             user):
916
                allowed = 'write'
917
            elif self.permissions.access_check(permissions_path, self.READ,
918
                                               user):
919
                allowed = 'read'
920
            else:
921
                raise NotAllowedError
922
        self._lookup_object(account, container, name)
923
        return (allowed,
924
                permissions_path,
925
                self.permissions.access_get(permissions_path))
926

    
927
    @debug_method
928
    def update_object_permissions(self, user, account, container, name,
929
                                  permissions):
930
        """Update the permissions associated with the object."""
931

    
932
        if user != account:
933
            raise NotAllowedError
934
        path = self._lookup_object(account, container, name,
935
                                   lock_container=True)[0]
936
        self._check_permissions(path, permissions)
937
        self.permissions.access_set(path, permissions)
938
        self._report_sharing_change(user, account, path, {'members':
939
                                    self.permissions.access_members(path)})
940

    
941
    @debug_method
942
    def get_object_public(self, user, account, container, name):
943
        """Return the public id of the object if applicable."""
944

    
945
        self._can_read(user, account, container, name)
946
        path = self._lookup_object(account, container, name)[0]
947
        p = self.permissions.public_get(path)
948
        return p
949

    
950
    @debug_method
951
    def update_object_public(self, user, account, container, name, public):
952
        """Update the public status of the object."""
953

    
954
        self._can_write(user, account, container, name)
955
        path = self._lookup_object(account, container, name,
956
                                   lock_container=True)[0]
957
        if not public:
958
            self.permissions.public_unset(path)
959
        else:
960
            self.permissions.public_set(
961
                path, self.public_url_security, self.public_url_alphabet)
962

    
963
    @debug_method
964
    def get_object_hashmap(self, user, account, container, name, version=None):
965
        """Return the object's size and a list with partial hashes."""
966

    
967
        self._can_read(user, account, container, name)
968
        path, node = self._lookup_object(account, container, name)
969
        props = self._get_version(node, version)
970
        if props[self.HASH] is None:
971
            return 0, ()
972
        if props[self.HASH].startswith("archip:"):
973
            hashmap = self.store.map_get_archip(props[self.HASH],
974
                                                props[self.SIZE])
975
            return props[self.SIZE], [x for x in hashmap]
976
        else:
977
            hashmap = self.store.map_get(self._unhexlify_hash(props[self.HASH]))
978
            return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
979

    
980
    def _update_object_hash(self, user, account, container, name, size, type,
981
                            hash, checksum, domain, meta, replace_meta,
982
                            permissions, src_node=None, src_version_id=None,
983
                            is_copy=False, report_size_change=True):
984
        if permissions is not None and user != account:
985
            raise NotAllowedError
986
        self._can_write(user, account, container, name)
987
        if permissions is not None:
988
            path = '/'.join((account, container, name))
989
            self._check_permissions(path, permissions)
990

    
991
        account_path, account_node = self._lookup_account(account, True)
992
        container_path, container_node = self._lookup_container(
993
            account, container)
994

    
995
        path, node = self._put_object_node(
996
            container_path, container_node, name)
997
        pre_version_id, dest_version_id = self._put_version_duplicate(
998
            user, node, src_node=src_node, size=size, type=type, hash=hash,
999
            checksum=checksum, is_copy=is_copy,
1000
            update_statistics_ancestors_depth=1)
1001

    
1002
        # Handle meta.
1003
        if src_version_id is None:
1004
            src_version_id = pre_version_id
1005
        self._put_metadata_duplicate(
1006
            src_version_id, dest_version_id, domain, node, meta, replace_meta)
1007

    
1008
        del_size = self._apply_versioning(account, container, pre_version_id,
1009
                                          update_statistics_ancestors_depth=1)
1010
        size_delta = size - del_size
1011
        if size_delta > 0:
1012
            # Check account quota.
1013
            if not self.using_external_quotaholder:
1014
                account_quota = long(self._get_policy(
1015
                    account_node, is_account_policy=True)['quota'])
1016
                account_usage = self._get_statistics(account_node,
1017
                                                     compute=True)[1]
1018
                if (account_quota > 0 and account_usage > account_quota):
1019
                    raise QuotaError(
1020
                        'Account quota exceeded: limit: %s, usage: %s' % (
1021
                            account_quota, account_usage))
1022

    
1023
            # Check container quota.
1024
            container_quota = long(self._get_policy(
1025
                container_node, is_account_policy=False)['quota'])
1026
            container_usage = self._get_statistics(container_node)[1]
1027
            if (container_quota > 0 and container_usage > container_quota):
1028
                # This must be executed in a transaction, so the version is
1029
                # never created if it fails.
1030
                raise QuotaError(
1031
                    'Container quota exceeded: limit: %s, usage: %s' % (
1032
                        container_quota, container_usage
1033
                    )
1034
                )
1035

    
1036
        if report_size_change:
1037
            self._report_size_change(
1038
                user, account, size_delta,
1039
                {'action': 'object update', 'path': path,
1040
                 'versions': ','.join([str(dest_version_id)])})
1041
        if permissions is not None:
1042
            self.permissions.access_set(path, permissions)
1043
            self._report_sharing_change(
1044
                user, account, path,
1045
                {'members': self.permissions.access_members(path)})
1046

    
1047
        self._report_object_change(
1048
            user, account, path,
1049
            details={'version': dest_version_id, 'action': 'object update'})
1050
        return dest_version_id
1051

    
1052
    @debug_method
1053
    def register_object_hash(self, user, account, container, name, size, type,
1054
                             hash, checksum='', domain='pithos', meta=None,
1055
                             replace_meta=False, permissions=None):
1056
        """Register an object's hash without providing the object hashmap.
1057

1058
        Lock the container path, create a node pointing to the object path,
1059
        create a version pointing to the mapfile
1060
        and issue the size change in the quotaholder.
1061

1062
        :param user: the user account which performs the action
1063

1064
        :param account: the account under which the object resides
1065

1066
        :param container: the container under which the object resides
1067

1068
        :param name: the object name
1069

1070
        :param size: the object size
1071

1072
        :param type: the object mimetype
1073

1074
        :param mapfile: the mapfile pointing to the object data
1075

1076
        :param checkcum: the md5 checksum (optional)
1077

1078
        :param domain: the object domain
1079

1080
        :param meta: a dict with custom object metadata
1081

1082
        :param replace_meta: replace existing metadata or not
1083

1084
        :param permissions: a dict with the read and write object permissions
1085

1086
        :returns: the new object uuid
1087

1088
        :raises: ItemNotExists, NotAllowedError, QuotaError
1089
        """
1090

    
1091
        meta = meta or {}
1092
        dest_version_id = self._update_object_hash(
1093
            user, account, container, name, size, type, hash, checksum,
1094
            domain, meta, replace_meta, permissions)
1095
        return self.node.version_get_properties(dest_version_id, keys=('uuid',))[0]
1096

    
1097
    @debug_method
1098
    def update_object_hashmap(self, user, account, container, name, size, type,
1099
                              hashmap, checksum, domain, meta=None,
1100
                              replace_meta=False, permissions=None):
1101
        """Create/update an object's hashmap and return the new version."""
1102

    
1103

    
1104
        for h in hashmap:
1105
            if h.startswith("archip_") or h == self.emptyhash:
1106
                raise Exception('Cannot update Archipelago Volume hashmap.')
1107
        meta = meta or {}
1108
        if size == 0:  # No such thing as an empty hashmap.
1109
            hashmap = [self.put_block('')]
1110
        map = HashMap(self.block_size, self.hash_algorithm)
1111
        map.extend([self._unhexlify_hash(x) for x in hashmap])
1112
        missing = self.store.block_search(map)
1113
        if missing:
1114
            ie = IndexError()
1115
            ie.data = [binascii.hexlify(x) for x in missing]
1116
            raise ie
1117

    
1118
        hash = map.hash()
1119
        hexlified = binascii.hexlify(hash)
1120
        # _update_object_hash() locks destination path
1121
        dest_version_id = self._update_object_hash(
1122
            user, account, container, name, size, type, hexlified, checksum,
1123
            domain, meta, replace_meta, permissions)
1124
        self.store.map_put(hash, map)
1125
        return dest_version_id, hexlified
1126

    
1127
    @debug_method
1128
    def update_object_checksum(self, user, account, container, name, version,
1129
                               checksum):
1130
        """Update an object's checksum."""
1131

    
1132
        # Update objects with greater version and same hashmap
1133
        # and size (fix metadata updates).
1134
        self._can_write(user, account, container, name)
1135
        path, node = self._lookup_object(account, container, name,
1136
                                         lock_container=True)
1137
        props = self._get_version(node, version)
1138
        versions = self.node.node_get_versions(node)
1139
        for x in versions:
1140
            if (x[self.SERIAL] >= int(version) and
1141
                x[self.HASH] == props[self.HASH] and
1142
                    x[self.SIZE] == props[self.SIZE]):
1143
                self.node.version_put_property(
1144
                    x[self.SERIAL], 'checksum', checksum)
1145

    
1146
    def _copy_object(self, user, src_account, src_container, src_name,
1147
                     dest_account, dest_container, dest_name, type,
1148
                     dest_domain=None, dest_meta=None, replace_meta=False,
1149
                     permissions=None, src_version=None, is_move=False,
1150
                     delimiter=None):
1151

    
1152
        report_size_change = not is_move
1153
        dest_meta = dest_meta or {}
1154
        dest_version_ids = []
1155
        self._can_read(user, src_account, src_container, src_name)
1156

    
1157
        src_container_path = '/'.join((src_account, src_container))
1158
        dest_container_path = '/'.join((dest_account, dest_container))
1159
        # Lock container paths in alphabetical order
1160
        if src_container_path < dest_container_path:
1161
            self._lookup_container(src_account, src_container)
1162
            self._lookup_container(dest_account, dest_container)
1163
        else:
1164
            self._lookup_container(dest_account, dest_container)
1165
            self._lookup_container(src_account, src_container)
1166

    
1167
        path, node = self._lookup_object(src_account, src_container, src_name)
1168
        # TODO: Will do another fetch of the properties in duplicate version...
1169
        props = self._get_version(
1170
            node, src_version)  # Check to see if source exists.
1171
        src_version_id = props[self.SERIAL]
1172
        hash = props[self.HASH]
1173
        size = props[self.SIZE]
1174
        is_copy = not is_move and (src_account, src_container, src_name) != (
1175
            dest_account, dest_container, dest_name)  # New uuid.
1176
        dest_version_ids.append(self._update_object_hash(
1177
            user, dest_account, dest_container, dest_name, size, type, hash,
1178
            None, dest_domain, dest_meta, replace_meta, permissions,
1179
            src_node=node, src_version_id=src_version_id, is_copy=is_copy,
1180
            report_size_change=report_size_change))
1181
        if is_move and ((src_account, src_container, src_name) !=
1182
                        (dest_account, dest_container, dest_name)):
1183
            self._delete_object(user, src_account, src_container, src_name,
1184
                                report_size_change=report_size_change)
1185

    
1186
        if delimiter:
1187
            prefix = (src_name + delimiter if not
1188
                      src_name.endswith(delimiter) else src_name)
1189
            src_names = self._list_objects_no_limit(
1190
                user, src_account, src_container, prefix, delimiter=None,
1191
                virtual=False, domain=None, keys=[], shared=False, until=None,
1192
                size_range=None, all_props=True, public=False)
1193
            src_names.sort(key=lambda x: x[2])  # order by nodes
1194
            paths = [elem[0] for elem in src_names]
1195
            nodes = [elem[2] for elem in src_names]
1196
            # TODO: Will do another fetch of the properties
1197
            # in duplicate version...
1198
            props = self._get_versions(nodes)  # Check to see if source exists.
1199

    
1200
            for prop, path, node in zip(props, paths, nodes):
1201
                src_version_id = prop[self.SERIAL]
1202
                hash = prop[self.HASH]
1203
                vtype = prop[self.TYPE]
1204
                size = prop[self.SIZE]
1205
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
1206
                    delimiter) else dest_name
1207
                vdest_name = path.replace(prefix, dest_prefix, 1)
1208
                # _update_object_hash() locks destination path
1209
                dest_version_ids.append(self._update_object_hash(
1210
                    user, dest_account, dest_container, vdest_name, size,
1211
                    vtype, hash, None, dest_domain, meta={},
1212
                    replace_meta=False, permissions=None, src_node=node,
1213
                    src_version_id=src_version_id, is_copy=is_copy))
1214
                if is_move and ((src_account, src_container, src_name) !=
1215
                                (dest_account, dest_container, dest_name)):
1216
                    self._delete_object(user, src_account, src_container, path)
1217
        return (dest_version_ids[0] if len(dest_version_ids) == 1 else
1218
                dest_version_ids)
1219

    
1220
    @debug_method
1221
    def copy_object(self, user, src_account, src_container, src_name,
1222
                    dest_account, dest_container, dest_name, type, domain,
1223
                    meta=None, replace_meta=False, permissions=None,
1224
                    src_version=None, delimiter=None):
1225
        """Copy an object's data and metadata."""
1226

    
1227
        meta = meta or {}
1228
        dest_version_id = self._copy_object(
1229
            user, src_account, src_container, src_name, dest_account,
1230
            dest_container, dest_name, type, domain, meta, replace_meta,
1231
            permissions, src_version, False, delimiter)
1232
        return dest_version_id
1233

    
1234
    @debug_method
1235
    def move_object(self, user, src_account, src_container, src_name,
1236
                    dest_account, dest_container, dest_name, type, domain,
1237
                    meta=None, replace_meta=False, permissions=None,
1238
                    delimiter=None):
1239
        """Move an object's data and metadata."""
1240

    
1241
        meta = meta or {}
1242
        if user != src_account:
1243
            raise NotAllowedError
1244
        dest_version_id = self._move_object(
1245
            user, src_account, src_container, src_name, dest_account,
1246
            dest_container, dest_name, type, domain, meta, replace_meta,
1247
            permissions, None, delimiter=delimiter)
1248
        return dest_version_id
1249

    
1250
    def _delete_object(self, user, account, container, name, until=None,
1251
                       delimiter=None, report_size_change=True):
1252
        if user != account:
1253
            raise NotAllowedError
1254

    
1255
        # lookup object and lock container path also
1256
        path, node = self._lookup_object(account, container, name,
1257
                                         lock_container=True)
1258

    
1259
        if until is not None:
1260
            if node is None:
1261
                return
1262
            hashes = []
1263
            size = 0
1264
            serials = []
1265
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL,
1266
                                           update_statistics_ancestors_depth=1)
1267
            hashes += h
1268
            size += s
1269
            serials += v
1270
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY,
1271
                                           update_statistics_ancestors_depth=1)
1272
            hashes += h
1273
            if not self.free_versioning:
1274
                size += s
1275
            serials += v
1276
            for h in hashes:
1277
                self.store.map_delete(h)
1278
            self.node.node_purge(node, until, CLUSTER_DELETED,
1279
                                 update_statistics_ancestors_depth=1)
1280
            try:
1281
                self._get_version(node)
1282
            except NameError:
1283
                self.permissions.access_clear(path)
1284
            self._report_size_change(
1285
                user, account, -size, {
1286
                    'action': 'object purge',
1287
                    'path': path,
1288
                    'versions': ','.join(str(i) for i in serials)
1289
                }
1290
            )
1291
            return
1292

    
1293
        if not self._exists(node):
1294
            raise ItemNotExists('Object is deleted.')
1295

    
1296
        src_version_id, dest_version_id = self._put_version_duplicate(
1297
            user, node, size=0, type='', hash=None, checksum='',
1298
            cluster=CLUSTER_DELETED, update_statistics_ancestors_depth=1)
1299
        del_size = self._apply_versioning(account, container, src_version_id,
1300
                                          update_statistics_ancestors_depth=1)
1301
        if report_size_change:
1302
            self._report_size_change(
1303
                user, account, -del_size,
1304
                {'action': 'object delete',
1305
                 'path': path,
1306
                 'versions': ','.join([str(dest_version_id)])})
1307
        self._report_object_change(
1308
            user, account, path, details={'action': 'object delete'})
1309
        self.permissions.access_clear(path)
1310

    
1311
        if delimiter:
1312
            prefix = name + delimiter if not name.endswith(delimiter) else name
1313
            src_names = self._list_objects_no_limit(
1314
                user, account, container, prefix, delimiter=None,
1315
                virtual=False, domain=None, keys=[], shared=False, until=None,
1316
                size_range=None, all_props=True, public=False)
1317
            paths = []
1318
            for t in src_names:
1319
                path = '/'.join((account, container, t[0]))
1320
                node = t[2]
1321
                if not self._exists(node):
1322
                    continue
1323
                src_version_id, dest_version_id = self._put_version_duplicate(
1324
                    user, node, size=0, type='', hash=None, checksum='',
1325
                    cluster=CLUSTER_DELETED,
1326
                    update_statistics_ancestors_depth=1)
1327
                del_size = self._apply_versioning(
1328
                    account, container, src_version_id,
1329
                    update_statistics_ancestors_depth=1)
1330
                if report_size_change:
1331
                    self._report_size_change(
1332
                        user, account, -del_size,
1333
                        {'action': 'object delete',
1334
                         'path': path,
1335
                         'versions': ','.join([str(dest_version_id)])})
1336
                self._report_object_change(
1337
                    user, account, path, details={'action': 'object delete'})
1338
                paths.append(path)
1339
            self.permissions.access_clear_bulk(paths)
1340

    
1341
    @debug_method
1342
    def delete_object(self, user, account, container, name, until=None,
1343
                      prefix='', delimiter=None):
1344
        """Delete/purge an object."""
1345

    
1346
        self._delete_object(user, account, container, name, until, delimiter)
1347

    
1348
    @debug_method
1349
    def list_versions(self, user, account, container, name):
1350
        """Return a list of all object (version, version_timestamp) tuples."""
1351

    
1352
        self._can_read(user, account, container, name)
1353
        path, node = self._lookup_object(account, container, name)
1354
        versions = self.node.node_get_versions(node)
1355
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if
1356
                x[self.CLUSTER] != CLUSTER_DELETED]
1357

    
1358
    @debug_method
1359
    def get_uuid(self, user, uuid):
1360
        """Return the (account, container, name) for the UUID given."""
1361

    
1362
        info = self.node.latest_uuid(uuid, CLUSTER_NORMAL)
1363
        if info is None:
1364
            raise NameError
1365
        path, serial = info
1366
        account, container, name = path.split('/', 2)
1367
        self._can_read(user, account, container, name)
1368
        return (account, container, name)
1369

    
1370
    @debug_method
1371
    def get_public(self, user, public):
1372
        """Return the (account, container, name) for the public id given."""
1373

    
1374
        path = self.permissions.public_path(public)
1375
        if path is None:
1376
            raise NameError
1377
        account, container, name = path.split('/', 2)
1378
        self._can_read(user, account, container, name)
1379
        return (account, container, name)
1380

    
1381
    def get_block(self, hash):
1382
        """Return a block's data."""
1383

    
1384
        logger.debug("get_block: %s", hash)
1385
        if hash.startswith('archip_') or hash == self.emptyhash:
1386
            block = self.store.block_get_archip(hash)
1387
        else:
1388
            block = self.store.block_get(self._unhexlify_hash(hash))
1389
        if not block:
1390
            raise ItemNotExists('Block does not exist')
1391
        return block
1392

    
1393
    def put_block(self, data):
1394
        """Store a block and return the hash."""
1395

    
1396
        logger.debug("put_block: %s", len(data))
1397
        return binascii.hexlify(self.store.block_put(data))
1398

    
1399
    def update_block(self, hash, data, offset=0):
1400
        """Update a known block and return the hash."""
1401

    
1402
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1403
        if hash.startswith("archip_") or hash == self.emptyhash:
1404
            raise Exception('Cannot update Archipelago Volume block.')
1405
        if offset == 0 and len(data) == self.block_size:
1406
            return self.put_block(data)
1407
        h = self.store.block_update(self._unhexlify_hash(hash), offset, data)
1408
        return binascii.hexlify(h)
1409

    
1410
    # Path functions.
1411

    
1412
    def _generate_uuid(self):
1413
        return str(uuidlib.uuid4())
1414

    
1415
    def _put_object_node(self, path, parent, name):
1416
        path = '/'.join((path, name))
1417
        node = self.node.node_lookup(path)
1418
        if node is None:
1419
            node = self.node.node_create(parent, path)
1420
        return path, node
1421

    
1422
    def _put_path(self, user, parent, path,
1423
                  update_statistics_ancestors_depth=None):
1424
        node = self.node.node_create(parent, path)
1425
        self.node.version_create(node, None, 0, '', None, user,
1426
                                 self._generate_uuid(), '', CLUSTER_NORMAL,
1427
                                 update_statistics_ancestors_depth)
1428
        return node
1429

    
1430
    def _lookup_account(self, account, create=True):
1431
        node = self.node.node_lookup(account)
1432
        if node is None and create:
1433
            node = self._put_path(
1434
                account, self.ROOTNODE, account,
1435
                update_statistics_ancestors_depth=-1)  # User is account.
1436
        return account, node
1437

    
1438
    def _lookup_container(self, account, container):
1439
        for_update = True if self.lock_container_path else False
1440
        path = '/'.join((account, container))
1441
        node = self.node.node_lookup(path, for_update)
1442
        if node is None:
1443
            raise ItemNotExists('Container does not exist')
1444
        return path, node
1445

    
1446
    def _lookup_object(self, account, container, name, lock_container=False):
1447
        if lock_container:
1448
            self._lookup_container(account, container)
1449

    
1450
        path = '/'.join((account, container, name))
1451
        node = self.node.node_lookup(path)
1452
        if node is None:
1453
            raise ItemNotExists('Object does not exist')
1454
        return path, node
1455

    
1456
    def _lookup_objects(self, paths):
1457
        nodes = self.node.node_lookup_bulk(paths)
1458
        return paths, nodes
1459

    
1460
    def _get_properties(self, node, until=None):
1461
        """Return properties until the timestamp given."""
1462

    
1463
        before = until if until is not None else inf
1464
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1465
        if props is None and until is not None:
1466
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1467
        if props is None:
1468
            raise ItemNotExists('Path does not exist')
1469
        return props
1470

    
1471
    def _get_statistics(self, node, until=None, compute=False):
1472
        """Return (count, sum of size, timestamp) of everything under node."""
1473

    
1474
        if until is not None:
1475
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1476
        elif compute:
1477
            stats = self.node.statistics_latest(node,
1478
                                                except_cluster=CLUSTER_DELETED)
1479
        else:
1480
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1481
        if stats is None:
1482
            stats = (0, 0, 0)
1483
        return stats
1484

    
1485
    def _get_version(self, node, version=None):
1486
        if version is None:
1487
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1488
            if props is None:
1489
                raise ItemNotExists('Object does not exist')
1490
        else:
1491
            try:
1492
                version = int(version)
1493
            except ValueError:
1494
                raise VersionNotExists('Version does not exist')
1495
            props = self.node.version_get_properties(version, node=node)
1496
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1497
                raise VersionNotExists('Version does not exist')
1498
        return props
1499

    
1500
    def _get_versions(self, nodes):
1501
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1502

    
1503
    def _put_version_duplicate(self, user, node, src_node=None, size=None,
1504
                               type=None, hash=None, checksum=None,
1505
                               cluster=CLUSTER_NORMAL, is_copy=False,
1506
                               update_statistics_ancestors_depth=None):
1507
        """Create a new version of the node."""
1508

    
1509
        props = self.node.version_lookup(
1510
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1511
        if props is not None:
1512
            src_version_id = props[self.SERIAL]
1513
            src_hash = props[self.HASH]
1514
            src_size = props[self.SIZE]
1515
            src_type = props[self.TYPE]
1516
            src_checksum = props[self.CHECKSUM]
1517
        else:
1518
            src_version_id = None
1519
            src_hash = None
1520
            src_size = 0
1521
            src_type = ''
1522
            src_checksum = ''
1523
        if size is None:  # Set metadata.
1524
            hash = src_hash  # This way hash can be set to None
1525
                             # (account or container).
1526
            size = src_size
1527
        if type is None:
1528
            type = src_type
1529
        if checksum is None:
1530
            checksum = src_checksum
1531
        uuid = self._generate_uuid(
1532
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1533

    
1534
        if src_node is None:
1535
            pre_version_id = src_version_id
1536
        else:
1537
            pre_version_id = None
1538
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1539
            if props is not None:
1540
                pre_version_id = props[self.SERIAL]
1541
        if pre_version_id is not None:
1542
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY,
1543
                                        update_statistics_ancestors_depth)
1544

    
1545
        dest_version_id, mtime = self.node.version_create(
1546
            node, hash, size, type, src_version_id, user, uuid, checksum,
1547
            cluster, update_statistics_ancestors_depth)
1548

    
1549
        self.node.attribute_unset_is_latest(node, dest_version_id)
1550

    
1551
        return pre_version_id, dest_version_id
1552

    
1553
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain,
1554
                                node, meta, replace=False):
1555
        if src_version_id is not None:
1556
            self.node.attribute_copy(src_version_id, dest_version_id)
1557
        if not replace:
1558
            self.node.attribute_del(dest_version_id, domain, (
1559
                k for k, v in meta.iteritems() if v == ''))
1560
            self.node.attribute_set(dest_version_id, domain, node, (
1561
                (k, v) for k, v in meta.iteritems() if v != ''))
1562
        else:
1563
            self.node.attribute_del(dest_version_id, domain)
1564
            self.node.attribute_set(dest_version_id, domain, node, ((
1565
                k, v) for k, v in meta.iteritems()))
1566

    
1567
    def _put_metadata(self, user, node, domain, meta, replace=False,
1568
                      update_statistics_ancestors_depth=None):
1569
        """Create a new version and store metadata."""
1570

    
1571
        src_version_id, dest_version_id = self._put_version_duplicate(
1572
            user, node,
1573
            update_statistics_ancestors_depth=
1574
            update_statistics_ancestors_depth)
1575
        self._put_metadata_duplicate(
1576
            src_version_id, dest_version_id, domain, node, meta, replace)
1577
        return src_version_id, dest_version_id
1578

    
1579
    def _list_limits(self, listing, marker, limit):
1580
        start = 0
1581
        if marker:
1582
            try:
1583
                start = listing.index(marker) + 1
1584
            except ValueError:
1585
                pass
1586
        if not limit or limit > 10000:
1587
            limit = 10000
1588
        return start, limit
1589

    
1590
    def _list_object_properties(self, parent, path, prefix='', delimiter=None,
1591
                                marker=None, limit=10000, virtual=True,
1592
                                domain=None, keys=None, until=None,
1593
                                size_range=None, allowed=None,
1594
                                all_props=False):
1595
        keys = keys or []
1596
        allowed = allowed or []
1597
        cont_prefix = path + '/'
1598
        prefix = cont_prefix + prefix
1599
        start = cont_prefix + marker if marker else None
1600
        before = until if until is not None else inf
1601
        filterq = keys if domain else []
1602
        sizeq = size_range
1603

    
1604
        objects, prefixes = self.node.latest_version_list(
1605
            parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED,
1606
            allowed, domain, filterq, sizeq, all_props)
1607
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1608
        objects.sort(key=lambda x: x[0])
1609
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1610
        return objects
1611

    
1612
    # Reporting functions.
1613

    
1614
    @debug_method
1615
    def _report_size_change(self, user, account, size, details=None):
1616
        details = details or {}
1617

    
1618
        if size == 0:
1619
            return
1620

    
1621
        account_node = self._lookup_account(account, True)[1]
1622
        total = self._get_statistics(account_node, compute=True)[1]
1623
        details.update({'user': user, 'total': total})
1624
        self.messages.append(
1625
            (QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1626
             account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1627

    
1628
        if not self.using_external_quotaholder:
1629
            return
1630

    
1631
        try:
1632
            name = details['path'] if 'path' in details else ''
1633
            serial = self.astakosclient.issue_one_commission(
1634
                token=self.service_token,
1635
                holder=account,
1636
                source=DEFAULT_SOURCE,
1637
                provisions={'pithos.diskspace': size},
1638
                name=name)
1639
        except BaseException, e:
1640
            raise QuotaError(e)
1641
        else:
1642
            self.serials.append(serial)
1643

    
1644
    @debug_method
1645
    def _report_object_change(self, user, account, path, details=None):
1646
        details = details or {}
1647
        details.update({'user': user})
1648
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1649
                              account, QUEUE_INSTANCE_ID, 'object', path,
1650
                              details))
1651

    
1652
    @debug_method
1653
    def _report_sharing_change(self, user, account, path, details=None):
1654
        details = details or {}
1655
        details.update({'user': user})
1656
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1657
                              account, QUEUE_INSTANCE_ID, 'sharing', path,
1658
                              details))
1659

    
1660
    # Policy functions.
1661

    
1662
    def _check_policy(self, policy, is_account_policy=True):
1663
        default_policy = self.default_account_policy \
1664
            if is_account_policy else self.default_container_policy
1665
        for k in policy.keys():
1666
            if policy[k] == '':
1667
                policy[k] = default_policy.get(k)
1668
        for k, v in policy.iteritems():
1669
            if k == 'quota':
1670
                q = int(v)  # May raise ValueError.
1671
                if q < 0:
1672
                    raise ValueError
1673
            elif k == 'versioning':
1674
                if v not in ['auto', 'none']:
1675
                    raise ValueError
1676
            else:
1677
                raise ValueError
1678

    
1679
    def _put_policy(self, node, policy, replace, is_account_policy=True):
1680
        default_policy = self.default_account_policy \
1681
            if is_account_policy else self.default_container_policy
1682
        if replace:
1683
            for k, v in default_policy.iteritems():
1684
                if k not in policy:
1685
                    policy[k] = v
1686
        self.node.policy_set(node, policy)
1687

    
1688
    def _get_policy(self, node, is_account_policy=True):
1689
        default_policy = self.default_account_policy \
1690
            if is_account_policy else self.default_container_policy
1691
        policy = default_policy.copy()
1692
        policy.update(self.node.policy_get(node))
1693
        return policy
1694

    
1695
    def _apply_versioning(self, account, container, version_id,
1696
                          update_statistics_ancestors_depth=None):
1697
        """Delete the provided version if such is the policy.
1698
           Return size of object removed.
1699
        """
1700

    
1701
        if version_id is None:
1702
            return 0
1703
        path, node = self._lookup_container(account, container)
1704
        versioning = self._get_policy(
1705
            node, is_account_policy=False)['versioning']
1706
        if versioning != 'auto':
1707
            hash, size = self.node.version_remove(
1708
                version_id, update_statistics_ancestors_depth)
1709
            self.store.map_delete(hash)
1710
            return size
1711
        elif self.free_versioning:
1712
            return self.node.version_get_properties(
1713
                version_id, keys=('size',))[0]
1714
        return 0
1715

    
1716
    # Access control functions.
1717

    
1718
    def _check_groups(self, groups):
1719
        # raise ValueError('Bad characters in groups')
1720
        pass
1721

    
1722
    def _check_permissions(self, path, permissions):
1723
        # raise ValueError('Bad characters in permissions')
1724
        pass
1725

    
1726

    
1727
    def _get_formatted_paths(self, paths):
1728
        formatted = []
1729
        if len(paths) == 0 :
1730
            return formatted
1731
        props = self.node.get_props(paths)
1732
        if props:
1733
            for prop in props:
1734
                if prop[1].split(';', 1)[0].strip() in (
1735
                        'application/directory', 'application/folder'):
1736
                    formatted.append((prop[0].rstrip('/') + '/', self.MATCH_PREFIX))
1737
                formatted.append((prop[0], self.MATCH_EXACT))
1738
        return formatted
1739

    
1740
    def _get_permissions_path(self, account, container, name):
1741
        path = '/'.join((account, container, name))
1742
        permission_paths = self.permissions.access_inherit(path)
1743
        permission_paths.sort()
1744
        permission_paths.reverse()
1745
        for p in permission_paths:
1746
            if p == path:
1747
                return p
1748
            else:
1749
                if p.count('/') < 2:
1750
                    continue
1751
                node = self.node.node_lookup(p)
1752
                props = None
1753
                if node is not None:
1754
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1755
                if props is not None:
1756
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1757
                        return p
1758
        return None
1759

    
1760
    def _get_permissions_path_bulk(self, account, container, names):
1761
        formatted_paths = []
1762
        for name in names:
1763
            path = '/'.join((account, container, name))
1764
            formatted_paths.append(path)
1765
        permission_paths = self.permissions.access_inherit_bulk(formatted_paths)
1766
        permission_paths.sort()
1767
        permission_paths.reverse()
1768
        permission_paths_list = []
1769
        lookup_list = []
1770
        for p in permission_paths:
1771
            if p in formatted_paths:
1772
                permission_paths_list.append(p)
1773
            else:
1774
                if p.count('/') < 2:
1775
                    continue
1776
                lookup_list.append(p)
1777

    
1778
        if len(lookup_list) > 0:
1779
            props = self.node.get_props(lookup_list)
1780
            if props:
1781
                for prop in props:
1782
                    if prop[1].split(';', 1)[0].strip() in (
1783
                            'application/directory', 'application/folder'):
1784
                        permission_paths_list.append((prop[0].rstrip('/') + '/',self.MATCH_PREFIX))
1785

    
1786
        if len(permission_paths_list) > 0:
1787
            return permission_paths_list
1788

    
1789
        return None
1790

    
1791
    def _can_read(self, user, account, container, name):
1792
        if user == account:
1793
            return True
1794
        path = '/'.join((account, container, name))
1795
        if self.permissions.public_get(path) is not None:
1796
            return True
1797
        path = self._get_permissions_path(account, container, name)
1798
        if not path:
1799
            raise NotAllowedError
1800
        if (not self.permissions.access_check(path, self.READ, user) and not
1801
                self.permissions.access_check(path, self.WRITE, user)):
1802
            raise NotAllowedError
1803

    
1804
    def _can_write(self, user, account, container, name):
1805
        if user == account:
1806
            return True
1807
        path = '/'.join((account, container, name))
1808
        path = self._get_permissions_path(account, container, name)
1809
        if not path:
1810
            raise NotAllowedError
1811
        if not self.permissions.access_check(path, self.WRITE, user):
1812
            raise NotAllowedError
1813

    
1814
    def _allowed_accounts(self, user):
1815
        allow = set()
1816
        for path in self.permissions.access_list_paths(user):
1817
            allow.add(path.split('/', 1)[0])
1818
        return sorted(allow)
1819

    
1820
    def _allowed_containers(self, user, account):
1821
        allow = set()
1822
        for path in self.permissions.access_list_paths(user, account):
1823
            allow.add(path.split('/', 2)[1])
1824
        return sorted(allow)
1825

    
1826
    # Domain functions
1827

    
1828
    @debug_method
1829
    def get_domain_objects(self, domain, user=None):
1830
        allowed_paths = self.permissions.access_list_paths(
1831
            user, include_owned=user is not None, include_containers=False)
1832
        if not allowed_paths:
1833
            return []
1834
        obj_list = self.node.domain_object_list(
1835
            domain, allowed_paths, CLUSTER_NORMAL)
1836
        return [(path,
1837
                 self._build_metadata(props, user_defined_meta),
1838
                 self.permissions.access_get(path)) for
1839
                path, props, user_defined_meta in obj_list]
1840

    
1841
    # util functions
1842

    
1843
    def _build_metadata(self, props, user_defined=None,
1844
                        include_user_defined=True):
1845
        meta = {'bytes': props[self.SIZE],
1846
                'type': props[self.TYPE],
1847
                'hash': props[self.HASH],
1848
                'version': props[self.SERIAL],
1849
                'version_timestamp': props[self.MTIME],
1850
                'modified_by': props[self.MUSER],
1851
                'uuid': props[self.UUID],
1852
                'checksum': props[self.CHECKSUM]}
1853
        if include_user_defined and user_defined is not None:
1854
            meta.update(user_defined)
1855
        return meta
1856

    
1857
    def _exists(self, node):
1858
        try:
1859
            self._get_version(node)
1860
        except ItemNotExists:
1861
            return False
1862
        else:
1863
            return True
1864

    
1865
    def _unhexlify_hash(self, hash):
1866
        try:
1867
            return binascii.unhexlify(hash)
1868
        except TypeError:
1869
            raise InvalidHash(hash)