Split pithos components in separate packages
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import binascii
40
41 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
42
43 from pithos.lib.hashmap import HashMap
44
45 # Default modules and settings.
46 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
47 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
48 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
49 DEFAULT_BLOCK_PATH = 'data/'
50
51 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
52
53 inf = float('inf')
54
55 ULTIMATE_ANSWER = 42
56
57
58 logger = logging.getLogger(__name__)
59
60
61 def backend_method(func=None, autocommit=1):
62     if func is None:
63         def fn(func):
64             return backend_method(func, autocommit)
65         return fn
66
67     if not autocommit:
68         return func
69     def fn(self, *args, **kw):
70         self.wrapper.execute()
71         try:
72             ret = func(self, *args, **kw)
73             self.wrapper.commit()
74             return ret
75         except:
76             self.wrapper.rollback()
77             raise
78     return fn
79
80
81 class ModularBackend(BaseBackend):
82     """A modular backend.
83     
84     Uses modules for SQL functions and storage.
85     """
86     
87     def __init__(self, db_module=None, db_connection=None, block_module=None, block_path=None):
88         db_module = db_module or DEFAULT_DB_MODULE
89         db_connection = db_connection or DEFAULT_DB_CONNECTION
90         block_module = block_module or DEFAULT_BLOCK_MODULE
91         block_path = block_path or DEFAULT_BLOCK_PATH
92         
93         self.hash_algorithm = 'sha256'
94         self.block_size = 4 * 1024 * 1024 # 4MB
95         
96         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
97         
98         __import__(db_module)
99         self.db_module = sys.modules[db_module]
100         self.wrapper = self.db_module.DBWrapper(db_connection)
101         
102         params = {'wrapper': self.wrapper}
103         self.permissions = self.db_module.Permissions(**params)
104         for x in ['READ', 'WRITE']:
105             setattr(self, x, getattr(self.db_module, x))
106         self.node = self.db_module.Node(**params)
107         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
108             setattr(self, x, getattr(self.db_module, x))
109         
110         __import__(block_module)
111         self.block_module = sys.modules[block_module]
112         
113         params = {'path': block_path,
114                   'block_size': self.block_size,
115                   'hash_algorithm': self.hash_algorithm}
116         self.store = self.block_module.Store(**params)
117     
118     def close(self):
119         self.wrapper.close()
120     
121     @backend_method
122     def list_accounts(self, user, marker=None, limit=10000):
123         """Return a list of accounts the user can access."""
124         
125         logger.debug("list_accounts: %s %s %s", user, marker, limit)
126         allowed = self._allowed_accounts(user)
127         start, limit = self._list_limits(allowed, marker, limit)
128         return allowed[start:start + limit]
129     
130     @backend_method
131     def get_account_meta(self, user, account, domain, until=None):
132         """Return a dictionary with the account metadata for the domain."""
133         
134         logger.debug("get_account_meta: %s %s %s", account, domain, until)
135         path, node = self._lookup_account(account, user == account)
136         if user != account:
137             if until or node is None or account not in self._allowed_accounts(user):
138                 raise NotAllowedError
139         try:
140             props = self._get_properties(node, until)
141             mtime = props[self.MTIME]
142         except NameError:
143             props = None
144             mtime = until
145         count, bytes, tstamp = self._get_statistics(node, until)
146         tstamp = max(tstamp, mtime)
147         if until is None:
148             modified = tstamp
149         else:
150             modified = self._get_statistics(node)[2] # Overall last modification.
151             modified = max(modified, mtime)
152         
153         if user != account:
154             meta = {'name': account}
155         else:
156             meta = {}
157             if props is not None:
158                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
159             if until is not None:
160                 meta.update({'until_timestamp': tstamp})
161             meta.update({'name': account, 'count': count, 'bytes': bytes})
162         meta.update({'modified': modified})
163         return meta
164     
165     @backend_method
166     def update_account_meta(self, user, account, domain, meta, replace=False):
167         """Update the metadata associated with the account for the domain."""
168         
169         logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
170         if user != account:
171             raise NotAllowedError
172         path, node = self._lookup_account(account, True)
173         self._put_metadata(user, node, domain, meta, replace)
174     
175     @backend_method
176     def get_account_groups(self, user, account):
177         """Return a dictionary with the user groups defined for this account."""
178         
179         logger.debug("get_account_groups: %s", account)
180         if user != account:
181             if account not in self._allowed_accounts(user):
182                 raise NotAllowedError
183             return {}
184         self._lookup_account(account, True)
185         return self.permissions.group_dict(account)
186     
187     @backend_method
188     def update_account_groups(self, user, account, groups, replace=False):
189         """Update the groups associated with the account."""
190         
191         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
192         if user != account:
193             raise NotAllowedError
194         self._lookup_account(account, True)
195         self._check_groups(groups)
196         if replace:
197             self.permissions.group_destroy(account)
198         for k, v in groups.iteritems():
199             if not replace: # If not already deleted.
200                 self.permissions.group_delete(account, k)
201             if v:
202                 self.permissions.group_addmany(account, k, v)
203     
204     @backend_method
205     def get_account_policy(self, user, account):
206         """Return a dictionary with the account policy."""
207         
208         logger.debug("get_account_policy: %s", account)
209         if user != account:
210             if account not in self._allowed_accounts(user):
211                 raise NotAllowedError
212             return {}
213         path, node = self._lookup_account(account, True)
214         return self._get_policy(node)
215     
216     @backend_method
217     def update_account_policy(self, user, account, policy, replace=False):
218         """Update the policy associated with the account."""
219         
220         logger.debug("update_account_policy: %s %s %s", account, policy, replace)
221         if user != account:
222             raise NotAllowedError
223         path, node = self._lookup_account(account, True)
224         self._check_policy(policy)
225         self._put_policy(node, policy, replace)
226     
227     @backend_method
228     def put_account(self, user, account, policy={}):
229         """Create a new account with the given name."""
230         
231         logger.debug("put_account: %s %s", account, policy)
232         if user != account:
233             raise NotAllowedError
234         node = self.node.node_lookup(account)
235         if node is not None:
236             raise NameError('Account already exists')
237         if policy:
238             self._check_policy(policy)
239         node = self._put_path(user, self.ROOTNODE, account)
240         self._put_policy(node, policy, True)
241     
242     @backend_method
243     def delete_account(self, user, account):
244         """Delete the account with the given name."""
245         
246         logger.debug("delete_account: %s", account)
247         if user != account:
248             raise NotAllowedError
249         node = self.node.node_lookup(account)
250         if node is None:
251             return
252         if not self.node.node_remove(node):
253             raise IndexError('Account is not empty')
254         self.permissions.group_destroy(account)
255     
256     @backend_method
257     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
258         """Return a list of containers existing under an account."""
259         
260         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
261         if user != account:
262             if until or account not in self._allowed_accounts(user):
263                 raise NotAllowedError
264             allowed = self._allowed_containers(user, account)
265             start, limit = self._list_limits(allowed, marker, limit)
266             return allowed[start:start + limit]
267         if shared:
268             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
269             allowed = list(set(allowed))
270             start, limit = self._list_limits(allowed, marker, limit)
271             return allowed[start:start + limit]
272         node = self.node.node_lookup(account)
273         return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
274     
275     @backend_method
276     def get_container_meta(self, user, account, container, domain, until=None):
277         """Return a dictionary with the container metadata for the domain."""
278         
279         logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
280         if user != account:
281             if until or container not in self._allowed_containers(user, account):
282                 raise NotAllowedError
283         path, node = self._lookup_container(account, container)
284         props = self._get_properties(node, until)
285         mtime = props[self.MTIME]
286         count, bytes, tstamp = self._get_statistics(node, until)
287         tstamp = max(tstamp, mtime)
288         if until is None:
289             modified = tstamp
290         else:
291             modified = self._get_statistics(node)[2] # Overall last modification.
292             modified = max(modified, mtime)
293         
294         if user != account:
295             meta = {'name': container}
296         else:
297             meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
298             if until is not None:
299                 meta.update({'until_timestamp': tstamp})
300             meta.update({'name': container, 'count': count, 'bytes': bytes})
301         meta.update({'modified': modified})
302         return meta
303     
304     @backend_method
305     def update_container_meta(self, user, account, container, domain, meta, replace=False):
306         """Update the metadata associated with the container for the domain."""
307         
308         logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
309         if user != account:
310             raise NotAllowedError
311         path, node = self._lookup_container(account, container)
312         self._put_metadata(user, node, domain, meta, replace)
313     
314     @backend_method
315     def get_container_policy(self, user, account, container):
316         """Return a dictionary with the container policy."""
317         
318         logger.debug("get_container_policy: %s %s", account, container)
319         if user != account:
320             if container not in self._allowed_containers(user, account):
321                 raise NotAllowedError
322             return {}
323         path, node = self._lookup_container(account, container)
324         return self._get_policy(node)
325     
326     @backend_method
327     def update_container_policy(self, user, account, container, policy, replace=False):
328         """Update the policy associated with the container."""
329         
330         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
331         if user != account:
332             raise NotAllowedError
333         path, node = self._lookup_container(account, container)
334         self._check_policy(policy)
335         self._put_policy(node, policy, replace)
336     
337     @backend_method
338     def put_container(self, user, account, container, policy={}):
339         """Create a new container with the given name."""
340         
341         logger.debug("put_container: %s %s %s", account, container, policy)
342         if user != account:
343             raise NotAllowedError
344         try:
345             path, node = self._lookup_container(account, container)
346         except NameError:
347             pass
348         else:
349             raise NameError('Container already exists')
350         if policy:
351             self._check_policy(policy)
352         path = '/'.join((account, container))
353         node = self._put_path(user, self._lookup_account(account, True)[1], path)
354         self._put_policy(node, policy, True)
355     
356     @backend_method
357     def delete_container(self, user, account, container, until=None):
358         """Delete/purge the container with the given name."""
359         
360         logger.debug("delete_container: %s %s %s", account, container, until)
361         if user != account:
362             raise NotAllowedError
363         path, node = self._lookup_container(account, container)
364         
365         if until is not None:
366             hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
367             for h in hashes:
368                 self.store.map_delete(h)
369             self.node.node_purge_children(node, until, CLUSTER_DELETED)
370             return
371         
372         if self._get_statistics(node)[0] > 0:
373             raise IndexError('Container is not empty')
374         hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
375         for h in hashes:
376             self.store.map_delete(h)
377         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
378         self.node.node_remove(node)
379     
380     @backend_method
381     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
382         """Return a list of objects existing under a container."""
383         
384         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
385         allowed = []
386         if user != account:
387             if until:
388                 raise NotAllowedError
389             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
390             if not allowed:
391                 raise NotAllowedError
392         else:
393             if shared:
394                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
395                 if not allowed:
396                     return []
397         path, node = self._lookup_container(account, container)
398         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed)
399     
400     @backend_method
401     def list_object_meta(self, user, account, container, domain, until=None):
402         """Return a list with all the container's object meta keys for the domain."""
403         
404         logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
405         allowed = []
406         if user != account:
407             if until:
408                 raise NotAllowedError
409             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
410             if not allowed:
411                 raise NotAllowedError
412         path, node = self._lookup_container(account, container)
413         before = until if until is not None else inf
414         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
415     
416     @backend_method
417     def get_object_meta(self, user, account, container, name, domain, version=None):
418         """Return a dictionary with the object metadata for the domain."""
419         
420         logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
421         self._can_read(user, account, container, name)
422         path, node = self._lookup_object(account, container, name)
423         props = self._get_version(node, version)
424         if version is None:
425             modified = props[self.MTIME]
426         else:
427             try:
428                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
429             except NameError: # Object may be deleted.
430                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
431                 if del_props is None:
432                     raise NameError('Object does not exist')
433                 modified = del_props[self.MTIME]
434         
435         meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
436         meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
437         meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
438         meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
439         return meta
440     
441     @backend_method
442     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
443         """Update the metadata associated with the object for the domain and return the new version."""
444         
445         logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
446         self._can_write(user, account, container, name)
447         path, node = self._lookup_object(account, container, name)
448         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
449         self._apply_versioning(account, container, src_version_id)
450         return dest_version_id
451     
452     @backend_method
453     def get_object_permissions(self, user, account, container, name):
454         """Return the action allowed on the object, the path
455         from which the object gets its permissions from,
456         along with a dictionary containing the permissions."""
457         
458         logger.debug("get_object_permissions: %s %s %s", account, container, name)
459         allowed = 'write'
460         if user != account:
461             path = '/'.join((account, container, name))
462             if self.permissions.access_check(path, self.WRITE, user):
463                 allowed = 'write'
464             elif self.permissions.access_check(path, self.READ, user):
465                 allowed = 'read'
466             else:
467                 raise NotAllowedError
468         path = self._lookup_object(account, container, name)[0]
469         return (allowed,) + self.permissions.access_inherit(path)
470     
471     @backend_method
472     def update_object_permissions(self, user, account, container, name, permissions):
473         """Update the permissions associated with the object."""
474         
475         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
476         if user != account:
477             raise NotAllowedError
478         path = self._lookup_object(account, container, name)[0]
479         self._check_permissions(path, permissions)
480         self.permissions.access_set(path, permissions)
481     
482     @backend_method
483     def get_object_public(self, user, account, container, name):
484         """Return the public id of the object if applicable."""
485         
486         logger.debug("get_object_public: %s %s %s", account, container, name)
487         self._can_read(user, account, container, name)
488         path = self._lookup_object(account, container, name)[0]
489         p = self.permissions.public_get(path)
490         if p is not None:
491             p += ULTIMATE_ANSWER
492         return p
493     
494     @backend_method
495     def update_object_public(self, user, account, container, name, public):
496         """Update the public status of the object."""
497         
498         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
499         self._can_write(user, account, container, name)
500         path = self._lookup_object(account, container, name)[0]
501         if not public:
502             self.permissions.public_unset(path)
503         else:
504             self.permissions.public_set(path)
505     
506     @backend_method
507     def get_object_hashmap(self, user, account, container, name, version=None):
508         """Return the object's size and a list with partial hashes."""
509         
510         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
511         self._can_read(user, account, container, name)
512         path, node = self._lookup_object(account, container, name)
513         props = self._get_version(node, version)
514         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
515         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
516     
517     def _update_object_hash(self, user, account, container, name, size, hash, permissions, src_node=None, is_copy=False):
518         if permissions is not None and user != account:
519             raise NotAllowedError
520         self._can_write(user, account, container, name)
521         if permissions is not None:
522             path = '/'.join((account, container, name))
523             self._check_permissions(path, permissions)
524         
525         account_path, account_node = self._lookup_account(account, True)
526         container_path, container_node = self._lookup_container(account, container)
527         path, node = self._put_object_node(container_path, container_node, name)
528         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
529         
530         # Check quota.
531         versioning = self._get_policy(container_node)['versioning']
532         if versioning != 'auto':
533             size_delta = size - 0 # TODO: Get previous size.
534         else:
535             size_delta = size
536         if size_delta > 0:
537             account_quota = long(self._get_policy(account_node)['quota'])
538             container_quota = long(self._get_policy(container_node)['quota'])
539             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
540                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
541                 # This must be executed in a transaction, so the version is never created if it fails.
542                 raise QuotaError
543         
544         if permissions is not None:
545             self.permissions.access_set(path, permissions)
546         self._apply_versioning(account, container, pre_version_id)
547         return pre_version_id, dest_version_id
548     
549     @backend_method
550     def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
551         """Create/update an object with the specified size and partial hashes."""
552         
553         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
554         if size == 0: # No such thing as an empty hashmap.
555             hashmap = [self.put_block('')]
556         map = HashMap(self.block_size, self.hash_algorithm)
557         map.extend([binascii.unhexlify(x) for x in hashmap])
558         missing = self.store.block_search(map)
559         if missing:
560             ie = IndexError()
561             ie.data = [binascii.hexlify(x) for x in missing]
562             raise ie
563         
564         hash = map.hash()
565         pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
566         self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
567         self.store.map_put(hash, map)
568         return dest_version_id
569     
570     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
571         self._can_read(user, src_account, src_container, src_name)
572         path, node = self._lookup_object(src_account, src_container, src_name)
573         # TODO: Will do another fetch of the properties in duplicate version...
574         props = self._get_version(node, src_version) # Check to see if source exists.
575         src_version_id = props[self.SERIAL]
576         hash = props[self.HASH]
577         size = props[self.SIZE]
578         
579         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
580         pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions, src_node=node, is_copy=is_copy)
581         self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
582         return dest_version_id
583     
584     @backend_method
585     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
586         """Copy an object's data and metadata."""
587         
588         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
589         return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
590     
591     @backend_method
592     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
593         """Move an object's data and metadata."""
594         
595         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
596         if user != src_account:
597             raise NotAllowedError
598         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
599         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
600             self._delete_object(user, src_account, src_container, src_name)
601         return dest_version_id
602     
603     def _delete_object(self, user, account, container, name, until=None):
604         if user != account:
605             raise NotAllowedError
606         
607         if until is not None:
608             path = '/'.join((account, container, name))
609             node = self.node.node_lookup(path)
610             if node is None:
611                 return
612             hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
613             hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
614             for h in hashes:
615                 self.store.map_delete(h)
616             self.node.node_purge(node, until, CLUSTER_DELETED)
617             try:
618                 props = self._get_version(node)
619             except NameError:
620                 self.permissions.access_clear(path)
621             return
622         
623         path, node = self._lookup_object(account, container, name)
624         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
625         self._apply_versioning(account, container, src_version_id)
626         self.permissions.access_clear(path)
627     
628     @backend_method
629     def delete_object(self, user, account, container, name, until=None):
630         """Delete/purge an object."""
631         
632         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
633         self._delete_object(user, account, container, name, until)
634     
635     @backend_method
636     def list_versions(self, user, account, container, name):
637         """Return a list of all (version, version_timestamp) tuples for an object."""
638         
639         logger.debug("list_versions: %s %s %s", account, container, name)
640         self._can_read(user, account, container, name)
641         path, node = self._lookup_object(account, container, name)
642         versions = self.node.node_get_versions(node)
643         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
644     
645     @backend_method
646     def get_uuid(self, user, uuid):
647         """Return the (account, container, name) for the UUID given."""
648         logger.debug("get_uuid: %s", uuid)
649         info = self.node.latest_uuid(uuid)
650         if info is None:
651             raise NameError
652         path, serial = info
653         account, container, name = path.split('/', 2)
654         self._can_read(user, account, container, name)
655         return (account, container, name)
656     
657     @backend_method
658     def get_public(self, user, public):
659         """Return the (account, container, name) for the public id given."""
660         logger.debug("get_public: %s", public)
661         if public is None or public < ULTIMATE_ANSWER:
662             raise NameError
663         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
664         if path is None:
665             raise NameError
666         account, container, name = path.split('/', 2)
667         self._can_read(user, account, container, name)
668         return (account, container, name)
669     
670     @backend_method(autocommit=0)
671     def get_block(self, hash):
672         """Return a block's data."""
673         
674         logger.debug("get_block: %s", hash)
675         block = self.store.block_get(binascii.unhexlify(hash))
676         if not block:
677             raise NameError('Block does not exist')
678         return block
679     
680     @backend_method(autocommit=0)
681     def put_block(self, data):
682         """Store a block and return the hash."""
683         
684         logger.debug("put_block: %s", len(data))
685         return binascii.hexlify(self.store.block_put(data))
686     
687     @backend_method(autocommit=0)
688     def update_block(self, hash, data, offset=0):
689         """Update a known block and return the hash."""
690         
691         logger.debug("update_block: %s %s %s", hash, len(data), offset)
692         if offset == 0 and len(data) == self.block_size:
693             return self.put_block(data)
694         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
695         return binascii.hexlify(h)
696     
697     # Path functions.
698     
699     def _generate_uuid(self):
700         return str(uuidlib.uuid4())
701     
702     def _put_object_node(self, path, parent, name):
703         path = '/'.join((path, name))
704         node = self.node.node_lookup(path)
705         if node is None:
706             node = self.node.node_create(parent, path)
707         return path, node
708     
709     def _put_path(self, user, parent, path):
710         node = self.node.node_create(parent, path)
711         self.node.version_create(node, None, 0, None, user, self._generate_uuid(), CLUSTER_NORMAL)
712         return node
713     
714     def _lookup_account(self, account, create=True):
715         node = self.node.node_lookup(account)
716         if node is None and create:
717             node = self._put_path(account, self.ROOTNODE, account) # User is account.
718         return account, node
719     
720     def _lookup_container(self, account, container):
721         path = '/'.join((account, container))
722         node = self.node.node_lookup(path)
723         if node is None:
724             raise NameError('Container does not exist')
725         return path, node
726     
727     def _lookup_object(self, account, container, name):
728         path = '/'.join((account, container, name))
729         node = self.node.node_lookup(path)
730         if node is None:
731             raise NameError('Object does not exist')
732         return path, node
733     
734     def _get_properties(self, node, until=None):
735         """Return properties until the timestamp given."""
736         
737         before = until if until is not None else inf
738         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
739         if props is None and until is not None:
740             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
741         if props is None:
742             raise NameError('Path does not exist')
743         return props
744     
745     def _get_statistics(self, node, until=None):
746         """Return count, sum of size and latest timestamp of everything under node."""
747         
748         if until is None:
749             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
750         else:
751             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
752         if stats is None:
753             stats = (0, 0, 0)
754         return stats
755     
756     def _get_version(self, node, version=None):
757         if version is None:
758             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
759             if props is None:
760                 raise NameError('Object does not exist')
761         else:
762             try:
763                 version = int(version)
764             except ValueError:
765                 raise IndexError('Version does not exist')
766             props = self.node.version_get_properties(version)
767             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
768                 raise IndexError('Version does not exist')
769         return props
770     
771     def _put_version_duplicate(self, user, node, src_node=None, size=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
772         """Create a new version of the node."""
773         
774         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
775         if props is not None:
776             src_version_id = props[self.SERIAL]
777             src_hash = props[self.HASH]
778             src_size = props[self.SIZE]
779         else:
780             src_version_id = None
781             src_hash = None
782             src_size = 0
783         if size is None:
784             hash = src_hash # This way hash can be set to None.
785             size = src_size
786         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
787         
788         if src_node is None:
789             pre_version_id = src_version_id
790         else:
791             pre_version_id = None
792             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
793             if props is not None:
794                 pre_version_id = props[self.SERIAL]
795         if pre_version_id is not None:
796             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
797         
798         dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, uuid, cluster)
799         return pre_version_id, dest_version_id
800     
801     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
802         if src_version_id is not None:
803             self.node.attribute_copy(src_version_id, dest_version_id)
804         if not replace:
805             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
806             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
807         else:
808             self.node.attribute_del(dest_version_id, domain)
809             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
810     
811     def _put_metadata(self, user, node, domain, meta, replace=False):
812         """Create a new version and store metadata."""
813         
814         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
815         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
816         return src_version_id, dest_version_id
817     
818     def _list_limits(self, listing, marker, limit):
819         start = 0
820         if marker:
821             try:
822                 start = listing.index(marker) + 1
823             except ValueError:
824                 pass
825         if not limit or limit > 10000:
826             limit = 10000
827         return start, limit
828     
829     def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[]):
830         cont_prefix = path + '/'
831         prefix = cont_prefix + prefix
832         start = cont_prefix + marker if marker else None
833         before = until if until is not None else inf
834         filterq = keys if domain else []
835         sizeq = size_range
836         
837         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq)
838         objects.extend([(p, None) for p in prefixes] if virtual else [])
839         objects.sort(key=lambda x: x[0])
840         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
841         
842         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
843         return objects[start:start + limit]
844     
845     # Policy functions.
846     
847     def _check_policy(self, policy):
848         for k in policy.keys():
849             if policy[k] == '':
850                 policy[k] = self.default_policy.get(k)
851         for k, v in policy.iteritems():
852             if k == 'quota':
853                 q = int(v) # May raise ValueError.
854                 if q < 0:
855                     raise ValueError
856             elif k == 'versioning':
857                 if v not in ['auto', 'none']:
858                     raise ValueError
859             else:
860                 raise ValueError
861     
862     def _put_policy(self, node, policy, replace):
863         if replace:
864             for k, v in self.default_policy.iteritems():
865                 if k not in policy:
866                     policy[k] = v
867         self.node.policy_set(node, policy)
868     
869     def _get_policy(self, node):
870         policy = self.default_policy.copy()
871         policy.update(self.node.policy_get(node))
872         return policy
873     
874     def _apply_versioning(self, account, container, version_id):
875         if version_id is None:
876             return
877         path, node = self._lookup_container(account, container)
878         versioning = self._get_policy(node)['versioning']
879         if versioning != 'auto':
880             hash = self.node.version_remove(version_id)
881             self.store.map_delete(hash)
882     
883     # Access control functions.
884     
885     def _check_groups(self, groups):
886         # raise ValueError('Bad characters in groups')
887         pass
888     
889     def _check_permissions(self, path, permissions):
890         # raise ValueError('Bad characters in permissions')
891         
892         # Check for existing permissions.
893         paths = self.permissions.access_list(path)
894         if paths:
895             ae = AttributeError()
896             ae.data = paths
897             raise ae
898     
899     def _can_read(self, user, account, container, name):
900         if user == account:
901             return True
902         path = '/'.join((account, container, name))
903         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
904             raise NotAllowedError
905     
906     def _can_write(self, user, account, container, name):
907         if user == account:
908             return True
909         path = '/'.join((account, container, name))
910         if not self.permissions.access_check(path, self.WRITE, user):
911             raise NotAllowedError
912     
913     def _allowed_accounts(self, user):
914         allow = set()
915         for path in self.permissions.access_list_paths(user):
916             allow.add(path.split('/', 1)[0])
917         return sorted(allow)
918     
919     def _allowed_containers(self, user, account):
920         allow = set()
921         for path in self.permissions.access_list_paths(user, account):
922             allow.add(path.split('/', 2)[1])
923         return sorted(allow)