Fix node path escaping for SQLite.
[pithos] / pithos / backends / modular.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import logging
38 import binascii
39
40 from base import NotAllowedError, QuotaError, BaseBackend
41
42 from pithos.lib.hashmap import HashMap
43
44 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
45
46 inf = float('inf')
47
48 ULTIMATE_ANSWER = 42
49
50
51 logger = logging.getLogger(__name__)
52
53
54 def backend_method(func=None, autocommit=1):
55     if func is None:
56         def fn(func):
57             return backend_method(func, autocommit)
58         return fn
59
60     if not autocommit:
61         return func
62     def fn(self, *args, **kw):
63         self.wrapper.execute()
64         try:
65             ret = func(self, *args, **kw)
66             self.wrapper.commit()
67             return ret
68         except:
69             self.wrapper.rollback()
70             raise
71     return fn
72
73
74 class ModularBackend(BaseBackend):
75     """A modular backend.
76     
77     Uses modules for SQL functions and storage.
78     """
79     
80     def __init__(self, db_module, db_connection, block_module, block_path):
81         self.hash_algorithm = 'sha256'
82         self.block_size = 4 * 1024 * 1024 # 4MB
83         
84         self.default_policy = {'quota': 0, 'versioning': 'auto'}
85         
86         __import__(db_module)
87         self.db_module = sys.modules[db_module]
88         self.wrapper = self.db_module.DBWrapper(db_connection)
89         
90         params = {'wrapper': self.wrapper}
91         self.permissions = self.db_module.Permissions(**params)
92         for x in ['READ', 'WRITE']:
93             setattr(self, x, getattr(self.db_module, x))
94         self.node = self.db_module.Node(**params)
95         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
96             setattr(self, x, getattr(self.db_module, x))
97         
98         __import__(block_module)
99         self.block_module = sys.modules[block_module]
100         
101         params = {'path': block_path,
102                   'block_size': self.block_size,
103                   'hash_algorithm': self.hash_algorithm}
104         self.store = self.block_module.Store(**params)
105     
106     def close(self):
107         self.wrapper.close()
108     
109     @backend_method
110     def list_accounts(self, user, marker=None, limit=10000):
111         """Return a list of accounts the user can access."""
112         
113         logger.debug("list_accounts: %s %s %s", user, marker, limit)
114         allowed = self._allowed_accounts(user)
115         start, limit = self._list_limits(allowed, marker, limit)
116         return allowed[start:start + limit]
117     
118     @backend_method
119     def get_account_meta(self, user, account, until=None):
120         """Return a dictionary with the account metadata."""
121         
122         logger.debug("get_account_meta: %s %s", account, until)
123         path, node = self._lookup_account(account, user == account)
124         if user != account:
125             if until or node is None or account not in self._allowed_accounts(user):
126                 raise NotAllowedError
127         try:
128             props = self._get_properties(node, until)
129             mtime = props[self.MTIME]
130         except NameError:
131             props = None
132             mtime = until
133         count, bytes, tstamp = self._get_statistics(node, until)
134         tstamp = max(tstamp, mtime)
135         if until is None:
136             modified = tstamp
137         else:
138             modified = self._get_statistics(node)[2] # Overall last modification.
139             modified = max(modified, mtime)
140         
141         if user != account:
142             meta = {'name': account}
143         else:
144             meta = {}
145             if props is not None:
146                 meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
147             if until is not None:
148                 meta.update({'until_timestamp': tstamp})
149             meta.update({'name': account, 'count': count, 'bytes': bytes})
150         meta.update({'modified': modified})
151         return meta
152     
153     @backend_method
154     def update_account_meta(self, user, account, meta, replace=False):
155         """Update the metadata associated with the account."""
156         
157         logger.debug("update_account_meta: %s %s %s", account, meta, replace)
158         if user != account:
159             raise NotAllowedError
160         path, node = self._lookup_account(account, True)
161         self._put_metadata(user, node, meta, replace)
162     
163     @backend_method
164     def get_account_groups(self, user, account):
165         """Return a dictionary with the user groups defined for this account."""
166         
167         logger.debug("get_account_groups: %s", account)
168         if user != account:
169             if account not in self._allowed_accounts(user):
170                 raise NotAllowedError
171             return {}
172         self._lookup_account(account, True)
173         return self.permissions.group_dict(account)
174     
175     @backend_method
176     def update_account_groups(self, user, account, groups, replace=False):
177         """Update the groups associated with the account."""
178         
179         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
180         if user != account:
181             raise NotAllowedError
182         self._lookup_account(account, True)
183         self._check_groups(groups)
184         if replace:
185             self.permissions.group_destroy(account)
186         for k, v in groups.iteritems():
187             if not replace: # If not already deleted.
188                 self.permissions.group_delete(account, k)
189             if v:
190                 self.permissions.group_addmany(account, k, v)
191     
192     @backend_method
193     def get_account_policy(self, user, account):
194         """Return a dictionary with the account policy."""
195         
196         logger.debug("get_account_policy: %s", account)
197         if user != account:
198             if account not in self._allowed_accounts(user):
199                 raise NotAllowedError
200             return {}
201         path, node = self._lookup_account(account, True)
202         return self._get_policy(node)
203     
204     @backend_method
205     def update_account_policy(self, user, account, policy, replace=False):
206         """Update the policy associated with the account."""
207         
208         logger.debug("update_account_policy: %s %s %s", account, policy, replace)
209         if user != account:
210             raise NotAllowedError
211         path, node = self._lookup_account(account, True)
212         self._check_policy(policy)
213         self._put_policy(node, policy, replace)
214     
215     @backend_method
216     def put_account(self, user, account, policy={}):
217         """Create a new account with the given name."""
218         
219         logger.debug("put_account: %s %s", account, policy)
220         if user != account:
221             raise NotAllowedError
222         node = self.node.node_lookup(account)
223         if node is not None:
224             raise NameError('Account already exists')
225         if policy:
226             self._check_policy(policy)
227         node = self._put_path(user, self.ROOTNODE, account)
228         self._put_policy(node, policy, True)
229     
230     @backend_method
231     def delete_account(self, user, account):
232         """Delete the account with the given name."""
233         
234         logger.debug("delete_account: %s", account)
235         if user != account:
236             raise NotAllowedError
237         node = self.node.node_lookup(account)
238         if node is None:
239             return
240         if not self.node.node_remove(node):
241             raise IndexError('Account is not empty')
242         self.permissions.group_destroy(account)
243     
244     @backend_method
245     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
246         """Return a list of containers existing under an account."""
247         
248         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
249         if user != account:
250             if until or account not in self._allowed_accounts(user):
251                 raise NotAllowedError
252             allowed = self._allowed_containers(user, account)
253             start, limit = self._list_limits(allowed, marker, limit)
254             return allowed[start:start + limit]
255         if shared:
256             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
257             allowed = list(set(allowed))
258             start, limit = self._list_limits(allowed, marker, limit)
259             return allowed[start:start + limit]
260         node = self.node.node_lookup(account)
261         return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
262     
263     @backend_method
264     def get_container_meta(self, user, account, container, until=None):
265         """Return a dictionary with the container metadata."""
266         
267         logger.debug("get_container_meta: %s %s %s", account, container, until)
268         if user != account:
269             if until or container not in self._allowed_containers(user, account):
270                 raise NotAllowedError
271         path, node = self._lookup_container(account, container)
272         props = self._get_properties(node, until)
273         mtime = props[self.MTIME]
274         count, bytes, tstamp = self._get_statistics(node, until)
275         tstamp = max(tstamp, mtime)
276         if until is None:
277             modified = tstamp
278         else:
279             modified = self._get_statistics(node)[2] # Overall last modification.
280             modified = max(modified, mtime)
281         
282         if user != account:
283             meta = {'name': container}
284         else:
285             meta = dict(self.node.attribute_get(props[self.SERIAL]))
286             if until is not None:
287                 meta.update({'until_timestamp': tstamp})
288             meta.update({'name': container, 'count': count, 'bytes': bytes})
289         meta.update({'modified': modified})
290         return meta
291     
292     @backend_method
293     def update_container_meta(self, user, account, container, meta, replace=False):
294         """Update the metadata associated with the container."""
295         
296         logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
297         if user != account:
298             raise NotAllowedError
299         path, node = self._lookup_container(account, container)
300         self._put_metadata(user, node, meta, replace)
301     
302     @backend_method
303     def get_container_policy(self, user, account, container):
304         """Return a dictionary with the container policy."""
305         
306         logger.debug("get_container_policy: %s %s", account, container)
307         if user != account:
308             if container not in self._allowed_containers(user, account):
309                 raise NotAllowedError
310             return {}
311         path, node = self._lookup_container(account, container)
312         return self._get_policy(node)
313     
314     @backend_method
315     def update_container_policy(self, user, account, container, policy, replace=False):
316         """Update the policy associated with the container."""
317         
318         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
319         if user != account:
320             raise NotAllowedError
321         path, node = self._lookup_container(account, container)
322         self._check_policy(policy)
323         self._put_policy(node, policy, replace)
324     
325     @backend_method
326     def put_container(self, user, account, container, policy={}):
327         """Create a new container with the given name."""
328         
329         logger.debug("put_container: %s %s %s", account, container, policy)
330         if user != account:
331             raise NotAllowedError
332         try:
333             path, node = self._lookup_container(account, container)
334         except NameError:
335             pass
336         else:
337             raise NameError('Container already exists')
338         if policy:
339             self._check_policy(policy)
340         path = '/'.join((account, container))
341         node = self._put_path(user, self._lookup_account(account, True)[1], path)
342         self._put_policy(node, policy, True)
343     
344     @backend_method
345     def delete_container(self, user, account, container, until=None):
346         """Delete/purge the container with the given name."""
347         
348         logger.debug("delete_container: %s %s %s", account, container, until)
349         if user != account:
350             raise NotAllowedError
351         path, node = self._lookup_container(account, container)
352         
353         if until is not None:
354             hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
355             for h in hashes:
356                 self.store.map_delete(h)
357             self.node.node_purge_children(node, until, CLUSTER_DELETED)
358             return
359         
360         if self._get_statistics(node)[0] > 0:
361             raise IndexError('Container is not empty')
362         hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
363         for h in hashes:
364             self.store.map_delete(h)
365         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
366         self.node.node_remove(node)
367     
368     @backend_method
369     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
370         """Return a list of objects existing under a container."""
371         
372         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
373         allowed = []
374         if user != account:
375             if until:
376                 raise NotAllowedError
377             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
378             if not allowed:
379                 raise NotAllowedError
380         else:
381             if shared:
382                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
383                 if not allowed:
384                     return []
385         path, node = self._lookup_container(account, container)
386         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
387     
388     @backend_method
389     def list_object_meta(self, user, account, container, until=None):
390         """Return a list with all the container's object meta keys."""
391         
392         logger.debug("list_object_meta: %s %s %s", account, container, until)
393         allowed = []
394         if user != account:
395             if until:
396                 raise NotAllowedError
397             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
398             if not allowed:
399                 raise NotAllowedError
400         path, node = self._lookup_container(account, container)
401         before = until if until is not None else inf
402         return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
403     
404     @backend_method
405     def get_object_meta(self, user, account, container, name, version=None):
406         """Return a dictionary with the object metadata."""
407         
408         logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
409         self._can_read(user, account, container, name)
410         path, node = self._lookup_object(account, container, name)
411         props = self._get_version(node, version)
412         if version is None:
413             modified = props[self.MTIME]
414         else:
415             try:
416                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
417             except NameError: # Object may be deleted.
418                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
419                 if del_props is None:
420                     raise NameError('Object does not exist')
421                 modified = del_props[self.MTIME]
422         
423         meta = dict(self.node.attribute_get(props[self.SERIAL]))
424         meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
425         meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
426         meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
427         return meta
428     
429     @backend_method
430     def update_object_meta(self, user, account, container, name, meta, replace=False):
431         """Update the metadata associated with the object."""
432         
433         logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
434         self._can_write(user, account, container, name)
435         path, node = self._lookup_object(account, container, name)
436         src_version_id, dest_version_id = self._put_metadata(user, node, meta, replace)
437         self._apply_versioning(account, container, src_version_id)
438         return dest_version_id
439     
440     @backend_method
441     def get_object_permissions(self, user, account, container, name):
442         """Return the action allowed on the object, the path
443         from which the object gets its permissions from,
444         along with a dictionary containing the permissions."""
445         
446         logger.debug("get_object_permissions: %s %s %s", account, container, name)
447         allowed = 'write'
448         if user != account:
449             path = '/'.join((account, container, name))
450             if self.permissions.access_check(path, self.WRITE, user):
451                 allowed = 'write'
452             elif self.permissions.access_check(path, self.READ, user):
453                 allowed = 'read'
454             else:
455                 raise NotAllowedError
456         path = self._lookup_object(account, container, name)[0]
457         return (allowed,) + self.permissions.access_inherit(path)
458     
459     @backend_method
460     def update_object_permissions(self, user, account, container, name, permissions):
461         """Update the permissions associated with the object."""
462         
463         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
464         if user != account:
465             raise NotAllowedError
466         path = self._lookup_object(account, container, name)[0]
467         self._check_permissions(path, permissions)
468         self.permissions.access_set(path, permissions)
469     
470     @backend_method
471     def get_object_public(self, user, account, container, name):
472         """Return the public id of the object if applicable."""
473         
474         logger.debug("get_object_public: %s %s %s", account, container, name)
475         self._can_read(user, account, container, name)
476         path = self._lookup_object(account, container, name)[0]
477         p = self.permissions.public_get(path)
478         if p is not None:
479             p += ULTIMATE_ANSWER
480         return p
481     
482     @backend_method
483     def update_object_public(self, user, account, container, name, public):
484         """Update the public status of the object."""
485         
486         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
487         self._can_write(user, account, container, name)
488         path = self._lookup_object(account, container, name)[0]
489         if not public:
490             self.permissions.public_unset(path)
491         else:
492             self.permissions.public_set(path)
493     
494     @backend_method
495     def get_object_hashmap(self, user, account, container, name, version=None):
496         """Return the object's size and a list with partial hashes."""
497         
498         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
499         self._can_read(user, account, container, name)
500         path, node = self._lookup_object(account, container, name)
501         props = self._get_version(node, version)
502         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
503         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
504     
505     def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
506         if permissions is not None and user != account:
507             raise NotAllowedError
508         self._can_write(user, account, container, name)
509         if permissions is not None:
510             path = '/'.join((account, container, name))
511             self._check_permissions(path, permissions)
512         
513         account_path, account_node = self._lookup_account(account, True)
514         container_path, container_node = self._lookup_container(account, container)
515         path, node = self._put_object_node(container_path, container_node, name)
516         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)
517         
518         # Check quota.
519         size_delta = size # Change with versioning.
520         if size_delta > 0:
521             account_quota = long(self._get_policy(account_node)['quota'])
522             container_quota = long(self._get_policy(container_node)['quota'])
523             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
524                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
525                 # This must be executed in a transaction, so the version is never created if it fails.
526                 raise QuotaError
527         
528         if not replace_meta and src_version_id is not None:
529             self.node.attribute_copy(src_version_id, dest_version_id)
530         self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
531         if permissions is not None:
532             self.permissions.access_set(path, permissions)
533         self._apply_versioning(account, container, src_version_id)
534         return dest_version_id
535     
536     @backend_method
537     def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
538         """Create/update an object with the specified size and partial hashes."""
539         
540         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
541         if size == 0: # No such thing as an empty hashmap.
542             hashmap = [self.put_block('')]
543         map = HashMap(self.block_size, self.hash_algorithm)
544         map.extend([binascii.unhexlify(x) for x in hashmap])
545         missing = self.store.block_search(map)
546         if missing:
547             ie = IndexError()
548             ie.data = [binascii.hexlify(x) for x in missing]
549             raise ie
550         
551         hash = map.hash()
552         dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions)
553         self.store.map_put(hash, map)
554         return dest_version_id
555     
556     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
557         self._can_read(user, src_account, src_container, src_name)
558         path, node = self._lookup_object(src_account, src_container, src_name)
559         props = self._get_version(node, src_version)
560         src_version_id = props[self.SERIAL]
561         hash = props[self.HASH]
562         size = props[self.SIZE]
563         
564         if (src_account, src_container, src_name) == (dest_account, dest_container, dest_name):
565             dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, dest_meta, replace_meta, permissions)
566         else:
567             if replace_meta:
568                 meta = dest_meta
569             else:
570                 meta = {}
571             dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
572             if not replace_meta:
573                 self.node.attribute_copy(src_version_id, dest_version_id)
574                 self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
575         return dest_version_id
576     
577     @backend_method
578     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
579         """Copy an object's data and metadata."""
580         
581         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
582         return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
583     
584     @backend_method
585     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
586         """Move an object's data and metadata."""
587         
588         logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
589         if user != src_account:
590             raise NotAllowedError
591         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
592         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
593             self._delete_object(user, src_account, src_container, src_name)
594         return dest_version_id
595     
596     def _delete_object(self, user, account, container, name, until=None):
597         if user != account:
598             raise NotAllowedError
599         
600         if until is not None:
601             path = '/'.join((account, container, name))
602             node = self.node.node_lookup(path)
603             if node is None:
604                 return
605             hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
606             hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
607             for h in hashes:
608                 self.store.map_delete(h)
609             self.node.node_purge(node, until, CLUSTER_DELETED)
610             try:
611                 props = self._get_version(node)
612             except NameError:
613                 self.permissions.access_clear(path)
614             return
615         
616         path, node = self._lookup_object(account, container, name)
617         src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
618         self._apply_versioning(account, container, src_version_id)
619         self.permissions.access_clear(path)
620     
621     @backend_method
622     def delete_object(self, user, account, container, name, until=None):
623         """Delete/purge an object."""
624         
625         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
626         self._delete_object(user, account, container, name, until)
627     
628     @backend_method
629     def list_versions(self, user, account, container, name):
630         """Return a list of all (version, version_timestamp) tuples for an object."""
631         
632         logger.debug("list_versions: %s %s %s", account, container, name)
633         self._can_read(user, account, container, name)
634         path, node = self._lookup_object(account, container, name)
635         versions = self.node.node_get_versions(node)
636         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
637     
638     @backend_method
639     def get_public(self, user, public):
640         """Return the (account, container, name) for the public id given."""
641         logger.debug("get_public: %s", public)
642         if public is None or public < ULTIMATE_ANSWER:
643             raise NameError
644         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
645         account, container, name = path.split('/', 2)
646         self._can_read(user, account, container, name)
647         return (account, container, name)
648     
649     @backend_method(autocommit=0)
650     def get_block(self, hash):
651         """Return a block's data."""
652         
653         logger.debug("get_block: %s", hash)
654         block = self.store.block_get(binascii.unhexlify(hash))
655         if not block:
656             raise NameError('Block does not exist')
657         return block
658     
659     @backend_method(autocommit=0)
660     def put_block(self, data):
661         """Store a block and return the hash."""
662         
663         logger.debug("put_block: %s", len(data))
664         return binascii.hexlify(self.store.block_put(data))
665     
666     @backend_method(autocommit=0)
667     def update_block(self, hash, data, offset=0):
668         """Update a known block and return the hash."""
669         
670         logger.debug("update_block: %s %s %s", hash, len(data), offset)
671         if offset == 0 and len(data) == self.block_size:
672             return self.put_block(data)
673         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
674         return binascii.hexlify(h)
675     
676     # Path functions.
677     
678     def _put_object_node(self, path, parent, name):
679         path = '/'.join((path, name))
680         node = self.node.node_lookup(path)
681         if node is None:
682             node = self.node.node_create(parent, path)
683         return path, node
684     
685     def _put_path(self, user, parent, path):
686         node = self.node.node_create(parent, path)
687         self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
688         return node
689     
690     def _lookup_account(self, account, create=True):
691         node = self.node.node_lookup(account)
692         if node is None and create:
693             node = self._put_path(account, self.ROOTNODE, account) # User is account.
694         return account, node
695     
696     def _lookup_container(self, account, container):
697         path = '/'.join((account, container))
698         node = self.node.node_lookup(path)
699         if node is None:
700             raise NameError('Container does not exist')
701         return path, node
702     
703     def _lookup_object(self, account, container, name):
704         path = '/'.join((account, container, name))
705         node = self.node.node_lookup(path)
706         if node is None:
707             raise NameError('Object does not exist')
708         return path, node
709     
710     def _get_properties(self, node, until=None):
711         """Return properties until the timestamp given."""
712         
713         before = until if until is not None else inf
714         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
715         if props is None and until is not None:
716             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
717         if props is None:
718             raise NameError('Path does not exist')
719         return props
720     
721     def _get_statistics(self, node, until=None):
722         """Return count, sum of size and latest timestamp of everything under node."""
723         
724         if until is None:
725             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
726         else:
727             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
728         if stats is None:
729             stats = (0, 0, 0)
730         return stats
731     
732     def _get_version(self, node, version=None):
733         if version is None:
734             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
735             if props is None:
736                 raise NameError('Object does not exist')
737         else:
738             try:
739                 version = int(version)
740             except ValueError:
741                 raise IndexError('Version does not exist')
742             props = self.node.version_get_properties(version)
743             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
744                 raise IndexError('Version does not exist')
745         return props
746     
747     def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
748         """Create a new version of the node."""
749         
750         props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
751         if props is not None:
752             src_version_id = props[self.SERIAL]
753             src_hash = props[self.HASH]
754             src_size = props[self.SIZE]
755         else:
756             src_version_id = None
757             src_hash = None
758             src_size = 0
759         if size is None:
760             hash = src_hash # This way hash can be set to None.
761             size = src_size
762         
763         if src_version_id is not None:
764             self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
765         dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
766         return src_version_id, dest_version_id
767     
768     def _put_metadata(self, user, node, meta, replace=False):
769         """Create a new version and store metadata."""
770         
771         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
772         
773         # TODO: Merge with other functions that update metadata...
774         if not replace:
775             if src_version_id is not None:
776                 self.node.attribute_copy(src_version_id, dest_version_id)
777             self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
778             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
779         else:
780             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
781         return src_version_id, dest_version_id
782     
783     def _list_limits(self, listing, marker, limit):
784         start = 0
785         if marker:
786             try:
787                 start = listing.index(marker) + 1
788             except ValueError:
789                 pass
790         if not limit or limit > 10000:
791             limit = 10000
792         return start, limit
793     
794     def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
795         cont_prefix = path + '/'
796         prefix = cont_prefix + prefix
797         start = cont_prefix + marker if marker else None
798         before = until if until is not None else inf
799         filterq = ','.join(keys) if keys else None
800         
801         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
802         objects.extend([(p, None) for p in prefixes] if virtual else [])
803         objects.sort(key=lambda x: x[0])
804         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
805         
806         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
807         return objects[start:start + limit]
808     
809     # Policy functions.
810     
811     def _check_policy(self, policy):
812         for k in policy.keys():
813             if policy[k] == '':
814                 policy[k] = self.default_policy.get(k)
815         for k, v in policy.iteritems():
816             if k == 'quota':
817                 q = int(v) # May raise ValueError.
818                 if q < 0:
819                     raise ValueError
820             elif k == 'versioning':
821                 if v not in ['auto', 'none']:
822                     raise ValueError
823             else:
824                 raise ValueError
825     
826     def _put_policy(self, node, policy, replace):
827         if replace:
828             for k, v in self.default_policy.iteritems():
829                 if k not in policy:
830                     policy[k] = v
831         self.node.policy_set(node, policy)
832     
833     def _get_policy(self, node):
834         policy = self.default_policy.copy()
835         policy.update(self.node.policy_get(node))
836         return policy
837     
838     def _apply_versioning(self, account, container, version_id):
839         if version_id is None:
840             return
841         path, node = self._lookup_container(account, container)
842         versioning = self._get_policy(node)['versioning']
843         if versioning != 'auto':
844             hash = self.node.version_remove(version_id)
845             self.store.map_delete(hash)
846     
847     # Access control functions.
848     
849     def _check_groups(self, groups):
850         # raise ValueError('Bad characters in groups')
851         pass
852     
853     def _check_permissions(self, path, permissions):
854         # raise ValueError('Bad characters in permissions')
855         
856         # Check for existing permissions.
857         paths = self.permissions.access_list(path)
858         if paths:
859             ae = AttributeError()
860             ae.data = paths
861             raise ae
862     
863     def _can_read(self, user, account, container, name):
864         if user == account:
865             return True
866         path = '/'.join((account, container, name))
867         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
868             raise NotAllowedError
869     
870     def _can_write(self, user, account, container, name):
871         if user == account:
872             return True
873         path = '/'.join((account, container, name))
874         if not self.permissions.access_check(path, self.WRITE, user):
875             raise NotAllowedError
876     
877     def _allowed_accounts(self, user):
878         allow = set()
879         for path in self.permissions.access_list_paths(user):
880             allow.add(path.split('/', 1)[0])
881         return sorted(allow)
882     
883     def _allowed_containers(self, user, account):
884         allow = set()
885         for path in self.permissions.access_list_paths(user, account):
886             allow.add(path.split('/', 2)[1])
887         return sorted(allow)