Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / modular.py @ 0df22aea

History | View | Annotate | Download (36.4 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import logging
38
import hashlib
39
import binascii
40

    
41
from base import NotAllowedError, BaseBackend
42
from lib.hashfiler import Mapper, Blocker
43

    
44
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
45

    
46
inf = float('inf')
47

    
48

    
49
logger = logging.getLogger(__name__)
50

    
51

    
52
class HashMap(list):
53
    
54
    def __init__(self, blocksize, blockhash):
55
        super(HashMap, self).__init__()
56
        self.blocksize = blocksize
57
        self.blockhash = blockhash
58
    
59
    def _hash_raw(self, v):
60
        h = hashlib.new(self.blockhash)
61
        h.update(v)
62
        return h.digest()
63
    
64
    def hash(self):
65
        if len(self) == 0:
66
            return self._hash_raw('')
67
        if len(self) == 1:
68
            return self.__getitem__(0)
69
        
70
        h = list(self)
71
        s = 2
72
        while s < len(h):
73
            s = s * 2
74
        h += [('\x00' * len(h[0]))] * (s - len(h))
75
        while len(h) > 1:
76
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
77
        return h[0]
78

    
79

    
80
def backend_method(func=None, autocommit=1):
81
    if func is None:
82
        def fn(func):
83
            return backend_method(func, autocommit)
84
        return fn
85

    
86
    if not autocommit:
87
        return func
88
    def fn(self, *args, **kw):
89
        self.wrapper.execute()
90
        try:
91
            ret = func(self, *args, **kw)
92
            self.wrapper.commit()
93
            return ret
94
        except:
95
            self.wrapper.rollback()
96
            raise
97
    return fn
98

    
99

    
100
class ModularBackend(BaseBackend):
101
    """A modular backend.
102
    
103
    Uses modules for SQL functions and hashfiler for storage.
104
    """
105
    
106
    def __init__(self, mod, path, db):
107
        self.hash_algorithm = 'sha256'
108
        self.block_size = 4 * 1024 * 1024 # 4MB
109
        
110
        self.default_policy = {'quota': 0, 'versioning': 'manual'}
111
        
112
        if path and not os.path.exists(path):
113
            os.makedirs(path)
114
        if not os.path.isdir(path):
115
            raise RuntimeError("Cannot open path '%s'" % (path,))
116
        
117
        __import__(mod)
118
        self.mod = sys.modules[mod]
119
        self.db = db
120
        self.wrapper = self.mod.dbwrapper.DBWrapper(db)
121
        
122
        params = {'blocksize': self.block_size,
123
                  'blockpath': os.path.join(path + '/blocks'),
124
                  'hashtype': self.hash_algorithm}
125
        self.blocker = Blocker(**params)
126
        
127
        params = {'mappath': os.path.join(path + '/maps'),
128
                  'namelen': self.blocker.hashlen}
129
        self.mapper = Mapper(**params)
130
        
131
        params = {'wrapper': self.wrapper}
132
        self.permissions = self.mod.permissions.Permissions(**params)
133
        for x in ['READ', 'WRITE']:
134
            setattr(self, x, getattr(self.mod.permissions, x))
135
        self.node = self.mod.node.Node(**params)
136
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
137
            setattr(self, x, getattr(self.mod.node, x))
138
    
139
    @backend_method
140
    def list_accounts(self, user, marker=None, limit=10000):
141
        """Return a list of accounts the user can access."""
142
        
143
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
144
        allowed = self._allowed_accounts(user)
145
        start, limit = self._list_limits(allowed, marker, limit)
146
        return allowed[start:start + limit]
147
    
148
    @backend_method
149
    def get_account_meta(self, user, account, until=None):
150
        """Return a dictionary with the account metadata."""
151
        
152
        logger.debug("get_account_meta: %s %s", account, until)
153
        path, node = self._lookup_account(account, user == account)
154
        if user != account:
155
            if until or node is None or account not in self._allowed_accounts(user):
156
                raise NotAllowedError
157
        try:
158
            props = self._get_properties(node, until)
159
            mtime = props[self.MTIME]
160
        except NameError:
161
            props = None
162
            mtime = until
163
        count, bytes, tstamp = self._get_statistics(node, until)
164
        tstamp = max(tstamp, mtime)
165
        if until is None:
166
            modified = tstamp
167
        else:
168
            modified = self._get_statistics(node)[2] # Overall last modification.
169
            modified = max(modified, mtime)
170
        
171
        if user != account:
172
            meta = {'name': account}
173
        else:
174
            meta = {}
175
            if props is not None:
176
                meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
177
            if until is not None:
178
                meta.update({'until_timestamp': tstamp})
179
            meta.update({'name': account, 'count': count, 'bytes': bytes})
180
        meta.update({'modified': modified})
181
        return meta
182
    
183
    @backend_method
184
    def update_account_meta(self, user, account, meta, replace=False):
185
        """Update the metadata associated with the account."""
186
        
187
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
188
        if user != account:
189
            raise NotAllowedError
190
        path, node = self._lookup_account(account, True)
191
        self._put_metadata(user, node, meta, replace)
192
    
193
    @backend_method
194
    def get_account_groups(self, user, account):
195
        """Return a dictionary with the user groups defined for this account."""
196
        
197
        logger.debug("get_account_groups: %s", account)
198
        if user != account:
199
            if account not in self._allowed_accounts(user):
200
                raise NotAllowedError
201
            return {}
202
        self._lookup_account(account, True)
203
        return self.permissions.group_dict(account)
204
    
205
    @backend_method
206
    def update_account_groups(self, user, account, groups, replace=False):
207
        """Update the groups associated with the account."""
208
        
209
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
210
        if user != account:
211
            raise NotAllowedError
212
        self._lookup_account(account, True)
213
        self._check_groups(groups)
214
        if replace:
215
            self.permissions.group_destroy(account)
216
        for k, v in groups.iteritems():
217
            if not replace: # If not already deleted.
218
                self.permissions.group_delete(account, k)
219
            if v:
220
                self.permissions.group_addmany(account, k, v)
221
    
222
    @backend_method
223
    def get_account_policy(self, user, account):
224
        """Return a dictionary with the account policy."""
225
        
226
        logger.debug("get_account_policy: %s", account)
227
        if user != account:
228
            if account not in self._allowed_accounts(user):
229
                raise NotAllowedError
230
            return {}
231
        path, node = self._lookup_account(account, True)
232
        return self._get_policy(node)
233
    
234
    @backend_method
235
    def update_account_policy(self, user, account, policy, replace=False):
236
        """Update the policy associated with the account."""
237
        
238
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
239
        if user != account:
240
            raise NotAllowedError
241
        path, node = self._lookup_account(account, True)
242
        self._check_policy(policy)
243
        self._put_policy(node, policy, replace)
244
    
245
    @backend_method
246
    def put_account(self, user, account, policy=None):
247
        """Create a new account with the given name."""
248
        
249
        logger.debug("put_account: %s %s", account, policy)
250
        if user != account:
251
            raise NotAllowedError
252
        node = self.node.node_lookup(account)
253
        if node is not None:
254
            raise NameError('Account already exists')
255
        if policy:
256
            self._check_policy(policy)
257
        node = self._put_path(user, self.ROOTNODE, account)
258
        self._put_policy(node, policy, True)
259
    
260
    @backend_method
261
    def delete_account(self, user, account):
262
        """Delete the account with the given name."""
263
        
264
        logger.debug("delete_account: %s", account)
265
        if user != account:
266
            raise NotAllowedError
267
        node = self.node.node_lookup(account)
268
        if node is None:
269
            return
270
        if not self.node.node_remove(node):
271
            raise IndexError('Account is not empty')
272
        self.permissions.group_destroy(account)
273
    
274
    @backend_method
275
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
276
        """Return a list of containers existing under an account."""
277
        
278
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
279
        if user != account:
280
            if until or account not in self._allowed_accounts(user):
281
                raise NotAllowedError
282
            allowed = self._allowed_containers(user, account)
283
            start, limit = self._list_limits(allowed, marker, limit)
284
            return allowed[start:start + limit]
285
        if shared:
286
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
287
            allowed = list(set(allowed))
288
            start, limit = self._list_limits(allowed, marker, limit)
289
            return allowed[start:start + limit]
290
        node = self.node.node_lookup(account)
291
        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
292
    
293
    @backend_method
294
    def get_container_meta(self, user, account, container, until=None):
295
        """Return a dictionary with the container metadata."""
296
        
297
        logger.debug("get_container_meta: %s %s %s", account, container, until)
298
        if user != account:
299
            if until or container not in self._allowed_containers(user, account):
300
                raise NotAllowedError
301
        path, node = self._lookup_container(account, container)
302
        props = self._get_properties(node, until)
303
        mtime = props[self.MTIME]
304
        count, bytes, tstamp = self._get_statistics(node, until)
305
        tstamp = max(tstamp, mtime)
306
        if until is None:
307
            modified = tstamp
308
        else:
309
            modified = self._get_statistics(node)[2] # Overall last modification.
310
            modified = max(modified, mtime)
311
        
312
        if user != account:
313
            meta = {'name': container}
314
        else:
315
            meta = dict(self.node.attribute_get(props[self.SERIAL]))
316
            if until is not None:
317
                meta.update({'until_timestamp': tstamp})
318
            meta.update({'name': container, 'count': count, 'bytes': bytes})
319
        meta.update({'modified': modified})
320
        return meta
321
    
322
    @backend_method
323
    def update_container_meta(self, user, account, container, meta, replace=False):
324
        """Update the metadata associated with the container."""
325
        
326
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
327
        if user != account:
328
            raise NotAllowedError
329
        path, node = self._lookup_container(account, container)
330
        self._put_metadata(user, node, meta, replace)
331
    
332
    @backend_method
333
    def get_container_policy(self, user, account, container):
334
        """Return a dictionary with the container policy."""
335
        
336
        logger.debug("get_container_policy: %s %s", account, container)
337
        if user != account:
338
            if container not in self._allowed_containers(user, account):
339
                raise NotAllowedError
340
            return {}
341
        path, node = self._lookup_container(account, container)
342
        return self._get_policy(node)
343
    
344
    @backend_method
345
    def update_container_policy(self, user, account, container, policy, replace=False):
346
        """Update the policy associated with the container."""
347
        
348
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
349
        if user != account:
350
            raise NotAllowedError
351
        path, node = self._lookup_container(account, container)
352
        self._check_policy(policy)
353
        self._put_policy(node, policy, replace)
354
    
355
    @backend_method
356
    def put_container(self, user, account, container, policy=None):
357
        """Create a new container with the given name."""
358
        
359
        logger.debug("put_container: %s %s %s", account, container, policy)
360
        if user != account:
361
            raise NotAllowedError
362
        try:
363
            path, node = self._lookup_container(account, container)
364
        except NameError:
365
            pass
366
        else:
367
            raise NameError('Container already exists')
368
        if policy:
369
            self._check_policy(policy)
370
        path = '/'.join((account, container))
371
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
372
        self._put_policy(node, policy, True)
373
    
374
    @backend_method
375
    def delete_container(self, user, account, container, until=None):
376
        """Delete/purge the container with the given name."""
377
        
378
        logger.debug("delete_container: %s %s %s", account, container, until)
379
        if user != account:
380
            raise NotAllowedError
381
        path, node = self._lookup_container(account, container)
382
        
383
        if until is not None:
384
            self.node.node_purge_children(node, until, CLUSTER_HISTORY)
385
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
386
            return
387
        
388
        if self._get_statistics(node)[0] > 0:
389
            raise IndexError('Container is not empty')
390
        self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
391
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
392
        self.node.node_remove(node)
393
    
394
    @backend_method
395
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
396
        """Return a list of objects existing under a container."""
397
        
398
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
399
        allowed = []
400
        if user != account:
401
            if until:
402
                raise NotAllowedError
403
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
404
            if not allowed:
405
                raise NotAllowedError
406
        else:
407
            if shared:
408
                allowed = self.permissions.access_list_shared('/'.join((account, container)))
409
                if not allowed:
410
                    return []
411
        path, node = self._lookup_container(account, container)
412
        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
413
    
414
    @backend_method
415
    def list_object_meta(self, user, account, container, until=None):
416
        """Return a list with all the container's object meta keys."""
417
        
418
        logger.debug("list_object_meta: %s %s %s", account, container, until)
419
        allowed = []
420
        if user != account:
421
            if until:
422
                raise NotAllowedError
423
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
424
            if not allowed:
425
                raise NotAllowedError
426
        path, node = self._lookup_container(account, container)
427
        before = until if until is not None else inf
428
        return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
429
    
430
    @backend_method
431
    def get_object_meta(self, user, account, container, name, version=None):
432
        """Return a dictionary with the object metadata."""
433
        
434
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
435
        self._can_read(user, account, container, name)
436
        path, node = self._lookup_object(account, container, name)
437
        props = self._get_version(node, version)
438
        if version is None:
439
            modified = props[self.MTIME]
440
        else:
441
            try:
442
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
443
            except NameError: # Object may be deleted.
444
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
445
                if del_props is None:
446
                    raise NameError('Object does not exist')
447
                modified = del_props[self.MTIME]
448
        
449
        meta = dict(self.node.attribute_get(props[self.SERIAL]))
450
        meta.update({'name': name, 'bytes': props[self.SIZE]})
451
        meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
452
        meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
453
        return meta
454
    
455
    @backend_method
456
    def update_object_meta(self, user, account, container, name, meta, replace=False):
457
        """Update the metadata associated with the object."""
458
        
459
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
460
        self._can_write(user, account, container, name)
461
        path, node = self._lookup_object(account, container, name)
462
        return self._put_metadata(user, node, meta, replace)
463
    
464
    @backend_method
465
    def get_object_permissions(self, user, account, container, name):
466
        """Return the action allowed on the object, the path
467
        from which the object gets its permissions from,
468
        along with a dictionary containing the permissions."""
469
        
470
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
471
        allowed = 'write'
472
        if user != account:
473
            path = '/'.join((account, container, name))
474
            if self.permissions.access_check(path, self.WRITE, user):
475
                allowed = 'write'
476
            elif self.permissions.access_check(path, self.READ, user):
477
                allowed = 'read'
478
            else:
479
                raise NotAllowedError
480
        path = self._lookup_object(account, container, name)[0]
481
        return (allowed,) + self.permissions.access_inherit(path)
482
    
483
    @backend_method
484
    def update_object_permissions(self, user, account, container, name, permissions):
485
        """Update the permissions associated with the object."""
486
        
487
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
488
        if user != account:
489
            raise NotAllowedError
490
        path = self._lookup_object(account, container, name)[0]
491
        self._check_permissions(path, permissions)
492
        self.permissions.access_set(path, permissions)
493
    
494
    @backend_method
495
    def get_object_public(self, user, account, container, name):
496
        """Return the public URL of the object if applicable."""
497
        
498
        logger.debug("get_object_public: %s %s %s", account, container, name)
499
        self._can_read(user, account, container, name)
500
        path = self._lookup_object(account, container, name)[0]
501
        if self.permissions.public_check(path):
502
            return '/public/' + path
503
        return None
504
    
505
    @backend_method
506
    def update_object_public(self, user, account, container, name, public):
507
        """Update the public status of the object."""
508
        
509
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
510
        self._can_write(user, account, container, name)
511
        path = self._lookup_object(account, container, name)[0]
512
        if not public:
513
            self.permissions.public_unset(path)
514
        else:
515
            self.permissions.public_set(path)
516
    
517
    @backend_method
518
    def get_object_hashmap(self, user, account, container, name, version=None):
519
        """Return the object's size and a list with partial hashes."""
520
        
521
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
522
        self._can_read(user, account, container, name)
523
        path, node = self._lookup_object(account, container, name)
524
        props = self._get_version(node, version)
525
        hashmap = self.mapper.map_retr(binascii.unhexlify(props[self.HASH]))
526
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
527
    
528
    def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
529
        if permissions is not None and user != account:
530
            raise NotAllowedError
531
        self._can_write(user, account, container, name)
532
        if permissions is not None:
533
            path = '/'.join((account, container, name))
534
            self._check_permissions(path, permissions)
535
        
536
        account_path, account_node = self._lookup_account(account, True)
537
        container_path, container_node = self._lookup_container(account, container)
538
        path, node = self._put_object_node(container_path, container_node, name)
539
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)
540
        
541
        # Check quota.
542
        size_delta = size # Change with versioning.
543
        if size_delta > 0:
544
            account_quota = self._get_policy(account_node)['quota']
545
            container_quota = self._get_policy(container_node)['quota']
546
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
547
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
548
                # This must be executed in a transaction, so the version is never created if it fails.
549
                raise
550
        
551
        if not replace_meta and src_version_id is not None:
552
            self.node.attribute_copy(src_version_id, dest_version_id)
553
        self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
554
        if permissions is not None:
555
            self.permissions.access_set(path, permissions)
556
        return dest_version_id
557
    
558
    @backend_method
559
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
560
        """Create/update an object with the specified size and partial hashes."""
561
        
562
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
563
        map = HashMap(self.block_size, self.hash_algorithm)
564
        map.extend([binascii.unhexlify(x) for x in hashmap])
565
        missing = self.blocker.block_ping(map)
566
        if missing:
567
            ie = IndexError()
568
            ie.data = [binascii.hexlify(x) for x in missing]
569
            raise ie
570
        
571
        hash = map.hash()
572
        dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions)
573
        self.mapper.map_stor(hash, map)
574
        return dest_version_id
575
    
576
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
577
        self._can_read(user, src_account, src_container, src_name)
578
        path, node = self._lookup_object(src_account, src_container, src_name)
579
        props = self._get_version(node, src_version)
580
        src_version_id = props[self.SERIAL]
581
        hash = props[self.HASH]
582
        size = props[self.SIZE]
583
        
584
        if replace_meta:
585
            meta = dest_meta
586
        else:
587
            meta = {}
588
        dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
589
        if not replace_meta:
590
            self.node.attribute_copy(src_version_id, dest_version_id)
591
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
592
        return dest_version_id
593
    
594
    @backend_method
595
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
596
        """Copy an object's data and metadata."""
597
        
598
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
599
        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
600
    
601
    @backend_method
602
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
603
        """Move an object's data and metadata."""
604
        
605
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
606
        if user != src_account:
607
            raise NotAllowedError
608
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
609
        self._delete_object(user, src_account, src_container, src_name)
610
        return dest_version_id
611
    
612
    def _delete_object(self, user, account, container, name, until=None):
613
        if user != account:
614
            raise NotAllowedError
615
        
616
        if until is not None:
617
            path = '/'.join((account, container, name))
618
            node = self.node.node_lookup(path)
619
            if node is None:
620
                return
621
            self.node.node_purge(node, until, CLUSTER_NORMAL)
622
            self.node.node_purge(node, until, CLUSTER_HISTORY)
623
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
624
            try:
625
                props = self._get_version(node)
626
            except NameError:
627
                pass
628
            else:
629
                self.permissions.access_clear(path)
630
            return
631
        
632
        path, node = self._lookup_object(account, container, name)
633
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
634
        self.permissions.access_clear(path)
635
    
636
    @backend_method
637
    def delete_object(self, user, account, container, name, until=None):
638
        """Delete/purge an object."""
639
        
640
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
641
        self._delete_object(user, account, container, name, until)
642
    
643
    @backend_method
644
    def list_versions(self, user, account, container, name):
645
        """Return a list of all (version, version_timestamp) tuples for an object."""
646
        
647
        logger.debug("list_versions: %s %s %s", account, container, name)
648
        self._can_read(user, account, container, name)
649
        path, node = self._lookup_object(account, container, name)
650
        versions = self.node.node_get_versions(node)
651
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
652
    
653
    @backend_method(autocommit=0)
654
    def get_block(self, hash):
655
        """Return a block's data."""
656
        
657
        logger.debug("get_block: %s", hash)
658
        blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
659
        if not blocks:
660
            raise NameError('Block does not exist')
661
        return blocks[0]
662
    
663
    @backend_method(autocommit=0)
664
    def put_block(self, data):
665
        """Store a block and return the hash."""
666
        
667
        logger.debug("put_block: %s", len(data))
668
        hashes, absent = self.blocker.block_stor((data,))
669
        return binascii.hexlify(hashes[0])
670
    
671
    @backend_method(autocommit=0)
672
    def update_block(self, hash, data, offset=0):
673
        """Update a known block and return the hash."""
674
        
675
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
676
        if offset == 0 and len(data) == self.block_size:
677
            return self.put_block(data)
678
        h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
679
        return binascii.hexlify(h)
680
    
681
    # Path functions.
682
    
683
    def _put_object_node(self, path, parent, name):
684
        path = '/'.join((path, name))
685
        node = self.node.node_lookup(path)
686
        if node is None:
687
            node = self.node.node_create(parent, path)
688
        return path, node
689
    
690
    def _put_path(self, user, parent, path):
691
        node = self.node.node_create(parent, path)
692
        self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
693
        return node
694
    
695
    def _lookup_account(self, account, create=True):
696
        node = self.node.node_lookup(account)
697
        if node is None and create:
698
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
699
        return account, node
700
    
701
    def _lookup_container(self, account, container):
702
        path = '/'.join((account, container))
703
        node = self.node.node_lookup(path)
704
        if node is None:
705
            raise NameError('Container does not exist')
706
        return path, node
707
    
708
    def _lookup_object(self, account, container, name):
709
        path = '/'.join((account, container, name))
710
        node = self.node.node_lookup(path)
711
        if node is None:
712
            raise NameError('Object does not exist')
713
        return path, node
714
    
715
    def _get_properties(self, node, until=None):
716
        """Return properties until the timestamp given."""
717
        
718
        before = until if until is not None else inf
719
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
720
        if props is None and until is not None:
721
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
722
        if props is None:
723
            raise NameError('Path does not exist')
724
        return props
725
    
726
    def _get_statistics(self, node, until=None):
727
        """Return count, sum of size and latest timestamp of everything under node."""
728
        
729
        if until is None:
730
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
731
        else:
732
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
733
        if stats is None:
734
            stats = (0, 0, 0)
735
        return stats
736
    
737
    def _get_version(self, node, version=None):
738
        if version is None:
739
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
740
            if props is None:
741
                raise NameError('Object does not exist')
742
        else:
743
            try:
744
                version = int(version)
745
            except ValueError:
746
                raise IndexError('Version does not exist')
747
            props = self.node.version_get_properties(version)
748
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
749
                raise IndexError('Version does not exist')
750
        return props
751
    
752
    def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
753
        """Create a new version of the node."""
754
        
755
        props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
756
        if props is not None:
757
            src_version_id = props[self.SERIAL]
758
            src_hash = props[self.HASH]
759
            src_size = props[self.SIZE]
760
        else:
761
            src_version_id = None
762
            src_hash = None
763
            src_size = 0
764
        if size is None:
765
            hash = src_hash # This way hash can be set to None.
766
            size = src_size
767
        
768
        if src_version_id is not None:
769
            self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
770
        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
771
        return src_version_id, dest_version_id
772
    
773
    def _put_metadata(self, user, node, meta, replace=False):
774
        """Create a new version and store metadata."""
775
        
776
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
777
        
778
        # TODO: Merge with other functions that update metadata...
779
        if not replace:
780
            if src_version_id is not None:
781
                self.node.attribute_copy(src_version_id, dest_version_id)
782
            self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
783
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
784
        else:
785
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
786
        return dest_version_id
787
    
788
    def _list_limits(self, listing, marker, limit):
789
        start = 0
790
        if marker:
791
            try:
792
                start = listing.index(marker) + 1
793
            except ValueError:
794
                pass
795
        if not limit or limit > 10000:
796
            limit = 10000
797
        return start, limit
798
    
799
    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
800
        cont_prefix = path + '/'
801
        prefix = cont_prefix + prefix
802
        start = cont_prefix + marker if marker else None
803
        before = until if until is not None else inf
804
        filterq = ','.join(keys) if keys else None
805
        
806
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
807
        objects.extend([(p, None) for p in prefixes] if virtual else [])
808
        objects.sort(key=lambda x: x[0])
809
        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
810
        
811
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
812
        return objects[start:start + limit]
813
    
814
    # Policy functions.
815
    
816
    def _check_policy(self, policy):
817
        for k in policy.keys():
818
            if policy[k] == '':
819
                policy[k] = self.default_policy.get(k)
820
        for k, v in policy.iteritems():
821
            if k == 'quota':
822
                q = int(v) # May raise ValueError.
823
                if q < 0:
824
                    raise ValueError
825
            elif k == 'versioning':
826
                if v not in ['auto', 'manual', 'none']:
827
                    raise ValueError
828
            else:
829
                raise ValueError
830
    
831
    def _put_policy(self, node, policy, replace):
832
        if replace:
833
            for k, v in self.default_policy.iteritems():
834
                if k not in policy:
835
                    policy[k] = v
836
        self.node.policy_set(node, policy)
837
    
838
    def _get_policy(self, node):
839
        policy = self.default_policy.copy()
840
        policy.update(self.node.policy_get(node))
841
        return policy
842
    
843
    # Access control functions.
844
    
845
    def _check_groups(self, groups):
846
        # raise ValueError('Bad characters in groups')
847
        pass
848
    
849
    def _check_permissions(self, path, permissions):
850
        # raise ValueError('Bad characters in permissions')
851
        
852
        # Check for existing permissions.
853
        paths = self.permissions.access_list(path)
854
        if paths:
855
            ae = AttributeError()
856
            ae.data = paths
857
            raise ae
858
    
859
    def _can_read(self, user, account, container, name):
860
        if user == account:
861
            return True
862
        path = '/'.join((account, container, name))
863
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
864
            raise NotAllowedError
865
    
866
    def _can_write(self, user, account, container, name):
867
        if user == account:
868
            return True
869
        path = '/'.join((account, container, name))
870
        if not self.permissions.access_check(path, self.WRITE, user):
871
            raise NotAllowedError
872
    
873
    def _allowed_accounts(self, user):
874
        allow = set()
875
        for path in self.permissions.access_list_paths(user):
876
            allow.add(path.split('/', 1)[0])
877
        return sorted(allow)
878
    
879
    def _allowed_containers(self, user, account):
880
        allow = set()
881
        for path in self.permissions.access_list_paths(user, account):
882
            allow.add(path.split('/', 2)[1])
883
        return sorted(allow)