345691ffe41d0764342a1b438a8cf99af17b072a
[pithos] / pithos / backends / modular.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above #      copyright notice, this list of conditions and the following
12 #      disclaimer in the documentation and/or other materials
13 #      provided with the distribution.
14
15 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
16 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
19 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
21 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
22 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
23 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
25 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 # POSSIBILITY OF SUCH DAMAGE.
27
28 # The views and conclusions contained in the software and
29 # documentation are those of the authors and should not be
30 # interpreted as representing official policies, either expressed
31 # or implied, of GRNET S.A.
32
33 import sys
34 import os
35 import time
36 import uuid as uuidlib
37 import logging
38 import binascii
39
40 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
41
42 from pithos.lib.hashmap import HashMap
43
44 # Default modules and settings.
45 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
46 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
47 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
48 DEFAULT_BLOCK_PATH = 'data/'
49
50 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
51
52 inf = float('inf')
53
54 ULTIMATE_ANSWER = 42
55
56
57 logger = logging.getLogger(__name__)
58
59
60 def backend_method(func=None, autocommit=1):
61     if func is None:
62         def fn(func):
63             return backend_method(func, autocommit)
64         return fn
65
66     if not autocommit:
67         return func
68     def fn(self, *args, **kw):
69         self.wrapper.execute()
70         try:
71             ret = func(self, *args, **kw)
72             self.wrapper.commit()
73             return ret
74         except:
75             self.wrapper.rollback()
76             raise
77     return fn
78
79
80 class ModularBackend(BaseBackend):
81     """A modular backend.
82     
83     Uses modules for SQL functions and storage.
84     """
85     
86     def __init__(self, db_module=None, db_connection=None, block_module=None, block_path=None):
87         db_module = db_module or DEFAULT_DB_MODULE
88         db_connection = db_connection or DEFAULT_DB_CONNECTION
89         block_module = block_module or DEFAULT_BLOCK_MODULE
90         block_path = block_path or DEFAULT_BLOCK_PATH
91         
92         self.hash_algorithm = 'sha256'
93         self.block_size = 4 * 1024 * 1024 # 4MB
94         
95         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
96         
97         __import__(db_module)
98         self.db_module = sys.modules[db_module]
99         self.wrapper = self.db_module.DBWrapper(db_connection)
100         
101         params = {'wrapper': self.wrapper}
102         self.permissions = self.db_module.Permissions(**params)
103         for x in ['READ', 'WRITE']:
104             setattr(self, x, getattr(self.db_module, x))
105         self.node = self.db_module.Node(**params)
106         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
107             setattr(self, x, getattr(self.db_module, x))
108         
109         __import__(block_module)
110         self.block_module = sys.modules[block_module]
111         
112         params = {'path': block_path,
113                   'block_size': self.block_size,
114                   'hash_algorithm': self.hash_algorithm}
115         self.store = self.block_module.Store(**params)
116     
117     def close(self):
118         self.wrapper.close()
119     
120     @backend_method
121     def list_accounts(self, user, marker=None, limit=10000):
122         """Return a list of accounts the user can access."""
123         
124         logger.debug("list_accounts: %s %s %s", user, marker, limit)
125         allowed = self._allowed_accounts(user)
126         start, limit = self._list_limits(allowed, marker, limit)
127         return allowed[start:start + limit]
128     
129     @backend_method
130     def get_account_meta(self, user, account, domain, until=None):
131         """Return a dictionary with the account metadata for the domain."""
132         
133         logger.debug("get_account_meta: %s %s %s", account, domain, until)
134         path, node = self._lookup_account(account, user == account)
135         if user != account:
136             if until or node is None or account not in self._allowed_accounts(user):
137                 raise NotAllowedError
138         try:
139             props = self._get_properties(node, until)
140             mtime = props[self.MTIME]
141         except NameError:
142             props = None
143             mtime = until
144         count, bytes, tstamp = self._get_statistics(node, until)
145         tstamp = max(tstamp, mtime)
146         if until is None:
147             modified = tstamp
148         else:
149             modified = self._get_statistics(node)[2] # Overall last modification.
150             modified = max(modified, mtime)
151         
152         if user != account:
153             meta = {'name': account}
154         else:
155             meta = {}
156             if props is not None:
157                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
158             if until is not None:
159                 meta.update({'until_timestamp': tstamp})
160             meta.update({'name': account, 'count': count, 'bytes': bytes})
161         meta.update({'modified': modified})
162         return meta
163     
164     @backend_method
165     def update_account_meta(self, user, account, domain, meta, replace=False):
166         """Update the metadata associated with the account for the domain."""
167         
168         logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
169         if user != account:
170             raise NotAllowedError
171         path, node = self._lookup_account(account, True)
172         self._put_metadata(user, node, domain, meta, replace)
173     
174     @backend_method
175     def get_account_groups(self, user, account):
176         """Return a dictionary with the user groups defined for this account."""
177         
178         logger.debug("get_account_groups: %s", account)
179         if user != account:
180             if account not in self._allowed_accounts(user):
181                 raise NotAllowedError
182             return {}
183         self._lookup_account(account, True)
184         return self.permissions.group_dict(account)
185     
186     @backend_method
187     def update_account_groups(self, user, account, groups, replace=False):
188         """Update the groups associated with the account."""
189         
190         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
191         if user != account:
192             raise NotAllowedError
193         self._lookup_account(account, True)
194         self._check_groups(groups)
195         if replace:
196             self.permissions.group_destroy(account)
197         for k, v in groups.iteritems():
198             if not replace: # If not already deleted.
199                 self.permissions.group_delete(account, k)
200             if v:
201                 self.permissions.group_addmany(account, k, v)
202     
203     @backend_method
204     def get_account_policy(self, user, account):
205         """Return a dictionary with the account policy."""
206         
207         logger.debug("get_account_policy: %s", account)
208         if user != account:
209             if account not in self._allowed_accounts(user):
210                 raise NotAllowedError
211             return {}
212         path, node = self._lookup_account(account, True)
213         return self._get_policy(node)
214     
215     @backend_method
216     def update_account_policy(self, user, account, policy, replace=False):
217         """Update the policy associated with the account."""
218         
219         logger.debug("update_account_policy: %s %s %s", account, policy, replace)
220         if user != account:
221             raise NotAllowedError
222         path, node = self._lookup_account(account, True)
223         self._check_policy(policy)
224         self._put_policy(node, policy, replace)
225     
226     @backend_method
227     def put_account(self, user, account, policy={}):
228         """Create a new account with the given name."""
229         
230         logger.debug("put_account: %s %s", account, policy)
231         if user != account:
232             raise NotAllowedError
233         node = self.node.node_lookup(account)
234         if node is not None:
235             raise NameError('Account already exists')
236         if policy:
237             self._check_policy(policy)
238         node = self._put_path(user, self.ROOTNODE, account)
239         self._put_policy(node, policy, True)
240     
241     @backend_method
242     def delete_account(self, user, account):
243         """Delete the account with the given name."""
244         
245         logger.debug("delete_account: %s", account)
246         if user != account:
247             raise NotAllowedError
248         node = self.node.node_lookup(account)
249         if node is None:
250             return
251         if not self.node.node_remove(node):
252             raise IndexError('Account is not empty')
253         self.permissions.group_destroy(account)
254     
255     @backend_method
256     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
257         """Return a list of containers existing under an account."""
258         
259         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
260         if user != account:
261             if until or account not in self._allowed_accounts(user):
262                 raise NotAllowedError
263             allowed = self._allowed_containers(user, account)
264             start, limit = self._list_limits(allowed, marker, limit)
265             return allowed[start:start + limit]
266         if shared:
267             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
268             allowed = list(set(allowed))
269             start, limit = self._list_limits(allowed, marker, limit)
270             return allowed[start:start + limit]
271         node = self.node.node_lookup(account)
272         return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
273     
274     @backend_method
275     def get_container_meta(self, user, account, container, domain, until=None):
276         """Return a dictionary with the container metadata for the domain."""
277         
278         logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
279         if user != account:
280             if until or container not in self._allowed_containers(user, account):
281                 raise NotAllowedError
282         path, node = self._lookup_container(account, container)
283         props = self._get_properties(node, until)
284         mtime = props[self.MTIME]
285         count, bytes, tstamp = self._get_statistics(node, until)
286         tstamp = max(tstamp, mtime)
287         if until is None:
288             modified = tstamp
289         else:
290             modified = self._get_statistics(node)[2] # Overall last modification.
291             modified = max(modified, mtime)
292         
293         if user != account:
294             meta = {'name': container}
295         else:
296             meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
297             if until is not None:
298                 meta.update({'until_timestamp': tstamp})
299             meta.update({'name': container, 'count': count, 'bytes': bytes})
300         meta.update({'modified': modified})
301         return meta
302     
303     @backend_method
304     def update_container_meta(self, user, account, container, domain, meta, replace=False):
305         """Update the metadata associated with the container for the domain."""
306         
307         logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
308         if user != account:
309             raise NotAllowedError
310         path, node = self._lookup_container(account, container)
311         self._put_metadata(user, node, domain, meta, replace)
312     
313     @backend_method
314     def get_container_policy(self, user, account, container):
315         """Return a dictionary with the container policy."""
316         
317         logger.debug("get_container_policy: %s %s", account, container)
318         if user != account:
319             if container not in self._allowed_containers(user, account):
320                 raise NotAllowedError
321             return {}
322         path, node = self._lookup_container(account, container)
323         return self._get_policy(node)
324     
325     @backend_method
326     def update_container_policy(self, user, account, container, policy, replace=False):
327         """Update the policy associated with the container."""
328         
329         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
330         if user != account:
331             raise NotAllowedError
332         path, node = self._lookup_container(account, container)
333         self._check_policy(policy)
334         self._put_policy(node, policy, replace)
335     
336     @backend_method
337     def put_container(self, user, account, container, policy={}):
338         """Create a new container with the given name."""
339         
340         logger.debug("put_container: %s %s %s", account, container, policy)
341         if user != account:
342             raise NotAllowedError
343         try:
344             path, node = self._lookup_container(account, container)
345         except NameError:
346             pass
347         else:
348             raise NameError('Container already exists')
349         if policy:
350             self._check_policy(policy)
351         path = '/'.join((account, container))
352         node = self._put_path(user, self._lookup_account(account, True)[1], path)
353         self._put_policy(node, policy, True)
354     
355     @backend_method
356     def delete_container(self, user, account, container, until=None):
357         """Delete/purge the container with the given name."""
358         
359         logger.debug("delete_container: %s %s %s", account, container, until)
360         if user != account:
361             raise NotAllowedError
362         path, node = self._lookup_container(account, container)
363         
364         if until is not None:
365             hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
366             for h in hashes:
367                 self.store.map_delete(h)
368             self.node.node_purge_children(node, until, CLUSTER_DELETED)
369             return
370         
371         if self._get_statistics(node)[0] > 0:
372             raise IndexError('Container is not empty')
373         hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
374         for h in hashes:
375             self.store.map_delete(h)
376         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
377         self.node.node_remove(node)
378     
379     @backend_method
380     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None):
381         """Return a list of objects existing under a container."""
382         
383         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
384         allowed = []
385         if user != account:
386             if until:
387                 raise NotAllowedError
388             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
389             if not allowed:
390                 raise NotAllowedError
391         else:
392             if shared:
393                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
394                 if not allowed:
395                     return []
396         path, node = self._lookup_container(account, container)
397         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, allowed)
398     
399     @backend_method
400     def list_object_meta(self, user, account, container, domain, until=None):
401         """Return a list with all the container's object meta keys for the domain."""
402         
403         logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
404         allowed = []
405         if user != account:
406             if until:
407                 raise NotAllowedError
408             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
409             if not allowed:
410                 raise NotAllowedError
411         path, node = self._lookup_container(account, container)
412         before = until if until is not None else inf
413         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
414     
415     @backend_method
416     def get_object_meta(self, user, account, container, name, domain, version=None):
417         """Return a dictionary with the object metadata for the domain."""
418         
419         logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
420         self._can_read(user, account, container, name)
421         path, node = self._lookup_object(account, container, name)
422         props = self._get_version(node, version)
423         if version is None:
424             modified = props[self.MTIME]
425         else:
426             try:
427                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
428             except NameError: # Object may be deleted.
429                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
430                 if del_props is None:
431                     raise NameError('Object does not exist')
432                 modified = del_props[self.MTIME]
433         
434         meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
435         meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
436         meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
437         meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
438         return meta
439     
440     @backend_method
441     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
442         """Update the metadata associated with the object for the domain and return the new version."""
443         
444         logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
445         self._can_write(user, account, container, name)
446         path, node = self._lookup_object(account, container, name)
447         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
448         self._apply_versioning(account, container, src_version_id)
449         return dest_version_id
450     
451     @backend_method
452     def get_object_permissions(self, user, account, container, name):
453         """Return the action allowed on the object, the path
454         from which the object gets its permissions from,
455         along with a dictionary containing the permissions."""
456         
457         logger.debug("get_object_permissions: %s %s %s", account, container, name)
458         allowed = 'write'
459         if user != account:
460             path = '/'.join((account, container, name))
461             if self.permissions.access_check(path, self.WRITE, user):
462                 allowed = 'write'
463             elif self.permissions.access_check(path, self.READ, user):
464                 allowed = 'read'
465             else:
466                 raise NotAllowedError
467         path = self._lookup_object(account, container, name)[0]
468         return (allowed,) + self.permissions.access_inherit(path)
469     
470     @backend_method
471     def update_object_permissions(self, user, account, container, name, permissions):
472         """Update the permissions associated with the object."""
473         
474         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
475         if user != account:
476             raise NotAllowedError
477         path = self._lookup_object(account, container, name)[0]
478         self._check_permissions(path, permissions)
479         self.permissions.access_set(path, permissions)
480     
481     @backend_method
482     def get_object_public(self, user, account, container, name):
483         """Return the public id of the object if applicable."""
484         
485         logger.debug("get_object_public: %s %s %s", account, container, name)
486         self._can_read(user, account, container, name)
487         path = self._lookup_object(account, container, name)[0]
488         p = self.permissions.public_get(path)
489         if p is not None:
490             p += ULTIMATE_ANSWER
491         return p
492     
493     @backend_method
494     def update_object_public(self, user, account, container, name, public):
495         """Update the public status of the object."""
496         
497         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
498         self._can_write(user, account, container, name)
499         path = self._lookup_object(account, container, name)[0]
500         if not public:
501             self.permissions.public_unset(path)
502         else:
503             self.permissions.public_set(path)
504     
505     @backend_method
506     def get_object_hashmap(self, user, account, container, name, version=None):
507         """Return the object's size and a list with partial hashes."""
508         
509         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
510         self._can_read(user, account, container, name)
511         path, node = self._lookup_object(account, container, name)
512         props = self._get_version(node, version)
513         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
514         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
515     
516     def _update_object_hash(self, user, account, container, name, size, hash, permissions, src_node=None, is_copy=False):
517         if permissions is not None and user != account:
518             raise NotAllowedError
519         self._can_write(user, account, container, name)
520         if permissions is not None:
521             path = '/'.join((account, container, name))
522             self._check_permissions(path, permissions)
523         
524         account_path, account_node = self._lookup_account(account, True)
525         container_path, container_node = self._lookup_container(account, container)
526         path, node = self._put_object_node(container_path, container_node, name)
527         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
528         
529         # Check quota.
530         versioning = self._get_policy(container_node)['versioning']
531         if versioning != 'auto':
532             size_delta = size - 0 # TODO: Get previous size.
533         else:
534             size_delta = size
535         if size_delta > 0:
536             account_quota = long(self._get_policy(account_node)['quota'])
537             container_quota = long(self._get_policy(container_node)['quota'])
538             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
539                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
540                 # This must be executed in a transaction, so the version is never created if it fails.
541                 raise QuotaError
542         
543         if permissions is not None:
544             self.permissions.access_set(path, permissions)
545         self._apply_versioning(account, container, pre_version_id)
546         return pre_version_id, dest_version_id
547     
548     @backend_method
549     def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
550         """Create/update an object with the specified size and partial hashes."""
551         
552         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
553         if size == 0: # No such thing as an empty hashmap.
554             hashmap = [self.put_block('')]
555         map = HashMap(self.block_size, self.hash_algorithm)
556         map.extend([binascii.unhexlify(x) for x in hashmap])
557         missing = self.store.block_search(map)
558         if missing:
559             ie = IndexError()
560             ie.data = [binascii.hexlify(x) for x in missing]
561             raise ie
562         
563         hash = map.hash()
564         pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
565         self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
566         self.store.map_put(hash, map)
567         return dest_version_id
568     
569     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
570         self._can_read(user, src_account, src_container, src_name)
571         path, node = self._lookup_object(src_account, src_container, src_name)
572         # TODO: Will do another fetch of the properties in duplicate version...
573         props = self._get_version(node, src_version) # Check to see if source exists.
574         src_version_id = props[self.SERIAL]
575         hash = props[self.HASH]
576         size = props[self.SIZE]
577         
578         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
579         pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions, src_node=node, is_copy=is_copy)
580         self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
581         return dest_version_id
582     
583     @backend_method
584     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
585         """Copy an object's data and metadata."""
586         
587         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
588         return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
589     
590     @backend_method
591     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
592         """Move an object's data and metadata."""
593         
594         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
595         if user != src_account:
596             raise NotAllowedError
597         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
598         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
599             self._delete_object(user, src_account, src_container, src_name)
600         return dest_version_id
601     
602     def _delete_object(self, user, account, container, name, until=None):
603         if user != account:
604             raise NotAllowedError
605         
606         if until is not None:
607             path = '/'.join((account, container, name))
608             node = self.node.node_lookup(path)
609             if node is None:
610                 return
611             hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
612             hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
613             for h in hashes:
614                 self.store.map_delete(h)
615             self.node.node_purge(node, until, CLUSTER_DELETED)
616             try:
617                 props = self._get_version(node)
618             except NameError:
619                 self.permissions.access_clear(path)
620             return
621         
622         path, node = self._lookup_object(account, container, name)
623         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
624         self._apply_versioning(account, container, src_version_id)
625         self.permissions.access_clear(path)
626     
627     @backend_method
628     def delete_object(self, user, account, container, name, until=None):
629         """Delete/purge an object."""
630         
631         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
632         self._delete_object(user, account, container, name, until)
633     
634     @backend_method
635     def list_versions(self, user, account, container, name):
636         """Return a list of all (version, version_timestamp) tuples for an object."""
637         
638         logger.debug("list_versions: %s %s %s", account, container, name)
639         self._can_read(user, account, container, name)
640         path, node = self._lookup_object(account, container, name)
641         versions = self.node.node_get_versions(node)
642         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
643     
644     @backend_method
645     def get_uuid(self, user, uuid):
646         """Return the (account, container, name) for the UUID given."""
647         logger.debug("get_uuid: %s", uuid)
648         info = self.node.latest_uuid(uuid)
649         if info is None:
650             raise NameError
651         path, serial = info
652         account, container, name = path.split('/', 2)
653         self._can_read(user, account, container, name)
654         return (account, container, name)
655     
656     @backend_method
657     def get_public(self, user, public):
658         """Return the (account, container, name) for the public id given."""
659         logger.debug("get_public: %s", public)
660         if public is None or public < ULTIMATE_ANSWER:
661             raise NameError
662         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
663         if path is None:
664             raise NameError
665         account, container, name = path.split('/', 2)
666         self._can_read(user, account, container, name)
667         return (account, container, name)
668     
669     @backend_method(autocommit=0)
670     def get_block(self, hash):
671         """Return a block's data."""
672         
673         logger.debug("get_block: %s", hash)
674         block = self.store.block_get(binascii.unhexlify(hash))
675         if not block:
676             raise NameError('Block does not exist')
677         return block
678     
679     @backend_method(autocommit=0)
680     def put_block(self, data):
681         """Store a block and return the hash."""
682         
683         logger.debug("put_block: %s", len(data))
684         return binascii.hexlify(self.store.block_put(data))
685     
686     @backend_method(autocommit=0)
687     def update_block(self, hash, data, offset=0):
688         """Update a known block and return the hash."""
689         
690         logger.debug("update_block: %s %s %s", hash, len(data), offset)
691         if offset == 0 and len(data) == self.block_size:
692             return self.put_block(data)
693         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
694         return binascii.hexlify(h)
695     
696     # Path functions.
697     
698     def _generate_uuid(self):
699         return str(uuidlib.uuid4())
700     
701     def _put_object_node(self, path, parent, name):
702         path = '/'.join((path, name))
703         node = self.node.node_lookup(path)
704         if node is None:
705             node = self.node.node_create(parent, path)
706         return path, node
707     
708     def _put_path(self, user, parent, path):
709         node = self.node.node_create(parent, path)
710         self.node.version_create(node, None, 0, None, user, self._generate_uuid(), CLUSTER_NORMAL)
711         return node
712     
713     def _lookup_account(self, account, create=True):
714         node = self.node.node_lookup(account)
715         if node is None and create:
716             node = self._put_path(account, self.ROOTNODE, account) # User is account.
717         return account, node
718     
719     def _lookup_container(self, account, container):
720         path = '/'.join((account, container))
721         node = self.node.node_lookup(path)
722         if node is None:
723             raise NameError('Container does not exist')
724         return path, node
725     
726     def _lookup_object(self, account, container, name):
727         path = '/'.join((account, container, name))
728         node = self.node.node_lookup(path)
729         if node is None:
730             raise NameError('Object does not exist')
731         return path, node
732     
733     def _get_properties(self, node, until=None):
734         """Return properties until the timestamp given."""
735         
736         before = until if until is not None else inf
737         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
738         if props is None and until is not None:
739             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
740         if props is None:
741             raise NameError('Path does not exist')
742         return props
743     
744     def _get_statistics(self, node, until=None):
745         """Return count, sum of size and latest timestamp of everything under node."""
746         
747         if until is None:
748             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
749         else:
750             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
751         if stats is None:
752             stats = (0, 0, 0)
753         return stats
754     
755     def _get_version(self, node, version=None):
756         if version is None:
757             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
758             if props is None:
759                 raise NameError('Object does not exist')
760         else:
761             try:
762                 version = int(version)
763             except ValueError:
764                 raise IndexError('Version does not exist')
765             props = self.node.version_get_properties(version)
766             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
767                 raise IndexError('Version does not exist')
768         return props
769     
770     def _put_version_duplicate(self, user, node, src_node=None, size=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
771         """Create a new version of the node."""
772         
773         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
774         if props is not None:
775             src_version_id = props[self.SERIAL]
776             src_hash = props[self.HASH]
777             src_size = props[self.SIZE]
778         else:
779             src_version_id = None
780             src_hash = None
781             src_size = 0
782         if size is None:
783             hash = src_hash # This way hash can be set to None.
784             size = src_size
785         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
786         
787         if src_node is None:
788             pre_version_id = src_version_id
789         else:
790             pre_version_id = None
791             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
792             if props is not None:
793                 pre_version_id = props[self.SERIAL]
794         if pre_version_id is not None:
795             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
796         
797         dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, uuid, cluster)
798         return pre_version_id, dest_version_id
799     
800     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
801         if src_version_id is not None:
802             self.node.attribute_copy(src_version_id, dest_version_id)
803         if not replace:
804             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
805             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
806         else:
807             self.node.attribute_del(dest_version_id, domain)
808             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
809     
810     def _put_metadata(self, user, node, domain, meta, replace=False):
811         """Create a new version and store metadata."""
812         
813         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
814         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
815         return src_version_id, dest_version_id
816     
817     def _list_limits(self, listing, marker, limit):
818         start = 0
819         if marker:
820             try:
821                 start = listing.index(marker) + 1
822             except ValueError:
823                 pass
824         if not limit or limit > 10000:
825             limit = 10000
826         return start, limit
827     
828     def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, allowed=[]):
829         cont_prefix = path + '/'
830         prefix = cont_prefix + prefix
831         start = cont_prefix + marker if marker else None
832         before = until if until is not None else inf
833         filterq = keys if domain else []
834         
835         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq)
836         objects.extend([(p, None) for p in prefixes] if virtual else [])
837         objects.sort(key=lambda x: x[0])
838         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
839         
840         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
841         return objects[start:start + limit]
842     
843     # Policy functions.
844     
845     def _check_policy(self, policy):
846         for k in policy.keys():
847             if policy[k] == '':
848                 policy[k] = self.default_policy.get(k)
849         for k, v in policy.iteritems():
850             if k == 'quota':
851                 q = int(v) # May raise ValueError.
852                 if q < 0:
853                     raise ValueError
854             elif k == 'versioning':
855                 if v not in ['auto', 'none']:
856                     raise ValueError
857             else:
858                 raise ValueError
859     
860     def _put_policy(self, node, policy, replace):
861         if replace:
862             for k, v in self.default_policy.iteritems():
863                 if k not in policy:
864                     policy[k] = v
865         self.node.policy_set(node, policy)
866     
867     def _get_policy(self, node):
868         policy = self.default_policy.copy()
869         policy.update(self.node.policy_get(node))
870         return policy
871     
872     def _apply_versioning(self, account, container, version_id):
873         if version_id is None:
874             return
875         path, node = self._lookup_container(account, container)
876         versioning = self._get_policy(node)['versioning']
877         if versioning != 'auto':
878             hash = self.node.version_remove(version_id)
879             self.store.map_delete(hash)
880     
881     # Access control functions.
882     
883     def _check_groups(self, groups):
884         # raise ValueError('Bad characters in groups')
885         pass
886     
887     def _check_permissions(self, path, permissions):
888         # raise ValueError('Bad characters in permissions')
889         
890         # Check for existing permissions.
891         paths = self.permissions.access_list(path)
892         if paths:
893             ae = AttributeError()
894             ae.data = paths
895             raise ae
896     
897     def _can_read(self, user, account, container, name):
898         if user == account:
899             return True
900         path = '/'.join((account, container, name))
901         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
902             raise NotAllowedError
903     
904     def _can_write(self, user, account, container, name):
905         if user == account:
906             return True
907         path = '/'.join((account, container, name))
908         if not self.permissions.access_check(path, self.WRITE, user):
909             raise NotAllowedError
910     
911     def _allowed_accounts(self, user):
912         allow = set()
913         for path in self.permissions.access_list_paths(user):
914             allow.add(path.split('/', 1)[0])
915         return sorted(allow)
916     
917     def _allowed_containers(self, user, account):
918         allow = set()
919         for path in self.permissions.access_list_paths(user, account):
920             allow.add(path.split('/', 2)[1])
921         return sorted(allow)