Call for delete container contents
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import hashlib
40 import binascii
41
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43     AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
44
45 # Stripped-down version of the HashMap class found in tools.
46 class HashMap(list):
47
48     def __init__(self, blocksize, blockhash):
49         super(HashMap, self).__init__()
50         self.blocksize = blocksize
51         self.blockhash = blockhash
52
53     def _hash_raw(self, v):
54         h = hashlib.new(self.blockhash)
55         h.update(v)
56         return h.digest()
57
58     def hash(self):
59         if len(self) == 0:
60             return self._hash_raw('')
61         if len(self) == 1:
62             return self.__getitem__(0)
63
64         h = list(self)
65         s = 2
66         while s < len(h):
67             s = s * 2
68         h += [('\x00' * len(h[0]))] * (s - len(h))
69         while len(h) > 1:
70             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
71         return h[0]
72
73 # Default modules and settings.
74 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
75 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
76 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
77 DEFAULT_BLOCK_PATH = 'data/'
78 DEFAULT_BLOCK_UMASK = 0o022
79 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
80 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
81
82 QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
83 QUEUE_CLIENT_ID = 'pithos'
84 QUEUE_INSTANCE_ID = '1'
85
86 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
87
88 inf = float('inf')
89
90 ULTIMATE_ANSWER = 42
91
92
93 logger = logging.getLogger(__name__)
94
95
96 def backend_method(func=None, autocommit=1):
97     if func is None:
98         def fn(func):
99             return backend_method(func, autocommit)
100         return fn
101
102     if not autocommit:
103         return func
104     def fn(self, *args, **kw):
105         self.wrapper.execute()
106         try:
107             self.messages = []
108             ret = func(self, *args, **kw)
109             for m in self.messages:
110                 self.queue.send(*m)
111             self.wrapper.commit()
112             return ret
113         except:
114             self.wrapper.rollback()
115             raise
116     return fn
117
118
119 class ModularBackend(BaseBackend):
120     """A modular backend.
121     
122     Uses modules for SQL functions and storage.
123     """
124     
125     def __init__(self, db_module=None, db_connection=None,
126                  block_module=None, block_path=None, block_umask=None,
127                  queue_module=None, queue_connection=None):
128         db_module = db_module or DEFAULT_DB_MODULE
129         db_connection = db_connection or DEFAULT_DB_CONNECTION
130         block_module = block_module or DEFAULT_BLOCK_MODULE
131         block_path = block_path or DEFAULT_BLOCK_PATH
132         block_umask = block_umask or DEFAULT_BLOCK_UMASK
133         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
134         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
135         
136         self.hash_algorithm = 'sha256'
137         self.block_size = 4 * 1024 * 1024 # 4MB
138         
139         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
140         
141         def load_module(m):
142             __import__(m)
143             return sys.modules[m]
144         
145         self.db_module = load_module(db_module)
146         self.wrapper = self.db_module.DBWrapper(db_connection)
147         params = {'wrapper': self.wrapper}
148         self.permissions = self.db_module.Permissions(**params)
149         for x in ['READ', 'WRITE']:
150             setattr(self, x, getattr(self.db_module, x))
151         self.node = self.db_module.Node(**params)
152         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
153             setattr(self, x, getattr(self.db_module, x))
154         
155         self.block_module = load_module(block_module)
156         params = {'path': block_path,
157                   'block_size': self.block_size,
158                   'hash_algorithm': self.hash_algorithm,
159                   'umask': block_umask}
160         self.store = self.block_module.Store(**params)
161
162         if queue_module and queue_connection:
163             self.queue_module = load_module(queue_module)
164             params = {'exchange': queue_connection,
165                       'client_id': QUEUE_CLIENT_ID}
166             self.queue = self.queue_module.Queue(**params)
167         else:
168             class NoQueue:
169                 def send(self, *args):
170                     pass
171                 
172                 def close(self):
173                     pass
174             
175             self.queue = NoQueue()
176     
177     def close(self):
178         self.wrapper.close()
179         self.queue.close()
180     
181     @backend_method
182     def list_accounts(self, user, marker=None, limit=10000):
183         """Return a list of accounts the user can access."""
184         
185         logger.debug("list_accounts: %s %s %s", user, marker, limit)
186         allowed = self._allowed_accounts(user)
187         start, limit = self._list_limits(allowed, marker, limit)
188         return allowed[start:start + limit]
189     
190     @backend_method
191     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
192         """Return a dictionary with the account metadata for the domain."""
193         
194         logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
195         path, node = self._lookup_account(account, user == account)
196         if user != account:
197             if until or node is None or account not in self._allowed_accounts(user):
198                 raise NotAllowedError
199         try:
200             props = self._get_properties(node, until)
201             mtime = props[self.MTIME]
202         except NameError:
203             props = None
204             mtime = until
205         count, bytes, tstamp = self._get_statistics(node, until)
206         tstamp = max(tstamp, mtime)
207         if until is None:
208             modified = tstamp
209         else:
210             modified = self._get_statistics(node)[2] # Overall last modification.
211             modified = max(modified, mtime)
212         
213         if user != account:
214             meta = {'name': account}
215         else:
216             meta = {}
217             if props is not None and include_user_defined:
218                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
219             if until is not None:
220                 meta.update({'until_timestamp': tstamp})
221             meta.update({'name': account, 'count': count, 'bytes': bytes})
222         meta.update({'modified': modified})
223         return meta
224     
225     @backend_method
226     def update_account_meta(self, user, account, domain, meta, replace=False):
227         """Update the metadata associated with the account for the domain."""
228         
229         logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
230         if user != account:
231             raise NotAllowedError
232         path, node = self._lookup_account(account, True)
233         self._put_metadata(user, node, domain, meta, replace)
234     
235     @backend_method
236     def get_account_groups(self, user, account):
237         """Return a dictionary with the user groups defined for this account."""
238         
239         logger.debug("get_account_groups: %s %s", user, account)
240         if user != account:
241             if account not in self._allowed_accounts(user):
242                 raise NotAllowedError
243             return {}
244         self._lookup_account(account, True)
245         return self.permissions.group_dict(account)
246     
247     @backend_method
248     def update_account_groups(self, user, account, groups, replace=False):
249         """Update the groups associated with the account."""
250         
251         logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
252         if user != account:
253             raise NotAllowedError
254         self._lookup_account(account, True)
255         self._check_groups(groups)
256         if replace:
257             self.permissions.group_destroy(account)
258         for k, v in groups.iteritems():
259             if not replace: # If not already deleted.
260                 self.permissions.group_delete(account, k)
261             if v:
262                 self.permissions.group_addmany(account, k, v)
263     
264     @backend_method
265     def get_account_policy(self, user, account):
266         """Return a dictionary with the account policy."""
267         
268         logger.debug("get_account_policy: %s %s", user, account)
269         if user != account:
270             if account not in self._allowed_accounts(user):
271                 raise NotAllowedError
272             return {}
273         path, node = self._lookup_account(account, True)
274         return self._get_policy(node)
275     
276     @backend_method
277     def update_account_policy(self, user, account, policy, replace=False):
278         """Update the policy associated with the account."""
279         
280         logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
281         if user != account:
282             raise NotAllowedError
283         path, node = self._lookup_account(account, True)
284         self._check_policy(policy)
285         self._put_policy(node, policy, replace)
286     
287     @backend_method
288     def put_account(self, user, account, policy={}):
289         """Create a new account with the given name."""
290         
291         logger.debug("put_account: %s %s %s", user, account, policy)
292         if user != account:
293             raise NotAllowedError
294         node = self.node.node_lookup(account)
295         if node is not None:
296             raise AccountExists('Account already exists')
297         if policy:
298             self._check_policy(policy)
299         node = self._put_path(user, self.ROOTNODE, account)
300         self._put_policy(node, policy, True)
301     
302     @backend_method
303     def delete_account(self, user, account):
304         """Delete the account with the given name."""
305         
306         logger.debug("delete_account: %s %s", user, account)
307         if user != account:
308             raise NotAllowedError
309         node = self.node.node_lookup(account)
310         if node is None:
311             return
312         if not self.node.node_remove(node):
313             raise AccountNotEmpty('Account is not empty')
314         self.permissions.group_destroy(account)
315     
316     @backend_method
317     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
318         """Return a list of containers existing under an account."""
319         
320         logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
321         if user != account:
322             if until or account not in self._allowed_accounts(user):
323                 raise NotAllowedError
324             allowed = self._allowed_containers(user, account)
325             start, limit = self._list_limits(allowed, marker, limit)
326             return allowed[start:start + limit]
327         if shared or public:
328             allowed = set()
329             if shared:
330                 allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
331             if public:
332                 allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
333             allowed = sorted(allowed)
334             start, limit = self._list_limits(allowed, marker, limit)
335             return allowed[start:start + limit]
336         node = self.node.node_lookup(account)
337         containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
338         start, limit = self._list_limits([x[0] for x in containers], marker, limit)
339         return containers[start:start + limit]
340     
341     @backend_method
342     def list_container_meta(self, user, account, container, domain, until=None):
343         """Return a list with all the container's object meta keys for the domain."""
344         
345         logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
346         allowed = []
347         if user != account:
348             if until:
349                 raise NotAllowedError
350             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
351             if not allowed:
352                 raise NotAllowedError
353         path, node = self._lookup_container(account, container)
354         before = until if until is not None else inf
355         allowed = self._get_formatted_paths(allowed)
356         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
357     
358     @backend_method
359     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
360         """Return a dictionary with the container metadata for the domain."""
361         
362         logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
363         if user != account:
364             if until or container not in self._allowed_containers(user, account):
365                 raise NotAllowedError
366         path, node = self._lookup_container(account, container)
367         props = self._get_properties(node, until)
368         mtime = props[self.MTIME]
369         count, bytes, tstamp = self._get_statistics(node, until)
370         tstamp = max(tstamp, mtime)
371         if until is None:
372             modified = tstamp
373         else:
374             modified = self._get_statistics(node)[2] # Overall last modification.
375             modified = max(modified, mtime)
376         
377         if user != account:
378             meta = {'name': container}
379         else:
380             meta = {}
381             if include_user_defined:
382                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
383             if until is not None:
384                 meta.update({'until_timestamp': tstamp})
385             meta.update({'name': container, 'count': count, 'bytes': bytes})
386         meta.update({'modified': modified})
387         return meta
388     
389     @backend_method
390     def update_container_meta(self, user, account, container, domain, meta, replace=False):
391         """Update the metadata associated with the container for the domain."""
392         
393         logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
394         if user != account:
395             raise NotAllowedError
396         path, node = self._lookup_container(account, container)
397         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
398         if src_version_id is not None:
399             versioning = self._get_policy(node)['versioning']
400             if versioning != 'auto':
401                 self.node.version_remove(src_version_id)
402     
403     @backend_method
404     def get_container_policy(self, user, account, container):
405         """Return a dictionary with the container policy."""
406         
407         logger.debug("get_container_policy: %s %s %s", user, account, container)
408         if user != account:
409             if container not in self._allowed_containers(user, account):
410                 raise NotAllowedError
411             return {}
412         path, node = self._lookup_container(account, container)
413         return self._get_policy(node)
414     
415     @backend_method
416     def update_container_policy(self, user, account, container, policy, replace=False):
417         """Update the policy associated with the container."""
418         
419         logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
420         if user != account:
421             raise NotAllowedError
422         path, node = self._lookup_container(account, container)
423         self._check_policy(policy)
424         self._put_policy(node, policy, replace)
425     
426     @backend_method
427     def put_container(self, user, account, container, policy={}):
428         """Create a new container with the given name."""
429         
430         logger.debug("put_container: %s %s %s %s", user, account, container, policy)
431         if user != account:
432             raise NotAllowedError
433         try:
434             path, node = self._lookup_container(account, container)
435         except NameError:
436             pass
437         else:
438             raise ContainerExists('Container already exists')
439         if policy:
440             self._check_policy(policy)
441         path = '/'.join((account, container))
442         node = self._put_path(user, self._lookup_account(account, True)[1], path)
443         self._put_policy(node, policy, True)
444     
445     @backend_method
446     def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
447         """Delete/purge the container with the given name."""
448         
449         logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
450         if user != account:
451             raise NotAllowedError
452         path, node = self._lookup_container(account, container)
453         
454         if until is not None:
455             hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
456             for h in hashes:
457                 self.store.map_delete(h)
458             self.node.node_purge_children(node, until, CLUSTER_DELETED)
459             self._report_size_change(user, account, -size, {'action': 'container purge'})
460             return
461         
462         if not delimiter:
463             if self._get_statistics(node)[0] > 0:
464                 raise ContainerNotEmpty('Container is not empty')
465             hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
466             for h in hashes:
467                 self.store.map_delete(h)
468             self.node.node_purge_children(node, inf, CLUSTER_DELETED)
469             self.node.node_remove(node)
470             self._report_size_change(user, account, -size, {'action': 'container delete'})
471         else:
472                 # remove only contents
473             src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
474             paths = []
475             for t in src_names:
476                 path = '/'.join((account, container, t[0]))
477                 node = t[2]
478                 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
479                 del_size = self._apply_versioning(account, container, src_version_id)
480                 if del_size:
481                     self._report_size_change(user, account, -del_size, {'action': 'object delete'})
482                 self._report_object_change(user, account, path, details={'action': 'object delete'})
483                 paths.append(path)
484             self.permissions.access_clear_bulk(paths)
485     
486     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
487         if user != account and until:
488             raise NotAllowedError
489         if shared and public:
490             # get shared first
491             shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False)
492             objects = []
493             if shared:
494                 path, node = self._lookup_container(account, container)
495                 shared = self._get_formatted_paths(shared)
496                 objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props)
497             
498             # get public
499             objects.extend(self._list_public_object_properties(user, account, container, prefix, all_props))
500             
501             objects.sort(key=lambda x: x[0])
502             start, limit = self._list_limits([x[0] for x in objects], marker, limit)
503             return objects[start:start + limit]
504         elif public:
505             objects = self._list_public_object_properties(user, account, container, prefix, all_props)
506             start, limit = self._list_limits([x[0] for x in objects], marker, limit)
507             return objects[start:start + limit]
508         
509         allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
510         if shared and not allowed:
511             return []
512         path, node = self._lookup_container(account, container)
513         allowed = self._get_formatted_paths(allowed)
514         objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
515         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
516         return objects[start:start + limit]
517     
518     def _list_public_object_properties(self, user, account, container, prefix, all_props):
519         public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True)
520         paths, nodes = self._lookup_objects(public)
521         path = '/'.join((account, container))
522         cont_prefix = path + '/'
523         paths = [x[len(cont_prefix):] for x in paths]
524         props = self.node.version_lookup_bulk(nodes, all_props=all_props)
525         objects = [(path,) + props for path, props in zip(paths, props)]
526         return objects
527         
528     def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
529         objects = []
530         while True:
531             marker = objects[-1] if objects else None
532             limit = 10000
533             l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
534             objects.extend(l)
535             if not l or len(l) < limit:
536                 break
537         return objects
538     
539     def _list_object_permissions(self, user, account, container, prefix, shared, public):
540         allowed = []
541         path = '/'.join((account, container, prefix)).rstrip('/')
542         if user != account:
543             allowed = self.permissions.access_list_paths(user, path)
544             if not allowed:
545                 raise NotAllowedError
546         else:
547             allowed = set()
548             if shared:
549                 allowed.update(self.permissions.access_list_shared(path))
550             if public:
551                 allowed.update([x[0] for x in self.permissions.public_list(path)])
552             allowed = sorted(allowed)
553             if not allowed:
554                 return []
555         return allowed
556     
557     @backend_method
558     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
559         """Return a list of object (name, version_id) tuples existing under a container."""
560         
561         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
562         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
563     
564     @backend_method
565     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
566         """Return a list of object metadata dicts existing under a container."""
567         
568         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
569         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
570         objects = []
571         for p in props:
572             if len(p) == 2:
573                 objects.append({'subdir': p[0]})
574             else:
575                 objects.append({'name': p[0],
576                                 'bytes': p[self.SIZE + 1],
577                                 'type': p[self.TYPE + 1],
578                                 'hash': p[self.HASH + 1],
579                                 'version': p[self.SERIAL + 1],
580                                 'version_timestamp': p[self.MTIME + 1],
581                                 'modified': p[self.MTIME + 1] if until is None else None,
582                                 'modified_by': p[self.MUSER + 1],
583                                 'uuid': p[self.UUID + 1],
584                                 'checksum': p[self.CHECKSUM + 1]})
585         return objects
586     
587     @backend_method
588     def list_object_permissions(self, user, account, container, prefix=''):
589         """Return a list of paths that enforce permissions under a container."""
590         
591         logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
592         return self._list_object_permissions(user, account, container, prefix, True, False)
593     
594     @backend_method
595     def list_object_public(self, user, account, container, prefix=''):
596         """Return a dict mapping paths to public ids for objects that are public under a container."""
597         
598         logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
599         public = {}
600         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
601             public[path] = p + ULTIMATE_ANSWER
602         return public
603     
604     @backend_method
605     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
606         """Return a dictionary with the object metadata for the domain."""
607         
608         logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
609         self._can_read(user, account, container, name)
610         path, node = self._lookup_object(account, container, name)
611         props = self._get_version(node, version)
612         if version is None:
613             modified = props[self.MTIME]
614         else:
615             try:
616                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
617             except NameError: # Object may be deleted.
618                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
619                 if del_props is None:
620                     raise ItemNotExists('Object does not exist')
621                 modified = del_props[self.MTIME]
622         
623         meta = {}
624         if include_user_defined:
625             meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
626         meta.update({'name': name,
627                      'bytes': props[self.SIZE],
628                      'type': props[self.TYPE],
629                      'hash': props[self.HASH],
630                      'version': props[self.SERIAL],
631                      'version_timestamp': props[self.MTIME],
632                      'modified': modified,
633                      'modified_by': props[self.MUSER],
634                      'uuid': props[self.UUID],
635                      'checksum': props[self.CHECKSUM]})
636         return meta
637     
638     @backend_method
639     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
640         """Update the metadata associated with the object for the domain and return the new version."""
641         
642         logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
643         self._can_write(user, account, container, name)
644         path, node = self._lookup_object(account, container, name)
645         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
646         self._apply_versioning(account, container, src_version_id)
647         return dest_version_id
648     
649     @backend_method
650     def get_object_permissions(self, user, account, container, name):
651         """Return the action allowed on the object, the path
652         from which the object gets its permissions from,
653         along with a dictionary containing the permissions."""
654         
655         logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
656         allowed = 'write'
657         permissions_path = self._get_permissions_path(account, container, name)
658         if user != account:
659             if self.permissions.access_check(permissions_path, self.WRITE, user):
660                 allowed = 'write'
661             elif self.permissions.access_check(permissions_path, self.READ, user):
662                 allowed = 'read'
663             else:
664                 raise NotAllowedError
665         self._lookup_object(account, container, name)
666         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
667     
668     @backend_method
669     def update_object_permissions(self, user, account, container, name, permissions):
670         """Update the permissions associated with the object."""
671         
672         logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
673         if user != account:
674             raise NotAllowedError
675         path = self._lookup_object(account, container, name)[0]
676         self._check_permissions(path, permissions)
677         self.permissions.access_set(path, permissions)
678         self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
679     
680     @backend_method
681     def get_object_public(self, user, account, container, name):
682         """Return the public id of the object if applicable."""
683         
684         logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
685         self._can_read(user, account, container, name)
686         path = self._lookup_object(account, container, name)[0]
687         p = self.permissions.public_get(path)
688         if p is not None:
689             p += ULTIMATE_ANSWER
690         return p
691     
692     @backend_method
693     def update_object_public(self, user, account, container, name, public):
694         """Update the public status of the object."""
695         
696         logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
697         self._can_write(user, account, container, name)
698         path = self._lookup_object(account, container, name)[0]
699         if not public:
700             self.permissions.public_unset(path)
701         else:
702             self.permissions.public_set(path)
703     
704     @backend_method
705     def get_object_hashmap(self, user, account, container, name, version=None):
706         """Return the object's size and a list with partial hashes."""
707         
708         logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
709         self._can_read(user, account, container, name)
710         path, node = self._lookup_object(account, container, name)
711         props = self._get_version(node, version)
712         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
713         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
714     
715     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
716         if permissions is not None and user != account:
717             raise NotAllowedError
718         self._can_write(user, account, container, name)
719         if permissions is not None:
720             path = '/'.join((account, container, name))
721             self._check_permissions(path, permissions)
722         
723         account_path, account_node = self._lookup_account(account, True)
724         container_path, container_node = self._lookup_container(account, container)
725         path, node = self._put_object_node(container_path, container_node, name)
726         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
727         
728         # Handle meta.
729         if src_version_id is None:
730             src_version_id = pre_version_id
731         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
732         
733         # Check quota.
734         del_size = self._apply_versioning(account, container, pre_version_id)
735         size_delta = size - del_size
736         if size_delta > 0:
737             account_quota = long(self._get_policy(account_node)['quota'])
738             container_quota = long(self._get_policy(container_node)['quota'])
739             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
740                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
741                 # This must be executed in a transaction, so the version is never created if it fails.
742                 raise QuotaError
743         self._report_size_change(user, account, size_delta, {'action': 'object update'})
744         
745         if permissions is not None:
746             self.permissions.access_set(path, permissions)
747             self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
748         
749         self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
750         return dest_version_id
751     
752     @backend_method
753     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
754         """Create/update an object with the specified size and partial hashes."""
755         
756         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
757         if size == 0: # No such thing as an empty hashmap.
758             hashmap = [self.put_block('')]
759         map = HashMap(self.block_size, self.hash_algorithm)
760         map.extend([binascii.unhexlify(x) for x in hashmap])
761         missing = self.store.block_search(map)
762         if missing:
763             ie = IndexError()
764             ie.data = [binascii.hexlify(x) for x in missing]
765             raise ie
766         
767         hash = map.hash()
768         dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
769         self.store.map_put(hash, map)
770         return dest_version_id
771     
772     @backend_method
773     def update_object_checksum(self, user, account, container, name, version, checksum):
774         """Update an object's checksum."""
775         
776         logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
777         # Update objects with greater version and same hashmap and size (fix metadata updates).
778         self._can_write(user, account, container, name)
779         path, node = self._lookup_object(account, container, name)
780         props = self._get_version(node, version)
781         versions = self.node.node_get_versions(node)
782         for x in versions:
783             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
784                 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
785     
786     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
787         dest_version_ids = []
788         self._can_read(user, src_account, src_container, src_name)
789         path, node = self._lookup_object(src_account, src_container, src_name)
790         # TODO: Will do another fetch of the properties in duplicate version...
791         props = self._get_version(node, src_version) # Check to see if source exists.
792         src_version_id = props[self.SERIAL]
793         hash = props[self.HASH]
794         size = props[self.SIZE]
795         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
796         dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
797         if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
798                 self._delete_object(user, src_account, src_container, src_name)
799         
800         if delimiter:
801             prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
802             src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
803             src_names.sort(key=lambda x: x[2]) # order by nodes
804             paths = [elem[0] for elem in src_names]
805             nodes = [elem[2] for elem in src_names]
806             # TODO: Will do another fetch of the properties in duplicate version...
807             props = self._get_versions(nodes) # Check to see if source exists.
808             
809             for prop, path, node in zip(props, paths, nodes):
810                 src_version_id = prop[self.SERIAL]
811                 hash = prop[self.HASH]
812                 vtype = prop[self.TYPE]
813                 size = prop[self.SIZE]
814                 dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
815                 vdest_name = path.replace(prefix, dest_prefix, 1)
816                 dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
817                 if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
818                         self._delete_object(user, src_account, src_container, path)
819         return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
820     
821     @backend_method
822     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
823         """Copy an object's data and metadata."""
824         
825         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
826         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
827         return dest_version_id
828     
829     @backend_method
830     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
831         """Move an object's data and metadata."""
832         
833         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
834         if user != src_account:
835             raise NotAllowedError
836         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
837         return dest_version_id
838     
839     def _delete_object(self, user, account, container, name, until=None, delimiter=None):
840         if user != account:
841             raise NotAllowedError
842         
843         if until is not None:
844             path = '/'.join((account, container, name))
845             node = self.node.node_lookup(path)
846             if node is None:
847                 return
848             hashes = []
849             size = 0
850             h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
851             hashes += h
852             size += s
853             h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
854             hashes += h
855             size += s
856             for h in hashes:
857                 self.store.map_delete(h)
858             self.node.node_purge(node, until, CLUSTER_DELETED)
859             try:
860                 props = self._get_version(node)
861             except NameError:
862                 self.permissions.access_clear(path)
863             self._report_size_change(user, account, -size, {'action': 'object purge'})
864             return
865         
866         path, node = self._lookup_object(account, container, name)
867         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
868         del_size = self._apply_versioning(account, container, src_version_id)
869         if del_size:
870             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
871         self._report_object_change(user, account, path, details={'action': 'object delete'})
872         self.permissions.access_clear(path)
873         
874         if delimiter:
875             prefix = name + delimiter if not name.endswith(delimiter) else name
876             src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
877             paths = []
878             for t in src_names:
879                 path = '/'.join((account, container, t[0]))
880                 node = t[2]
881                 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
882                 del_size = self._apply_versioning(account, container, src_version_id)
883                 if del_size:
884                     self._report_size_change(user, account, -del_size, {'action': 'object delete'})
885                 self._report_object_change(user, account, path, details={'action': 'object delete'})
886                 paths.append(path)
887             self.permissions.access_clear_bulk(paths)
888     
889     @backend_method
890     def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
891         """Delete/purge an object."""
892         
893         logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
894         self._delete_object(user, account, container, name, until, delimiter)
895     
896     @backend_method
897     def list_versions(self, user, account, container, name):
898         """Return a list of all (version, version_timestamp) tuples for an object."""
899         
900         logger.debug("list_versions: %s %s %s %s", user, account, container, name)
901         self._can_read(user, account, container, name)
902         path, node = self._lookup_object(account, container, name)
903         versions = self.node.node_get_versions(node)
904         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
905     
906     @backend_method
907     def get_uuid(self, user, uuid):
908         """Return the (account, container, name) for the UUID given."""
909         
910         logger.debug("get_uuid: %s %s", user, uuid)
911         info = self.node.latest_uuid(uuid)
912         if info is None:
913             raise NameError
914         path, serial = info
915         account, container, name = path.split('/', 2)
916         self._can_read(user, account, container, name)
917         return (account, container, name)
918     
919     @backend_method
920     def get_public(self, user, public):
921         """Return the (account, container, name) for the public id given."""
922         
923         logger.debug("get_public: %s %s", user, public)
924         if public is None or public < ULTIMATE_ANSWER:
925             raise NameError
926         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
927         if path is None:
928             raise NameError
929         account, container, name = path.split('/', 2)
930         self._can_read(user, account, container, name)
931         return (account, container, name)
932     
933     @backend_method(autocommit=0)
934     def get_block(self, hash):
935         """Return a block's data."""
936         
937         logger.debug("get_block: %s", hash)
938         block = self.store.block_get(binascii.unhexlify(hash))
939         if not block:
940             raise ItemNotExists('Block does not exist')
941         return block
942     
943     @backend_method(autocommit=0)
944     def put_block(self, data):
945         """Store a block and return the hash."""
946         
947         logger.debug("put_block: %s", len(data))
948         return binascii.hexlify(self.store.block_put(data))
949     
950     @backend_method(autocommit=0)
951     def update_block(self, hash, data, offset=0):
952         """Update a known block and return the hash."""
953         
954         logger.debug("update_block: %s %s %s", hash, len(data), offset)
955         if offset == 0 and len(data) == self.block_size:
956             return self.put_block(data)
957         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
958         return binascii.hexlify(h)
959     
960     # Path functions.
961     
962     def _generate_uuid(self):
963         return str(uuidlib.uuid4())
964     
965     def _put_object_node(self, path, parent, name):
966         path = '/'.join((path, name))
967         node = self.node.node_lookup(path)
968         if node is None:
969             node = self.node.node_create(parent, path)
970         return path, node
971     
972     def _put_path(self, user, parent, path):
973         node = self.node.node_create(parent, path)
974         self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
975         return node
976     
977     def _lookup_account(self, account, create=True):
978         node = self.node.node_lookup(account)
979         if node is None and create:
980             node = self._put_path(account, self.ROOTNODE, account) # User is account.
981         return account, node
982     
983     def _lookup_container(self, account, container):
984         path = '/'.join((account, container))
985         node = self.node.node_lookup(path)
986         if node is None:
987             raise ItemNotExists('Container does not exist')
988         return path, node
989     
990     def _lookup_object(self, account, container, name):
991         path = '/'.join((account, container, name))
992         node = self.node.node_lookup(path)
993         if node is None:
994             raise ItemNotExists('Object does not exist')
995         return path, node
996     
997     def _lookup_objects(self, paths):
998         nodes = self.node.node_lookup_bulk(paths)
999         return paths, nodes
1000     
1001     def _get_properties(self, node, until=None):
1002         """Return properties until the timestamp given."""
1003         
1004         before = until if until is not None else inf
1005         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1006         if props is None and until is not None:
1007             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1008         if props is None:
1009             raise ItemNotExists('Path does not exist')
1010         return props
1011     
1012     def _get_statistics(self, node, until=None):
1013         """Return count, sum of size and latest timestamp of everything under node."""
1014         
1015         if until is None:
1016             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1017         else:
1018             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1019         if stats is None:
1020             stats = (0, 0, 0)
1021         return stats
1022     
1023     def _get_version(self, node, version=None):
1024         if version is None:
1025             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1026             if props is None:
1027                 raise ItemNotExists('Object does not exist')
1028         else:
1029             try:
1030                 version = int(version)
1031             except ValueError:
1032                 raise VersionNotExists('Version does not exist')
1033             props = self.node.version_get_properties(version)
1034             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1035                 raise VersionNotExists('Version does not exist')
1036         return props
1037
1038     def _get_versions(self, nodes):
1039         return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1040     
1041     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1042         """Create a new version of the node."""
1043         
1044         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1045         if props is not None:
1046             src_version_id = props[self.SERIAL]
1047             src_hash = props[self.HASH]
1048             src_size = props[self.SIZE]
1049             src_type = props[self.TYPE]
1050             src_checksum = props[self.CHECKSUM]
1051         else:
1052             src_version_id = None
1053             src_hash = None
1054             src_size = 0
1055             src_type = ''
1056             src_checksum = ''
1057         if size is None: # Set metadata.
1058             hash = src_hash # This way hash can be set to None (account or container).
1059             size = src_size
1060         if type is None:
1061             type = src_type
1062         if checksum is None:
1063             checksum = src_checksum
1064         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
1065         
1066         if src_node is None:
1067             pre_version_id = src_version_id
1068         else:
1069             pre_version_id = None
1070             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1071             if props is not None:
1072                 pre_version_id = props[self.SERIAL]
1073         if pre_version_id is not None:
1074             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1075         
1076         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1077         return pre_version_id, dest_version_id
1078     
1079     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1080         if src_version_id is not None:
1081             self.node.attribute_copy(src_version_id, dest_version_id)
1082         if not replace:
1083             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
1084             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
1085         else:
1086             self.node.attribute_del(dest_version_id, domain)
1087             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
1088     
1089     def _put_metadata(self, user, node, domain, meta, replace=False):
1090         """Create a new version and store metadata."""
1091         
1092         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
1093         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
1094         return src_version_id, dest_version_id
1095     
1096     def _list_limits(self, listing, marker, limit):
1097         start = 0
1098         if marker:
1099             try:
1100                 start = listing.index(marker) + 1
1101             except ValueError:
1102                 pass
1103         if not limit or limit > 10000:
1104             limit = 10000
1105         return start, limit
1106     
1107     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1108         cont_prefix = path + '/'
1109         prefix = cont_prefix + prefix
1110         start = cont_prefix + marker if marker else None
1111         before = until if until is not None else inf
1112         filterq = keys if domain else []
1113         sizeq = size_range
1114         
1115         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1116         objects.extend([(p, None) for p in prefixes] if virtual else [])
1117         objects.sort(key=lambda x: x[0])
1118         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1119         return objects
1120         
1121     # Reporting functions.
1122     
1123     def _report_size_change(self, user, account, size, details={}):
1124         logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1125         account_node = self._lookup_account(account, True)[1]
1126         total = self._get_statistics(account_node)[1]
1127         details.update({'user': user, 'total': total})
1128         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1129     
1130     def _report_object_change(self, user, account, path, details={}):
1131         logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1132         details.update({'user': user})
1133         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1134     
1135     def _report_sharing_change(self, user, account, path, details={}):
1136         logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1137         details.update({'user': user})
1138         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1139     
1140     # Policy functions.
1141     
1142     def _check_policy(self, policy):
1143         for k in policy.keys():
1144             if policy[k] == '':
1145                 policy[k] = self.default_policy.get(k)
1146         for k, v in policy.iteritems():
1147             if k == 'quota':
1148                 q = int(v) # May raise ValueError.
1149                 if q < 0:
1150                     raise ValueError
1151             elif k == 'versioning':
1152                 if v not in ['auto', 'none']:
1153                     raise ValueError
1154             else:
1155                 raise ValueError
1156     
1157     def _put_policy(self, node, policy, replace):
1158         if replace:
1159             for k, v in self.default_policy.iteritems():
1160                 if k not in policy:
1161                     policy[k] = v
1162         self.node.policy_set(node, policy)
1163     
1164     def _get_policy(self, node):
1165         policy = self.default_policy.copy()
1166         policy.update(self.node.policy_get(node))
1167         return policy
1168     
1169     def _apply_versioning(self, account, container, version_id):
1170         """Delete the provided version if such is the policy.
1171            Return size of object removed.
1172         """
1173         
1174         if version_id is None:
1175             return 0
1176         path, node = self._lookup_container(account, container)
1177         versioning = self._get_policy(node)['versioning']
1178         if versioning != 'auto':
1179             hash, size = self.node.version_remove(version_id)
1180             self.store.map_delete(hash)
1181             return size
1182         return 0
1183     
1184     # Access control functions.
1185     
1186     def _check_groups(self, groups):
1187         # raise ValueError('Bad characters in groups')
1188         pass
1189     
1190     def _check_permissions(self, path, permissions):
1191         # raise ValueError('Bad characters in permissions')
1192         pass
1193     
1194     def _get_formatted_paths(self, paths):
1195         formatted = []
1196         for p in paths:
1197             node = self.node.node_lookup(p)
1198             if node is not None:
1199                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1200             if props is not None:
1201                 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1202                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1203                 formatted.append((p, self.MATCH_EXACT))
1204         return formatted
1205     
1206     def _get_permissions_path(self, account, container, name):
1207         path = '/'.join((account, container, name))
1208         permission_paths = self.permissions.access_inherit(path)
1209         permission_paths.sort()
1210         permission_paths.reverse()
1211         for p in permission_paths:
1212             if p == path:
1213                 return p
1214             else:
1215                 if p.count('/') < 2:
1216                     continue
1217                 node = self.node.node_lookup(p)
1218                 if node is not None:
1219                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1220                 if props is not None:
1221                     if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1222                         return p
1223         return None
1224     
1225     def _can_read(self, user, account, container, name):
1226         if user == account:
1227             return True
1228         path = '/'.join((account, container, name))
1229         if self.permissions.public_get(path) is not None:
1230             return True
1231         path = self._get_permissions_path(account, container, name)
1232         if not path:
1233             raise NotAllowedError
1234         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1235             raise NotAllowedError
1236     
1237     def _can_write(self, user, account, container, name):
1238         if user == account:
1239             return True
1240         path = '/'.join((account, container, name))
1241         path = self._get_permissions_path(account, container, name)
1242         if not path:
1243             raise NotAllowedError
1244         if not self.permissions.access_check(path, self.WRITE, user):
1245             raise NotAllowedError
1246     
1247     def _allowed_accounts(self, user):
1248         allow = set()
1249         for path in self.permissions.access_list_paths(user):
1250             allow.add(path.split('/', 1)[0])
1251         return sorted(allow)
1252     
1253     def _allowed_containers(self, user, account):
1254         allow = set()
1255         for path in self.permissions.access_list_paths(user, account):
1256             allow.add(path.split('/', 2)[1])
1257         return sorted(allow)