Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 7ed99da8

History | View | Annotate | Download (60.2 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from commissioning.clients.quotaholder import QuotaholderHTTP
43

    
44
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45
    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
46

    
47
# Stripped-down version of the HashMap class found in tools.
48

    
49

    
50
class HashMap(list):
51

    
52
    def __init__(self, blocksize, blockhash):
53
        super(HashMap, self).__init__()
54
        self.blocksize = blocksize
55
        self.blockhash = blockhash
56

    
57
    def _hash_raw(self, v):
58
        h = hashlib.new(self.blockhash)
59
        h.update(v)
60
        return h.digest()
61

    
62
    def hash(self):
63
        if len(self) == 0:
64
            return self._hash_raw('')
65
        if len(self) == 1:
66
            return self.__getitem__(0)
67

    
68
        h = list(self)
69
        s = 2
70
        while s < len(h):
71
            s = s * 2
72
        h += [('\x00' * len(h[0]))] * (s - len(h))
73
        while len(h) > 1:
74
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
75
        return h[0]
76

    
77
# Default modules and settings.
78
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
79
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
80
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
81
DEFAULT_BLOCK_PATH = 'data/'
82
DEFAULT_BLOCK_UMASK = 0o022
83
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
84
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
85
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
86

    
87
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
88
QUEUE_CLIENT_ID = 'pithos'
89
QUEUE_INSTANCE_ID = '1'
90

    
91
(CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
92

    
93
inf = float('inf')
94

    
95
ULTIMATE_ANSWER = 42
96

    
97

    
98
logger = logging.getLogger(__name__)
99

    
100

    
101
def backend_method(func=None, autocommit=1):
102
    if func is None:
103
        def fn(func):
104
            return backend_method(func, autocommit)
105
        return fn
106

    
107
    if not autocommit:
108
        return func
109

    
110
    def fn(self, *args, **kw):
111
        self.wrapper.execute()
112
        try:
113
            self.messages = []
114
            ret = func(self, *args, **kw)
115
            for m in self.messages:
116
                self.queue.send(*m)
117
            self.wrapper.commit()
118
            return ret
119
        except:
120
            self.wrapper.rollback()
121
            raise
122
    return fn
123

    
124

    
125
class ModularBackend(BaseBackend):
126
    """A modular backend.
127

128
    Uses modules for SQL functions and storage.
129
    """
130

    
131
    def __init__(self, db_module=None, db_connection=None,
132
                 block_module=None, block_path=None, block_umask=None,
133
                 queue_module=None, queue_hosts=None,
134
                 queue_exchange=None, quotaholder_url=None):
135
        db_module = db_module or DEFAULT_DB_MODULE
136
        db_connection = db_connection or DEFAULT_DB_CONNECTION
137
        block_module = block_module or DEFAULT_BLOCK_MODULE
138
        block_path = block_path or DEFAULT_BLOCK_PATH
139
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
140
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
141
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
142
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
143
                
144
        self.hash_algorithm = 'sha256'
145
        self.block_size = 4 * 1024 * 1024  # 4MB
146

    
147
        self.default_policy = {'quota': DEFAULT_QUOTA,
148
                               'versioning': DEFAULT_VERSIONING}
149

    
150
        def load_module(m):
151
            __import__(m)
152
            return sys.modules[m]
153

    
154
        self.db_module = load_module(db_module)
155
        self.wrapper = self.db_module.DBWrapper(db_connection)
156
        params = {'wrapper': self.wrapper}
157
        self.permissions = self.db_module.Permissions(**params)
158
        self.config = self.db_module.Config(**params)
159
        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
160
        for x in ['READ', 'WRITE']:
161
            setattr(self, x, getattr(self.db_module, x))
162
        self.node = self.db_module.Node(**params)
163
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
164
            setattr(self, x, getattr(self.db_module, x))
165

    
166
        self.block_module = load_module(block_module)
167
        params = {'path': block_path,
168
                  'block_size': self.block_size,
169
                  'hash_algorithm': self.hash_algorithm,
170
                  'umask': block_umask}
171
        self.store = self.block_module.Store(**params)
172

    
173
        if queue_module and queue_hosts:
174
            self.queue_module = load_module(queue_module)
175
            params = {'hosts': queue_hosts,
176
                          'exchange': queue_exchange,
177
                      'client_id': QUEUE_CLIENT_ID}
178
            self.queue = self.queue_module.Queue(**params)
179
        else:
180
            class NoQueue:
181
                def send(self, *args):
182
                    pass
183

    
184
                def close(self):
185
                    pass
186

    
187
            self.queue = NoQueue()
188

    
189
        self.quotaholder_url = quotaholder_url
190
        self.quotaholder = QuotaholderHTTP(quotaholder_url)
191
        self.serials = []
192

    
193
    def close(self):
194
        self.wrapper.close()
195
        self.queue.close()
196

    
197
    @backend_method
198
    def list_accounts(self, user, marker=None, limit=10000):
199
        """Return a list of accounts the user can access."""
200

    
201
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
202
        allowed = self._allowed_accounts(user)
203
        start, limit = self._list_limits(allowed, marker, limit)
204
        return allowed[start:start + limit]
205

    
206
    @backend_method
207
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
208
        """Return a dictionary with the account metadata for the domain."""
209

    
210
        logger.debug(
211
            "get_account_meta: %s %s %s %s", user, account, domain, until)
212
        path, node = self._lookup_account(account, user == account)
213
        if user != account:
214
            if until or node is None or account not in self._allowed_accounts(user):
215
                raise NotAllowedError
216
        try:
217
            props = self._get_properties(node, until)
218
            mtime = props[self.MTIME]
219
        except NameError:
220
            props = None
221
            mtime = until
222
        count, bytes, tstamp = self._get_statistics(node, until)
223
        tstamp = max(tstamp, mtime)
224
        if until is None:
225
            modified = tstamp
226
        else:
227
            modified = self._get_statistics(
228
                node)[2]  # Overall last modification.
229
            modified = max(modified, mtime)
230

    
231
        if user != account:
232
            meta = {'name': account}
233
        else:
234
            meta = {}
235
            if props is not None and include_user_defined:
236
                meta.update(
237
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
238
            if until is not None:
239
                meta.update({'until_timestamp': tstamp})
240
            meta.update({'name': account, 'count': count, 'bytes': bytes})
241
        meta.update({'modified': modified})
242
        return meta
243

    
244
    @backend_method
245
    def update_account_meta(self, user, account, domain, meta, replace=False):
246
        """Update the metadata associated with the account for the domain."""
247

    
248
        logger.debug("update_account_meta: %s %s %s %s %s", user,
249
                     account, domain, meta, replace)
250
        if user != account:
251
            raise NotAllowedError
252
        path, node = self._lookup_account(account, True)
253
        self._put_metadata(user, node, domain, meta, replace)
254

    
255
    @backend_method
256
    def get_account_groups(self, user, account):
257
        """Return a dictionary with the user groups defined for this account."""
258

    
259
        logger.debug("get_account_groups: %s %s", user, account)
260
        if user != account:
261
            if account not in self._allowed_accounts(user):
262
                raise NotAllowedError
263
            return {}
264
        self._lookup_account(account, True)
265
        return self.permissions.group_dict(account)
266

    
267
    @backend_method
268
    def update_account_groups(self, user, account, groups, replace=False):
269
        """Update the groups associated with the account."""
270

    
271
        logger.debug("update_account_groups: %s %s %s %s", user,
272
                     account, groups, replace)
273
        if user != account:
274
            raise NotAllowedError
275
        self._lookup_account(account, True)
276
        self._check_groups(groups)
277
        if replace:
278
            self.permissions.group_destroy(account)
279
        for k, v in groups.iteritems():
280
            if not replace:  # If not already deleted.
281
                self.permissions.group_delete(account, k)
282
            if v:
283
                self.permissions.group_addmany(account, k, v)
284

    
285
    @backend_method
286
    def get_account_policy(self, user, account):
287
        """Return a dictionary with the account policy."""
288

    
289
        logger.debug("get_account_policy: %s %s", user, account)
290
        if user != account:
291
            if account not in self._allowed_accounts(user):
292
                raise NotAllowedError
293
            return {}
294
        path, node = self._lookup_account(account, True)
295
        return self._get_policy(node)
296

    
297
    @backend_method
298
    def update_account_policy(self, user, account, policy, replace=False):
299
        """Update the policy associated with the account."""
300

    
301
        logger.debug("update_account_policy: %s %s %s %s", user,
302
                     account, policy, replace)
303
        if user != account:
304
            raise NotAllowedError
305
        path, node = self._lookup_account(account, True)
306
        self._check_policy(policy)
307
        self._put_policy(node, policy, replace)
308

    
309
    @backend_method
310
    def put_account(self, user, account, policy={}):
311
        """Create a new account with the given name."""
312

    
313
        logger.debug("put_account: %s %s %s", user, account, policy)
314
        if user != account:
315
            raise NotAllowedError
316
        node = self.node.node_lookup(account)
317
        if node is not None:
318
            raise AccountExists('Account already exists')
319
        if policy:
320
            self._check_policy(policy)
321
        node = self._put_path(user, self.ROOTNODE, account)
322
        self._put_policy(node, policy, True)
323

    
324
    @backend_method
325
    def delete_account(self, user, account):
326
        """Delete the account with the given name."""
327

    
328
        logger.debug("delete_account: %s %s", user, account)
329
        if user != account:
330
            raise NotAllowedError
331
        node = self.node.node_lookup(account)
332
        if node is None:
333
            return
334
        if not self.node.node_remove(node):
335
            raise AccountNotEmpty('Account is not empty')
336
        self.permissions.group_destroy(account)
337

    
338
    @backend_method
339
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
340
        """Return a list of containers existing under an account."""
341

    
342
        logger.debug("list_containers: %s %s %s %s %s %s %s", user,
343
                     account, marker, limit, shared, until, public)
344
        if user != account:
345
            if until or account not in self._allowed_accounts(user):
346
                raise NotAllowedError
347
            allowed = self._allowed_containers(user, account)
348
            start, limit = self._list_limits(allowed, marker, limit)
349
            return allowed[start:start + limit]
350
        if shared or public:
351
            allowed = set()
352
            if shared:
353
                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
354
            if public:
355
                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
356
            allowed = sorted(allowed)
357
            start, limit = self._list_limits(allowed, marker, limit)
358
            return allowed[start:start + limit]
359
        node = self.node.node_lookup(account)
360
        containers = [x[0] for x in self._list_object_properties(
361
            node, account, '', '/', marker, limit, False, None, [], until)]
362
        start, limit = self._list_limits(
363
            [x[0] for x in containers], marker, limit)
364
        return containers[start:start + limit]
365

    
366
    @backend_method
367
    def list_container_meta(self, user, account, container, domain, until=None):
368
        """Return a list with all the container's object meta keys for the domain."""
369

    
370
        logger.debug("list_container_meta: %s %s %s %s %s", user,
371
                     account, container, domain, until)
372
        allowed = []
373
        if user != account:
374
            if until:
375
                raise NotAllowedError
376
            allowed = self.permissions.access_list_paths(
377
                user, '/'.join((account, container)))
378
            if not allowed:
379
                raise NotAllowedError
380
        path, node = self._lookup_container(account, container)
381
        before = until if until is not None else inf
382
        allowed = self._get_formatted_paths(allowed)
383
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
384

    
385
    @backend_method
386
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
387
        """Return a dictionary with the container metadata for the domain."""
388

    
389
        logger.debug("get_container_meta: %s %s %s %s %s", user,
390
                     account, container, domain, until)
391
        if user != account:
392
            if until or container not in self._allowed_containers(user, account):
393
                raise NotAllowedError
394
        path, node = self._lookup_container(account, container)
395
        props = self._get_properties(node, until)
396
        mtime = props[self.MTIME]
397
        count, bytes, tstamp = self._get_statistics(node, until)
398
        tstamp = max(tstamp, mtime)
399
        if until is None:
400
            modified = tstamp
401
        else:
402
            modified = self._get_statistics(
403
                node)[2]  # Overall last modification.
404
            modified = max(modified, mtime)
405

    
406
        if user != account:
407
            meta = {'name': container}
408
        else:
409
            meta = {}
410
            if include_user_defined:
411
                meta.update(
412
                    dict(self.node.attribute_get(props[self.SERIAL], domain)))
413
            if until is not None:
414
                meta.update({'until_timestamp': tstamp})
415
            meta.update({'name': container, 'count': count, 'bytes': bytes})
416
        meta.update({'modified': modified})
417
        return meta
418

    
419
    @backend_method
420
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
421
        """Update the metadata associated with the container for the domain."""
422

    
423
        logger.debug("update_container_meta: %s %s %s %s %s %s",
424
                     user, account, container, domain, meta, replace)
425
        if user != account:
426
            raise NotAllowedError
427
        path, node = self._lookup_container(account, container)
428
        src_version_id, dest_version_id = self._put_metadata(
429
            user, node, domain, meta, replace)
430
        if src_version_id is not None:
431
            versioning = self._get_policy(node)['versioning']
432
            if versioning != 'auto':
433
                self.node.version_remove(src_version_id)
434

    
435
    @backend_method
436
    def get_container_policy(self, user, account, container):
437
        """Return a dictionary with the container policy."""
438

    
439
        logger.debug(
440
            "get_container_policy: %s %s %s", user, account, container)
441
        if user != account:
442
            if container not in self._allowed_containers(user, account):
443
                raise NotAllowedError
444
            return {}
445
        path, node = self._lookup_container(account, container)
446
        return self._get_policy(node)
447

    
448
    @backend_method
449
    def update_container_policy(self, user, account, container, policy, replace=False):
450
        """Update the policy associated with the container."""
451

    
452
        logger.debug("update_container_policy: %s %s %s %s %s",
453
                     user, account, container, policy, replace)
454
        if user != account:
455
            raise NotAllowedError
456
        path, node = self._lookup_container(account, container)
457
        self._check_policy(policy)
458
        self._put_policy(node, policy, replace)
459

    
460
    @backend_method
461
    def put_container(self, user, account, container, policy={}):
462
        """Create a new container with the given name."""
463

    
464
        logger.debug(
465
            "put_container: %s %s %s %s", user, account, container, policy)
466
        if user != account:
467
            raise NotAllowedError
468
        try:
469
            path, node = self._lookup_container(account, container)
470
        except NameError:
471
            pass
472
        else:
473
            raise ContainerExists('Container already exists')
474
        if policy:
475
            self._check_policy(policy)
476
        path = '/'.join((account, container))
477
        node = self._put_path(
478
            user, self._lookup_account(account, True)[1], path)
479
        self._put_policy(node, policy, True)
480

    
481
    @backend_method
482
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
483
        """Delete/purge the container with the given name."""
484

    
485
        logger.debug("delete_container: %s %s %s %s %s %s", user,
486
                     account, container, until, prefix, delimiter)
487
        if user != account:
488
            raise NotAllowedError
489
        path, node = self._lookup_container(account, container)
490

    
491
        if until is not None:
492
            hashes, size, serials = self.node.node_purge_children(
493
                node, until, CLUSTER_HISTORY)
494
            for h in hashes:
495
                self.store.map_delete(h)
496
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
497
            self._report_size_change(user, account, -size,
498
                                     {'action':'container purge', 'path': path,
499
                                      'versions': serials})
500
            return
501

    
502
        if not delimiter:
503
            if self._get_statistics(node)[0] > 0:
504
                raise ContainerNotEmpty('Container is not empty')
505
            hashes, size, serials = self.node.node_purge_children(
506
                node, inf, CLUSTER_HISTORY)
507
            for h in hashes:
508
                self.store.map_delete(h)
509
            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
510
            self.node.node_remove(node)
511
            self._report_size_change(user, account, -size,
512
                                     {'action': 'container delete',
513
                                      'path': path,
514
                                      'versions': serials})
515
        else:
516
                # remove only contents
517
            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
518
            paths = []
519
            for t in src_names:
520
                path = '/'.join((account, container, t[0]))
521
                node = t[2]
522
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
523
                del_size = self._apply_versioning(
524
                    account, container, src_version_id)
525
                if del_size:
526
                    self._report_size_change(user, account, -del_size,
527
                                             {'action': 'object delete',
528
                                              'path': path, 'versions': [dest_version_id]})
529
                self._report_object_change(
530
                    user, account, path, details={'action': 'object delete'})
531
                paths.append(path)
532
            self.permissions.access_clear_bulk(paths)
533

    
534
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
535
        if user != account and until:
536
            raise NotAllowedError
537
        if shared and public:
538
            # get shared first
539
            shared = self._list_object_permissions(
540
                user, account, container, prefix, shared=True, public=False)
541
            objects = set()
542
            if shared:
543
                path, node = self._lookup_container(account, container)
544
                shared = self._get_formatted_paths(shared)
545
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
546

    
547
            # get public
548
            objects |= set(self._list_public_object_properties(
549
                user, account, container, prefix, all_props))
550
            objects = list(objects)
551

    
552
            objects.sort(key=lambda x: x[0])
553
            start, limit = self._list_limits(
554
                [x[0] for x in objects], marker, limit)
555
            return objects[start:start + limit]
556
        elif public:
557
            objects = self._list_public_object_properties(
558
                user, account, container, prefix, all_props)
559
            start, limit = self._list_limits(
560
                [x[0] for x in objects], marker, limit)
561
            return objects[start:start + limit]
562

    
563
        allowed = self._list_object_permissions(
564
            user, account, container, prefix, shared, public)
565
        if shared and not allowed:
566
            return []
567
        path, node = self._lookup_container(account, container)
568
        allowed = self._get_formatted_paths(allowed)
569
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
570
        start, limit = self._list_limits(
571
            [x[0] for x in objects], marker, limit)
572
        return objects[start:start + limit]
573

    
574
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
575
        public = self._list_object_permissions(
576
            user, account, container, prefix, shared=False, public=True)
577
        paths, nodes = self._lookup_objects(public)
578
        path = '/'.join((account, container))
579
        cont_prefix = path + '/'
580
        paths = [x[len(cont_prefix):] for x in paths]
581
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
582
        objects = [(path,) + props for path, props in zip(paths, props)]
583
        return objects
584

    
585
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
586
        objects = []
587
        while True:
588
            marker = objects[-1] if objects else None
589
            limit = 10000
590
            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
591
            objects.extend(l)
592
            if not l or len(l) < limit:
593
                break
594
        return objects
595

    
596
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
597
        allowed = []
598
        path = '/'.join((account, container, prefix)).rstrip('/')
599
        if user != account:
600
            allowed = self.permissions.access_list_paths(user, path)
601
            if not allowed:
602
                raise NotAllowedError
603
        else:
604
            allowed = set()
605
            if shared:
606
                allowed.update(self.permissions.access_list_shared(path))
607
            if public:
608
                allowed.update(
609
                    [x[0] for x in self.permissions.public_list(path)])
610
            allowed = sorted(allowed)
611
            if not allowed:
612
                return []
613
        return allowed
614

    
615
    @backend_method
616
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
617
        """Return a list of object (name, version_id) tuples existing under a container."""
618

    
619
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
620
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
621

    
622
    @backend_method
623
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
624
        """Return a list of object metadata dicts existing under a container."""
625

    
626
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
627
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
628
        objects = []
629
        for p in props:
630
            if len(p) == 2:
631
                objects.append({'subdir': p[0]})
632
            else:
633
                objects.append({'name': p[0],
634
                                'bytes': p[self.SIZE + 1],
635
                                'type': p[self.TYPE + 1],
636
                                'hash': p[self.HASH + 1],
637
                                'version': p[self.SERIAL + 1],
638
                                'version_timestamp': p[self.MTIME + 1],
639
                                'modified': p[self.MTIME + 1] if until is None else None,
640
                                'modified_by': p[self.MUSER + 1],
641
                                'uuid': p[self.UUID + 1],
642
                                'checksum': p[self.CHECKSUM + 1]})
643
        return objects
644

    
645
    @backend_method
646
    def list_object_permissions(self, user, account, container, prefix=''):
647
        """Return a list of paths that enforce permissions under a container."""
648

    
649
        logger.debug("list_object_permissions: %s %s %s %s", user,
650
                     account, container, prefix)
651
        return self._list_object_permissions(user, account, container, prefix, True, False)
652

    
653
    @backend_method
654
    def list_object_public(self, user, account, container, prefix=''):
655
        """Return a dict mapping paths to public ids for objects that are public under a container."""
656

    
657
        logger.debug("list_object_public: %s %s %s %s", user,
658
                     account, container, prefix)
659
        public = {}
660
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
661
            public[path] = p + ULTIMATE_ANSWER
662
        return public
663

    
664
    @backend_method
665
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
666
        """Return a dictionary with the object metadata for the domain."""
667

    
668
        logger.debug("get_object_meta: %s %s %s %s %s %s", user,
669
                     account, container, name, domain, version)
670
        self._can_read(user, account, container, name)
671
        path, node = self._lookup_object(account, container, name)
672
        props = self._get_version(node, version)
673
        if version is None:
674
            modified = props[self.MTIME]
675
        else:
676
            try:
677
                modified = self._get_version(
678
                    node)[self.MTIME]  # Overall last modification.
679
            except NameError:  # Object may be deleted.
680
                del_props = self.node.version_lookup(
681
                    node, inf, CLUSTER_DELETED)
682
                if del_props is None:
683
                    raise ItemNotExists('Object does not exist')
684
                modified = del_props[self.MTIME]
685

    
686
        meta = {}
687
        if include_user_defined:
688
            meta.update(
689
                dict(self.node.attribute_get(props[self.SERIAL], domain)))
690
        meta.update({'name': name,
691
                     'bytes': props[self.SIZE],
692
                     'type': props[self.TYPE],
693
                     'hash': props[self.HASH],
694
                     'version': props[self.SERIAL],
695
                     'version_timestamp': props[self.MTIME],
696
                     'modified': modified,
697
                     'modified_by': props[self.MUSER],
698
                     'uuid': props[self.UUID],
699
                     'checksum': props[self.CHECKSUM]})
700
        return meta
701

    
702
    @backend_method
703
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
704
        """Update the metadata associated with the object for the domain and return the new version."""
705

    
706
        logger.debug("update_object_meta: %s %s %s %s %s %s %s",
707
                     user, account, container, name, domain, meta, replace)
708
        self._can_write(user, account, container, name)
709
        path, node = self._lookup_object(account, container, name)
710
        src_version_id, dest_version_id = self._put_metadata(
711
            user, node, domain, meta, replace)
712
        self._apply_versioning(account, container, src_version_id)
713
        return dest_version_id
714

    
715
    @backend_method
716
    def get_object_permissions(self, user, account, container, name):
717
        """Return the action allowed on the object, the path
718
        from which the object gets its permissions from,
719
        along with a dictionary containing the permissions."""
720

    
721
        logger.debug("get_object_permissions: %s %s %s %s", user,
722
                     account, container, name)
723
        allowed = 'write'
724
        permissions_path = self._get_permissions_path(account, container, name)
725
        if user != account:
726
            if self.permissions.access_check(permissions_path, self.WRITE, user):
727
                allowed = 'write'
728
            elif self.permissions.access_check(permissions_path, self.READ, user):
729
                allowed = 'read'
730
            else:
731
                raise NotAllowedError
732
        self._lookup_object(account, container, name)
733
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
734

    
735
    @backend_method
736
    def update_object_permissions(self, user, account, container, name, permissions):
737
        """Update the permissions associated with the object."""
738

    
739
        logger.debug("update_object_permissions: %s %s %s %s %s",
740
                     user, account, container, name, permissions)
741
        if user != account:
742
            raise NotAllowedError
743
        path = self._lookup_object(account, container, name)[0]
744
        self._check_permissions(path, permissions)
745
        self.permissions.access_set(path, permissions)
746
        self._report_sharing_change(user, account, path, {'members':
747
                                    self.permissions.access_members(path)})
748

    
749
    @backend_method
750
    def get_object_public(self, user, account, container, name):
751
        """Return the public id of the object if applicable."""
752

    
753
        logger.debug(
754
            "get_object_public: %s %s %s %s", user, account, container, name)
755
        self._can_read(user, account, container, name)
756
        path = self._lookup_object(account, container, name)[0]
757
        p = self.permissions.public_get(path)
758
        if p is not None:
759
            p += ULTIMATE_ANSWER
760
        return p
761

    
762
    @backend_method
763
    def update_object_public(self, user, account, container, name, public):
764
        """Update the public status of the object."""
765

    
766
        logger.debug("update_object_public: %s %s %s %s %s", user,
767
                     account, container, name, public)
768
        self._can_write(user, account, container, name)
769
        path = self._lookup_object(account, container, name)[0]
770
        if not public:
771
            self.permissions.public_unset(path)
772
        else:
773
            self.permissions.public_set(path)
774

    
775
    @backend_method
776
    def get_object_hashmap(self, user, account, container, name, version=None):
777
        """Return the object's size and a list with partial hashes."""
778

    
779
        logger.debug("get_object_hashmap: %s %s %s %s %s", user,
780
                     account, container, name, version)
781
        self._can_read(user, account, container, name)
782
        path, node = self._lookup_object(account, container, name)
783
        props = self._get_version(node, version)
784
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
785
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
786

    
787
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
788
        if permissions is not None and user != account:
789
            raise NotAllowedError
790
        self._can_write(user, account, container, name)
791
        if permissions is not None:
792
            path = '/'.join((account, container, name))
793
            self._check_permissions(path, permissions)
794

    
795
        account_path, account_node = self._lookup_account(account, True)
796
        container_path, container_node = self._lookup_container(
797
            account, container)
798
        path, node = self._put_object_node(
799
            container_path, container_node, name)
800
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
801

    
802
        # Handle meta.
803
        if src_version_id is None:
804
            src_version_id = pre_version_id
805
        self._put_metadata_duplicate(
806
            src_version_id, dest_version_id, domain, meta, replace_meta)
807

    
808
        # Check quota.
809
        del_size = self._apply_versioning(account, container, pre_version_id)
810
        size_delta = size - del_size
811
        if size_delta > 0:
812
            account_quota = long(self._get_policy(account_node)['quota'])
813
            container_quota = long(self._get_policy(container_node)['quota'])
814
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
815
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
816
                # This must be executed in a transaction, so the version is never created if it fails.
817
                raise QuotaError
818
        self._report_size_change(user, account, size_delta,
819
                                 {'action': 'object update', 'path': path,
820
                                  'versions': [dest_version_id]})
821

    
822
        if permissions is not None:
823
            self.permissions.access_set(path, permissions)
824
            self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
825

    
826
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
827
        return dest_version_id
828

    
829
    @backend_method
830
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
831
        """Create/update an object with the specified size and partial hashes."""
832

    
833
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
834
                     account, container, name, size, type, hashmap, checksum)
835
        if size == 0:  # No such thing as an empty hashmap.
836
            hashmap = [self.put_block('')]
837
        map = HashMap(self.block_size, self.hash_algorithm)
838
        map.extend([binascii.unhexlify(x) for x in hashmap])
839
        missing = self.store.block_search(map)
840
        if missing:
841
            ie = IndexError()
842
            ie.data = [binascii.hexlify(x) for x in missing]
843
            raise ie
844

    
845
        hash = map.hash()
846
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
847
        self.store.map_put(hash, map)
848
        return dest_version_id
849

    
850
    @backend_method
851
    def update_object_checksum(self, user, account, container, name, version, checksum):
852
        """Update an object's checksum."""
853

    
854
        logger.debug("update_object_checksum: %s %s %s %s %s %s",
855
                     user, account, container, name, version, checksum)
856
        # Update objects with greater version and same hashmap and size (fix metadata updates).
857
        self._can_write(user, account, container, name)
858
        path, node = self._lookup_object(account, container, name)
859
        props = self._get_version(node, version)
860
        versions = self.node.node_get_versions(node)
861
        for x in versions:
862
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
863
                self.node.version_put_property(
864
                    x[self.SERIAL], 'checksum', checksum)
865

    
866
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
867
        dest_version_ids = []
868
        self._can_read(user, src_account, src_container, src_name)
869
        path, node = self._lookup_object(src_account, src_container, src_name)
870
        # TODO: Will do another fetch of the properties in duplicate version...
871
        props = self._get_version(
872
            node, src_version)  # Check to see if source exists.
873
        src_version_id = props[self.SERIAL]
874
        hash = props[self.HASH]
875
        size = props[self.SIZE]
876
        is_copy = not is_move and (src_account, src_container, src_name) != (
877
            dest_account, dest_container, dest_name)  # New uuid.
878
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
879
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
880
            self._delete_object(user, src_account, src_container, src_name)
881

    
882
        if delimiter:
883
            prefix = src_name + \
884
                delimiter if not src_name.endswith(delimiter) else src_name
885
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
886
            src_names.sort(key=lambda x: x[2])  # order by nodes
887
            paths = [elem[0] for elem in src_names]
888
            nodes = [elem[2] for elem in src_names]
889
            # TODO: Will do another fetch of the properties in duplicate version...
890
            props = self._get_versions(nodes)  # Check to see if source exists.
891

    
892
            for prop, path, node in zip(props, paths, nodes):
893
                src_version_id = prop[self.SERIAL]
894
                hash = prop[self.HASH]
895
                vtype = prop[self.TYPE]
896
                size = prop[self.SIZE]
897
                dest_prefix = dest_name + delimiter if not dest_name.endswith(
898
                    delimiter) else dest_name
899
                vdest_name = path.replace(prefix, dest_prefix, 1)
900
                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
901
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
902
                    self._delete_object(user, src_account, src_container, path)
903
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
904

    
905
    @backend_method
906
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
907
        """Copy an object's data and metadata."""
908

    
909
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
910
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
911
        return dest_version_id
912

    
913
    @backend_method
914
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
915
        """Move an object's data and metadata."""
916

    
917
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
918
        if user != src_account:
919
            raise NotAllowedError
920
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
921
        return dest_version_id
922

    
923
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
924
        if user != account:
925
            raise NotAllowedError
926

    
927
        if until is not None:
928
            path = '/'.join((account, container, name))
929
            node = self.node.node_lookup(path)
930
            if node is None:
931
                return
932
            hashes = []
933
            size = 0
934
            serials = []
935
            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
936
            hashes += h
937
            size += s
938
            serials += v
939
            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
940
            hashes += h
941
            size += s
942
            serials += v
943
            for h in hashes:
944
                self.store.map_delete(h)
945
            self.node.node_purge(node, until, CLUSTER_DELETED)
946
            try:
947
                props = self._get_version(node)
948
            except NameError:
949
                self.permissions.access_clear(path)
950
            self._report_size_change(user, account, -size,
951
                                    {'action': 'object purge', 'path': path,
952
                                     'versions': serials})
953
            return
954

    
955
        path, node = self._lookup_object(account, container, name)
956
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
957
        del_size = self._apply_versioning(account, container, src_version_id)
958
        if del_size:
959
            self._report_size_change(user, account, -del_size,
960
                                     {'action': 'object delete', 'path': path,
961
                                      'versions': [dest_version_id]})
962
        self._report_object_change(
963
            user, account, path, details={'action': 'object delete'})
964
        self.permissions.access_clear(path)
965

    
966
        if delimiter:
967
            prefix = name + delimiter if not name.endswith(delimiter) else name
968
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
969
            paths = []
970
            for t in src_names:
971
                path = '/'.join((account, container, t[0]))
972
                node = t[2]
973
                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
974
                del_size = self._apply_versioning(
975
                    account, container, src_version_id)
976
                if del_size:
977
                    self._report_size_change(user, account, -del_size,
978
                                             {'action': 'object delete',
979
                                              'path': path,
980
                                              'versions': [dest_version_id]})
981
                self._report_object_change(
982
                    user, account, path, details={'action': 'object delete'})
983
                paths.append(path)
984
            self.permissions.access_clear_bulk(paths)
985

    
986
    @backend_method
987
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
988
        """Delete/purge an object."""
989

    
990
        logger.debug("delete_object: %s %s %s %s %s %s %s", user,
991
                     account, container, name, until, prefix, delimiter)
992
        self._delete_object(user, account, container, name, until, delimiter)
993

    
994
    @backend_method
995
    def list_versions(self, user, account, container, name):
996
        """Return a list of all (version, version_timestamp) tuples for an object."""
997

    
998
        logger.debug(
999
            "list_versions: %s %s %s %s", user, account, container, name)
1000
        self._can_read(user, account, container, name)
1001
        path, node = self._lookup_object(account, container, name)
1002
        versions = self.node.node_get_versions(node)
1003
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
1004

    
1005
    @backend_method
1006
    def get_uuid(self, user, uuid):
1007
        """Return the (account, container, name) for the UUID given."""
1008

    
1009
        logger.debug("get_uuid: %s %s", user, uuid)
1010
        info = self.node.latest_uuid(uuid)
1011
        if info is None:
1012
            raise NameError
1013
        path, serial = info
1014
        account, container, name = path.split('/', 2)
1015
        self._can_read(user, account, container, name)
1016
        return (account, container, name)
1017

    
1018
    @backend_method
1019
    def get_public(self, user, public):
1020
        """Return the (account, container, name) for the public id given."""
1021

    
1022
        logger.debug("get_public: %s %s", user, public)
1023
        if public is None or public < ULTIMATE_ANSWER:
1024
            raise NameError
1025
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1026
        if path is None:
1027
            raise NameError
1028
        account, container, name = path.split('/', 2)
1029
        self._can_read(user, account, container, name)
1030
        return (account, container, name)
1031

    
1032
    @backend_method(autocommit=0)
1033
    def get_block(self, hash):
1034
        """Return a block's data."""
1035

    
1036
        logger.debug("get_block: %s", hash)
1037
        block = self.store.block_get(binascii.unhexlify(hash))
1038
        if not block:
1039
            raise ItemNotExists('Block does not exist')
1040
        return block
1041

    
1042
    @backend_method(autocommit=0)
1043
    def put_block(self, data):
1044
        """Store a block and return the hash."""
1045

    
1046
        logger.debug("put_block: %s", len(data))
1047
        return binascii.hexlify(self.store.block_put(data))
1048

    
1049
    @backend_method(autocommit=0)
1050
    def update_block(self, hash, data, offset=0):
1051
        """Update a known block and return the hash."""
1052

    
1053
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
1054
        if offset == 0 and len(data) == self.block_size:
1055
            return self.put_block(data)
1056
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1057
        return binascii.hexlify(h)
1058

    
1059
    # Path functions.
1060

    
1061
    def _generate_uuid(self):
1062
        return str(uuidlib.uuid4())
1063

    
1064
    def _put_object_node(self, path, parent, name):
1065
        path = '/'.join((path, name))
1066
        node = self.node.node_lookup(path)
1067
        if node is None:
1068
            node = self.node.node_create(parent, path)
1069
        return path, node
1070

    
1071
    def _put_path(self, user, parent, path):
1072
        node = self.node.node_create(parent, path)
1073
        self.node.version_create(node, None, 0, '', None, user,
1074
                                 self._generate_uuid(), '', CLUSTER_NORMAL)
1075
        return node
1076

    
1077
    def _lookup_account(self, account, create=True):
1078
        node = self.node.node_lookup(account)
1079
        if node is None and create:
1080
            node = self._put_path(
1081
                account, self.ROOTNODE, account)  # User is account.
1082
        return account, node
1083

    
1084
    def _lookup_container(self, account, container):
1085
        path = '/'.join((account, container))
1086
        node = self.node.node_lookup(path)
1087
        if node is None:
1088
            raise ItemNotExists('Container does not exist')
1089
        return path, node
1090

    
1091
    def _lookup_object(self, account, container, name):
1092
        path = '/'.join((account, container, name))
1093
        node = self.node.node_lookup(path)
1094
        if node is None:
1095
            raise ItemNotExists('Object does not exist')
1096
        return path, node
1097

    
1098
    def _lookup_objects(self, paths):
1099
        nodes = self.node.node_lookup_bulk(paths)
1100
        return paths, nodes
1101

    
1102
    def _get_properties(self, node, until=None):
1103
        """Return properties until the timestamp given."""
1104

    
1105
        before = until if until is not None else inf
1106
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1107
        if props is None and until is not None:
1108
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1109
        if props is None:
1110
            raise ItemNotExists('Path does not exist')
1111
        return props
1112

    
1113
    def _get_statistics(self, node, until=None):
1114
        """Return count, sum of size and latest timestamp of everything under node."""
1115

    
1116
        if until is None:
1117
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1118
        else:
1119
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1120
        if stats is None:
1121
            stats = (0, 0, 0)
1122
        return stats
1123

    
1124
    def _get_version(self, node, version=None):
1125
        if version is None:
1126
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1127
            if props is None:
1128
                raise ItemNotExists('Object does not exist')
1129
        else:
1130
            try:
1131
                version = int(version)
1132
            except ValueError:
1133
                raise VersionNotExists('Version does not exist')
1134
            props = self.node.version_get_properties(version)
1135
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1136
                raise VersionNotExists('Version does not exist')
1137
        return props
1138

    
1139
    def _get_versions(self, nodes):
1140
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1141

    
1142
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1143
        """Create a new version of the node."""
1144

    
1145
        props = self.node.version_lookup(
1146
            node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1147
        if props is not None:
1148
            src_version_id = props[self.SERIAL]
1149
            src_hash = props[self.HASH]
1150
            src_size = props[self.SIZE]
1151
            src_type = props[self.TYPE]
1152
            src_checksum = props[self.CHECKSUM]
1153
        else:
1154
            src_version_id = None
1155
            src_hash = None
1156
            src_size = 0
1157
            src_type = ''
1158
            src_checksum = ''
1159
        if size is None:  # Set metadata.
1160
            hash = src_hash  # This way hash can be set to None (account or container).
1161
            size = src_size
1162
        if type is None:
1163
            type = src_type
1164
        if checksum is None:
1165
            checksum = src_checksum
1166
        uuid = self._generate_uuid(
1167
        ) if (is_copy or src_version_id is None) else props[self.UUID]
1168

    
1169
        if src_node is None:
1170
            pre_version_id = src_version_id
1171
        else:
1172
            pre_version_id = None
1173
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1174
            if props is not None:
1175
                pre_version_id = props[self.SERIAL]
1176
        if pre_version_id is not None:
1177
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1178

    
1179
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1180
        return pre_version_id, dest_version_id
1181

    
1182
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1183
        if src_version_id is not None:
1184
            self.node.attribute_copy(src_version_id, dest_version_id)
1185
        if not replace:
1186
            self.node.attribute_del(dest_version_id, domain, (
1187
                k for k, v in meta.iteritems() if v == ''))
1188
            self.node.attribute_set(dest_version_id, domain, (
1189
                (k, v) for k, v in meta.iteritems() if v != ''))
1190
        else:
1191
            self.node.attribute_del(dest_version_id, domain)
1192
            self.node.attribute_set(dest_version_id, domain, ((
1193
                k, v) for k, v in meta.iteritems()))
1194

    
1195
    def _put_metadata(self, user, node, domain, meta, replace=False):
1196
        """Create a new version and store metadata."""
1197

    
1198
        src_version_id, dest_version_id = self._put_version_duplicate(
1199
            user, node)
1200
        self._put_metadata_duplicate(
1201
            src_version_id, dest_version_id, domain, meta, replace)
1202
        return src_version_id, dest_version_id
1203

    
1204
    def _list_limits(self, listing, marker, limit):
1205
        start = 0
1206
        if marker:
1207
            try:
1208
                start = listing.index(marker) + 1
1209
            except ValueError:
1210
                pass
1211
        if not limit or limit > 10000:
1212
            limit = 10000
1213
        return start, limit
1214

    
1215
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1216
        cont_prefix = path + '/'
1217
        prefix = cont_prefix + prefix
1218
        start = cont_prefix + marker if marker else None
1219
        before = until if until is not None else inf
1220
        filterq = keys if domain else []
1221
        sizeq = size_range
1222

    
1223
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1224
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1225
        objects.sort(key=lambda x: x[0])
1226
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1227
        return objects
1228

    
1229
    # Reporting functions.
1230

    
1231
    def _report_size_change(self, user, account, size, details={}):
1232
        account_node = self._lookup_account(account, True)[1]
1233
        total = self._get_statistics(account_node)[1]
1234
        details.update({'user': user, 'total': total})
1235
        logger.debug(
1236
            "_report_size_change: %s %s %s %s", user, account, size, details)
1237
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1238
                              account, QUEUE_INSTANCE_ID, 'diskspace',
1239
                              float(size), details))
1240

    
1241
        serial = self.quotaholder.issue_commission(
1242
                context     =   {},
1243
                target      =   account,
1244
                key         =   '1',
1245
                clientkey   =   'pithos',
1246
                ownerkey    =   '',
1247
                provisions  =   ()
1248
        )
1249
        self.serials.append(serial)
1250

    
1251
    def _report_object_change(self, user, account, path, details={}):
1252
        details.update({'user': user})
1253
        logger.debug("_report_object_change: %s %s %s %s", user,
1254
                     account, path, details)
1255
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1256
                                                  account, QUEUE_INSTANCE_ID, 'object', path, details))
1257

    
1258
    def _report_sharing_change(self, user, account, path, details={}):
1259
        logger.debug("_report_permissions_change: %s %s %s %s",
1260
                     user, account, path, details)
1261
        details.update({'user': user})
1262
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1263
                                                  account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1264

    
1265
    # Policy functions.
1266

    
1267
    def _check_policy(self, policy):
1268
        for k in policy.keys():
1269
            if policy[k] == '':
1270
                policy[k] = self.default_policy.get(k)
1271
        for k, v in policy.iteritems():
1272
            if k == 'quota':
1273
                q = int(v)  # May raise ValueError.
1274
                if q < 0:
1275
                    raise ValueError
1276
            elif k == 'versioning':
1277
                if v not in ['auto', 'none']:
1278
                    raise ValueError
1279
            else:
1280
                raise ValueError
1281

    
1282
    def _put_policy(self, node, policy, replace):
1283
        if replace:
1284
            for k, v in self.default_policy.iteritems():
1285
                if k not in policy:
1286
                    policy[k] = v
1287
        self.node.policy_set(node, policy)
1288

    
1289
    def _get_policy(self, node):
1290
        policy = self.default_policy.copy()
1291
        policy.update(self.node.policy_get(node))
1292
        return policy
1293

    
1294
    def _apply_versioning(self, account, container, version_id):
1295
        """Delete the provided version if such is the policy.
1296
           Return size of object removed.
1297
        """
1298

    
1299
        if version_id is None:
1300
            return 0
1301
        path, node = self._lookup_container(account, container)
1302
        versioning = self._get_policy(node)['versioning']
1303
        if versioning != 'auto':
1304
            hash, size = self.node.version_remove(version_id)
1305
            self.store.map_delete(hash)
1306
            return size
1307
        return 0
1308

    
1309
    # Access control functions.
1310

    
1311
    def _check_groups(self, groups):
1312
        # raise ValueError('Bad characters in groups')
1313
        pass
1314

    
1315
    def _check_permissions(self, path, permissions):
1316
        # raise ValueError('Bad characters in permissions')
1317
        pass
1318

    
1319
    def _get_formatted_paths(self, paths):
1320
        formatted = []
1321
        for p in paths:
1322
            node = self.node.node_lookup(p)
1323
            if node is not None:
1324
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1325
            if props is not None:
1326
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1327
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1328
                formatted.append((p, self.MATCH_EXACT))
1329
        return formatted
1330

    
1331
    def _get_permissions_path(self, account, container, name):
1332
        path = '/'.join((account, container, name))
1333
        permission_paths = self.permissions.access_inherit(path)
1334
        permission_paths.sort()
1335
        permission_paths.reverse()
1336
        for p in permission_paths:
1337
            if p == path:
1338
                return p
1339
            else:
1340
                if p.count('/') < 2:
1341
                    continue
1342
                node = self.node.node_lookup(p)
1343
                if node is not None:
1344
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1345
                if props is not None:
1346
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1347
                        return p
1348
        return None
1349

    
1350
    def _can_read(self, user, account, container, name):
1351
        if user == account:
1352
            return True
1353
        path = '/'.join((account, container, name))
1354
        if self.permissions.public_get(path) is not None:
1355
            return True
1356
        path = self._get_permissions_path(account, container, name)
1357
        if not path:
1358
            raise NotAllowedError
1359
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1360
            raise NotAllowedError
1361

    
1362
    def _can_write(self, user, account, container, name):
1363
        if user == account:
1364
            return True
1365
        path = '/'.join((account, container, name))
1366
        path = self._get_permissions_path(account, container, name)
1367
        if not path:
1368
            raise NotAllowedError
1369
        if not self.permissions.access_check(path, self.WRITE, user):
1370
            raise NotAllowedError
1371

    
1372
    def _allowed_accounts(self, user):
1373
        allow = set()
1374
        for path in self.permissions.access_list_paths(user):
1375
            allow.add(path.split('/', 1)[0])
1376
        return sorted(allow)
1377

    
1378
    def _allowed_containers(self, user, account):
1379
        allow = set()
1380
        for path in self.permissions.access_list_paths(user, account):
1381
            allow.add(path.split('/', 2)[1])
1382
        return sorted(allow)