Write more realistic tests
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import hashlib
40 import binascii
41
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43     AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
44
45 # Stripped-down version of the HashMap class found in tools.
46 class HashMap(list):
47
48     def __init__(self, blocksize, blockhash):
49         super(HashMap, self).__init__()
50         self.blocksize = blocksize
51         self.blockhash = blockhash
52
53     def _hash_raw(self, v):
54         h = hashlib.new(self.blockhash)
55         h.update(v)
56         return h.digest()
57
58     def hash(self):
59         if len(self) == 0:
60             return self._hash_raw('')
61         if len(self) == 1:
62             return self.__getitem__(0)
63
64         h = list(self)
65         s = 2
66         while s < len(h):
67             s = s * 2
68         h += [('\x00' * len(h[0]))] * (s - len(h))
69         while len(h) > 1:
70             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
71         return h[0]
72
73 # Default modules and settings.
74 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
75 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
76 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
77 DEFAULT_BLOCK_PATH = 'data/'
78 DEFAULT_BLOCK_UMASK = 0o022
79 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
80 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
81
82 QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
83 QUEUE_CLIENT_ID = 'pithos'
84 QUEUE_INSTANCE_ID = '1'
85
86 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
87
88 inf = float('inf')
89
90 ULTIMATE_ANSWER = 42
91
92
93 logger = logging.getLogger(__name__)
94
95
96 def backend_method(func=None, autocommit=1):
97     if func is None:
98         def fn(func):
99             return backend_method(func, autocommit)
100         return fn
101
102     if not autocommit:
103         return func
104     def fn(self, *args, **kw):
105         self.wrapper.execute()
106         try:
107             self.messages = []
108             ret = func(self, *args, **kw)
109             for m in self.messages:
110                 self.queue.send(*m)
111             self.wrapper.commit()
112             return ret
113         except:
114             self.wrapper.rollback()
115             raise
116     return fn
117
118
119 class ModularBackend(BaseBackend):
120     """A modular backend.
121     
122     Uses modules for SQL functions and storage.
123     """
124     
125     def __init__(self, db_module=None, db_connection=None,
126                  block_module=None, block_path=None, block_umask=None,
127                  queue_module=None, queue_connection=None):
128         db_module = db_module or DEFAULT_DB_MODULE
129         db_connection = db_connection or DEFAULT_DB_CONNECTION
130         block_module = block_module or DEFAULT_BLOCK_MODULE
131         block_path = block_path or DEFAULT_BLOCK_PATH
132         block_umask = block_umask or DEFAULT_BLOCK_UMASK
133         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
134         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
135         
136         self.hash_algorithm = 'sha256'
137         self.block_size = 4 * 1024 * 1024 # 4MB
138         
139         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
140         
141         def load_module(m):
142             __import__(m)
143             return sys.modules[m]
144         
145         self.db_module = load_module(db_module)
146         self.wrapper = self.db_module.DBWrapper(db_connection)
147         params = {'wrapper': self.wrapper}
148         self.permissions = self.db_module.Permissions(**params)
149         for x in ['READ', 'WRITE']:
150             setattr(self, x, getattr(self.db_module, x))
151         self.node = self.db_module.Node(**params)
152         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
153             setattr(self, x, getattr(self.db_module, x))
154         
155         self.block_module = load_module(block_module)
156         params = {'path': block_path,
157                   'block_size': self.block_size,
158                   'hash_algorithm': self.hash_algorithm,
159                   'umask': block_umask}
160         self.store = self.block_module.Store(**params)
161
162         if queue_module and queue_connection:
163             self.queue_module = load_module(queue_module)
164             params = {'exchange': queue_connection,
165                       'client_id': QUEUE_CLIENT_ID}
166             self.queue = self.queue_module.Queue(**params)
167         else:
168             class NoQueue:
169                 def send(self, *args):
170                     pass
171                 
172                 def close(self):
173                     pass
174             
175             self.queue = NoQueue()
176     
177     def close(self):
178         self.wrapper.close()
179         self.queue.close()
180     
181     @backend_method
182     def list_accounts(self, user, marker=None, limit=10000):
183         """Return a list of accounts the user can access."""
184         
185         logger.debug("list_accounts: %s %s %s", user, marker, limit)
186         allowed = self._allowed_accounts(user)
187         start, limit = self._list_limits(allowed, marker, limit)
188         return allowed[start:start + limit]
189     
190     @backend_method
191     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
192         """Return a dictionary with the account metadata for the domain."""
193         
194         logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
195         path, node = self._lookup_account(account, user == account)
196         if user != account:
197             if until or node is None or account not in self._allowed_accounts(user):
198                 raise NotAllowedError
199         try:
200             props = self._get_properties(node, until)
201             mtime = props[self.MTIME]
202         except NameError:
203             props = None
204             mtime = until
205         count, bytes, tstamp = self._get_statistics(node, until)
206         tstamp = max(tstamp, mtime)
207         if until is None:
208             modified = tstamp
209         else:
210             modified = self._get_statistics(node)[2] # Overall last modification.
211             modified = max(modified, mtime)
212         
213         if user != account:
214             meta = {'name': account}
215         else:
216             meta = {}
217             if props is not None and include_user_defined:
218                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
219             if until is not None:
220                 meta.update({'until_timestamp': tstamp})
221             meta.update({'name': account, 'count': count, 'bytes': bytes})
222         meta.update({'modified': modified})
223         return meta
224     
225     @backend_method
226     def update_account_meta(self, user, account, domain, meta, replace=False):
227         """Update the metadata associated with the account for the domain."""
228         
229         logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
230         if user != account:
231             raise NotAllowedError
232         path, node = self._lookup_account(account, True)
233         self._put_metadata(user, node, domain, meta, replace)
234     
235     @backend_method
236     def get_account_groups(self, user, account):
237         """Return a dictionary with the user groups defined for this account."""
238         
239         logger.debug("get_account_groups: %s %s", user, account)
240         if user != account:
241             if account not in self._allowed_accounts(user):
242                 raise NotAllowedError
243             return {}
244         self._lookup_account(account, True)
245         return self.permissions.group_dict(account)
246     
247     @backend_method
248     def update_account_groups(self, user, account, groups, replace=False):
249         """Update the groups associated with the account."""
250         
251         logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
252         if user != account:
253             raise NotAllowedError
254         self._lookup_account(account, True)
255         self._check_groups(groups)
256         if replace:
257             self.permissions.group_destroy(account)
258         for k, v in groups.iteritems():
259             if not replace: # If not already deleted.
260                 self.permissions.group_delete(account, k)
261             if v:
262                 self.permissions.group_addmany(account, k, v)
263     
264     @backend_method
265     def get_account_policy(self, user, account):
266         """Return a dictionary with the account policy."""
267         
268         logger.debug("get_account_policy: %s %s", user, account)
269         if user != account:
270             if account not in self._allowed_accounts(user):
271                 raise NotAllowedError
272             return {}
273         path, node = self._lookup_account(account, True)
274         return self._get_policy(node)
275     
276     @backend_method
277     def update_account_policy(self, user, account, policy, replace=False):
278         """Update the policy associated with the account."""
279         
280         logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
281         if user != account:
282             raise NotAllowedError
283         path, node = self._lookup_account(account, True)
284         self._check_policy(policy)
285         self._put_policy(node, policy, replace)
286     
287     @backend_method
288     def put_account(self, user, account, policy={}):
289         """Create a new account with the given name."""
290         
291         logger.debug("put_account: %s %s %s", user, account, policy)
292         if user != account:
293             raise NotAllowedError
294         node = self.node.node_lookup(account)
295         if node is not None:
296             raise AccountExists('Account already exists')
297         if policy:
298             self._check_policy(policy)
299         node = self._put_path(user, self.ROOTNODE, account)
300         self._put_policy(node, policy, True)
301     
302     @backend_method
303     def delete_account(self, user, account):
304         """Delete the account with the given name."""
305         
306         logger.debug("delete_account: %s %s", user, account)
307         if user != account:
308             raise NotAllowedError
309         node = self.node.node_lookup(account)
310         if node is None:
311             return
312         if not self.node.node_remove(node):
313             raise AccountNotEmpty('Account is not empty')
314         self.permissions.group_destroy(account)
315     
316     @backend_method
317     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
318         """Return a list of containers existing under an account."""
319         
320         logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
321         if user != account:
322             if until or account not in self._allowed_accounts(user):
323                 raise NotAllowedError
324             allowed = self._allowed_containers(user, account)
325             start, limit = self._list_limits(allowed, marker, limit)
326             return allowed[start:start + limit]
327         if shared or public:
328             allowed = set()
329             if shared:
330                 allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
331             if public:
332                 allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
333             allowed = sorted(allowed)
334             start, limit = self._list_limits(allowed, marker, limit)
335             return allowed[start:start + limit]
336         node = self.node.node_lookup(account)
337         containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
338         start, limit = self._list_limits([x[0] for x in containers], marker, limit)
339         return containers[start:start + limit]
340     
341     @backend_method
342     def list_container_meta(self, user, account, container, domain, until=None):
343         """Return a list with all the container's object meta keys for the domain."""
344         
345         logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
346         allowed = []
347         if user != account:
348             if until:
349                 raise NotAllowedError
350             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
351             if not allowed:
352                 raise NotAllowedError
353         path, node = self._lookup_container(account, container)
354         before = until if until is not None else inf
355         allowed = self._get_formatted_paths(allowed)
356         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
357     
358     @backend_method
359     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
360         """Return a dictionary with the container metadata for the domain."""
361         
362         logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
363         if user != account:
364             if until or container not in self._allowed_containers(user, account):
365                 raise NotAllowedError
366         path, node = self._lookup_container(account, container)
367         props = self._get_properties(node, until)
368         mtime = props[self.MTIME]
369         count, bytes, tstamp = self._get_statistics(node, until)
370         tstamp = max(tstamp, mtime)
371         if until is None:
372             modified = tstamp
373         else:
374             modified = self._get_statistics(node)[2] # Overall last modification.
375             modified = max(modified, mtime)
376         
377         if user != account:
378             meta = {'name': container}
379         else:
380             meta = {}
381             if include_user_defined:
382                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
383             if until is not None:
384                 meta.update({'until_timestamp': tstamp})
385             meta.update({'name': container, 'count': count, 'bytes': bytes})
386         meta.update({'modified': modified})
387         return meta
388     
389     @backend_method
390     def update_container_meta(self, user, account, container, domain, meta, replace=False):
391         """Update the metadata associated with the container for the domain."""
392         
393         logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
394         if user != account:
395             raise NotAllowedError
396         path, node = self._lookup_container(account, container)
397         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
398         if src_version_id is not None:
399             versioning = self._get_policy(node)['versioning']
400             if versioning != 'auto':
401                 self.node.version_remove(src_version_id)
402     
403     @backend_method
404     def get_container_policy(self, user, account, container):
405         """Return a dictionary with the container policy."""
406         
407         logger.debug("get_container_policy: %s %s %s", user, account, container)
408         if user != account:
409             if container not in self._allowed_containers(user, account):
410                 raise NotAllowedError
411             return {}
412         path, node = self._lookup_container(account, container)
413         return self._get_policy(node)
414     
415     @backend_method
416     def update_container_policy(self, user, account, container, policy, replace=False):
417         """Update the policy associated with the container."""
418         
419         logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
420         if user != account:
421             raise NotAllowedError
422         path, node = self._lookup_container(account, container)
423         self._check_policy(policy)
424         self._put_policy(node, policy, replace)
425     
426     @backend_method
427     def put_container(self, user, account, container, policy={}):
428         """Create a new container with the given name."""
429         
430         logger.debug("put_container: %s %s %s %s", user, account, container, policy)
431         if user != account:
432             raise NotAllowedError
433         try:
434             path, node = self._lookup_container(account, container)
435         except NameError:
436             pass
437         else:
438             raise ContainerExists('Container already exists')
439         if policy:
440             self._check_policy(policy)
441         path = '/'.join((account, container))
442         node = self._put_path(user, self._lookup_account(account, True)[1], path)
443         self._put_policy(node, policy, True)
444     
445     @backend_method
446     def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
447         """Delete/purge the container with the given name."""
448         
449         logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
450         if user != account:
451             raise NotAllowedError
452         path, node = self._lookup_container(account, container)
453         
454         if until is not None:
455             hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
456             for h in hashes:
457                 self.store.map_delete(h)
458             self.node.node_purge_children(node, until, CLUSTER_DELETED)
459             self._report_size_change(user, account, -size, {'action': 'container purge'})
460             return
461         
462         if not delimiter:
463             if self._get_statistics(node)[0] > 0:
464                 raise ContainerNotEmpty('Container is not empty')
465             hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
466             for h in hashes:
467                 self.store.map_delete(h)
468             self.node.node_purge_children(node, inf, CLUSTER_DELETED)
469             self.node.node_remove(node)
470             self._report_size_change(user, account, -size, {'action': 'container delete'})
471         else:
472                 # remove only contents
473             src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
474             paths = []
475             for t in src_names:
476                 path = '/'.join((account, container, t[0]))
477                 node = t[2]
478                 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
479                 del_size = self._apply_versioning(account, container, src_version_id)
480                 if del_size:
481                     self._report_size_change(user, account, -del_size, {'action': 'object delete'})
482                 self._report_object_change(user, account, path, details={'action': 'object delete'})
483                 paths.append(path)
484             self.permissions.access_clear_bulk(paths)
485     
486     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
487         if user != account and until:
488             raise NotAllowedError
489         if shared and public:
490             # get shared first
491             shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False)
492             objects = set()
493             if shared:
494                 path, node = self._lookup_container(account, container)
495                 shared = self._get_formatted_paths(shared)
496                 objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
497             
498             # get public
499             objects |= set(self._list_public_object_properties(user, account, container, prefix, all_props))
500             objects = list(objects)
501             
502             objects.sort(key=lambda x: x[0])
503             start, limit = self._list_limits([x[0] for x in objects], marker, limit)
504             return objects[start:start + limit]
505         elif public:
506             objects = self._list_public_object_properties(user, account, container, prefix, all_props)
507             start, limit = self._list_limits([x[0] for x in objects], marker, limit)
508             return objects[start:start + limit]
509         
510         allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
511         if shared and not allowed:
512             return []
513         path, node = self._lookup_container(account, container)
514         allowed = self._get_formatted_paths(allowed)
515         objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
516         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
517         return objects[start:start + limit]
518     
519     def _list_public_object_properties(self, user, account, container, prefix, all_props):
520         public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True)
521         paths, nodes = self._lookup_objects(public)
522         path = '/'.join((account, container))
523         cont_prefix = path + '/'
524         paths = [x[len(cont_prefix):] for x in paths]
525         props = self.node.version_lookup_bulk(nodes, all_props=all_props)
526         objects = [(path,) + props for path, props in zip(paths, props)]
527         return objects
528         
529     def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
530         objects = []
531         while True:
532             marker = objects[-1] if objects else None
533             limit = 10000
534             l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
535             objects.extend(l)
536             if not l or len(l) < limit:
537                 break
538         return objects
539     
540     def _list_object_permissions(self, user, account, container, prefix, shared, public):
541         allowed = []
542         path = '/'.join((account, container, prefix)).rstrip('/')
543         if user != account:
544             allowed = self.permissions.access_list_paths(user, path)
545             if not allowed:
546                 raise NotAllowedError
547         else:
548             allowed = set()
549             if shared:
550                 allowed.update(self.permissions.access_list_shared(path))
551             if public:
552                 allowed.update([x[0] for x in self.permissions.public_list(path)])
553             allowed = sorted(allowed)
554             if not allowed:
555                 return []
556         return allowed
557     
558     @backend_method
559     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
560         """Return a list of object (name, version_id) tuples existing under a container."""
561         
562         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
563         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
564     
565     @backend_method
566     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
567         """Return a list of object metadata dicts existing under a container."""
568         
569         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
570         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
571         objects = []
572         for p in props:
573             if len(p) == 2:
574                 objects.append({'subdir': p[0]})
575             else:
576                 objects.append({'name': p[0],
577                                 'bytes': p[self.SIZE + 1],
578                                 'type': p[self.TYPE + 1],
579                                 'hash': p[self.HASH + 1],
580                                 'version': p[self.SERIAL + 1],
581                                 'version_timestamp': p[self.MTIME + 1],
582                                 'modified': p[self.MTIME + 1] if until is None else None,
583                                 'modified_by': p[self.MUSER + 1],
584                                 'uuid': p[self.UUID + 1],
585                                 'checksum': p[self.CHECKSUM + 1]})
586         return objects
587     
588     @backend_method
589     def list_object_permissions(self, user, account, container, prefix=''):
590         """Return a list of paths that enforce permissions under a container."""
591         
592         logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
593         return self._list_object_permissions(user, account, container, prefix, True, False)
594     
595     @backend_method
596     def list_object_public(self, user, account, container, prefix=''):
597         """Return a dict mapping paths to public ids for objects that are public under a container."""
598         
599         logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
600         public = {}
601         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
602             public[path] = p + ULTIMATE_ANSWER
603         return public
604     
605     @backend_method
606     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
607         """Return a dictionary with the object metadata for the domain."""
608         
609         logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
610         self._can_read(user, account, container, name)
611         path, node = self._lookup_object(account, container, name)
612         props = self._get_version(node, version)
613         if version is None:
614             modified = props[self.MTIME]
615         else:
616             try:
617                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
618             except NameError: # Object may be deleted.
619                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
620                 if del_props is None:
621                     raise ItemNotExists('Object does not exist')
622                 modified = del_props[self.MTIME]
623         
624         meta = {}
625         if include_user_defined:
626             meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
627         meta.update({'name': name,
628                      'bytes': props[self.SIZE],
629                      'type': props[self.TYPE],
630                      'hash': props[self.HASH],
631                      'version': props[self.SERIAL],
632                      'version_timestamp': props[self.MTIME],
633                      'modified': modified,
634                      'modified_by': props[self.MUSER],
635                      'uuid': props[self.UUID],
636                      'checksum': props[self.CHECKSUM]})
637         return meta
638     
639     @backend_method
640     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
641         """Update the metadata associated with the object for the domain and return the new version."""
642         
643         logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
644         self._can_write(user, account, container, name)
645         path, node = self._lookup_object(account, container, name)
646         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
647         self._apply_versioning(account, container, src_version_id)
648         return dest_version_id
649     
650     @backend_method
651     def get_object_permissions(self, user, account, container, name):
652         """Return the action allowed on the object, the path
653         from which the object gets its permissions from,
654         along with a dictionary containing the permissions."""
655         
656         logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
657         allowed = 'write'
658         permissions_path = self._get_permissions_path(account, container, name)
659         if user != account:
660             if self.permissions.access_check(permissions_path, self.WRITE, user):
661                 allowed = 'write'
662             elif self.permissions.access_check(permissions_path, self.READ, user):
663                 allowed = 'read'
664             else:
665                 raise NotAllowedError
666         self._lookup_object(account, container, name)
667         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
668     
669     @backend_method
670     def update_object_permissions(self, user, account, container, name, permissions):
671         """Update the permissions associated with the object."""
672         
673         logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
674         if user != account:
675             raise NotAllowedError
676         path = self._lookup_object(account, container, name)[0]
677         self._check_permissions(path, permissions)
678         self.permissions.access_set(path, permissions)
679         self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
680     
681     @backend_method
682     def get_object_public(self, user, account, container, name):
683         """Return the public id of the object if applicable."""
684         
685         logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
686         self._can_read(user, account, container, name)
687         path = self._lookup_object(account, container, name)[0]
688         p = self.permissions.public_get(path)
689         if p is not None:
690             p += ULTIMATE_ANSWER
691         return p
692     
693     @backend_method
694     def update_object_public(self, user, account, container, name, public):
695         """Update the public status of the object."""
696         
697         logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
698         self._can_write(user, account, container, name)
699         path = self._lookup_object(account, container, name)[0]
700         if not public:
701             self.permissions.public_unset(path)
702         else:
703             self.permissions.public_set(path)
704     
705     @backend_method
706     def get_object_hashmap(self, user, account, container, name, version=None):
707         """Return the object's size and a list with partial hashes."""
708         
709         logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
710         self._can_read(user, account, container, name)
711         path, node = self._lookup_object(account, container, name)
712         props = self._get_version(node, version)
713         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
714         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
715     
716     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
717         if permissions is not None and user != account:
718             raise NotAllowedError
719         self._can_write(user, account, container, name)
720         if permissions is not None:
721             path = '/'.join((account, container, name))
722             self._check_permissions(path, permissions)
723         
724         account_path, account_node = self._lookup_account(account, True)
725         container_path, container_node = self._lookup_container(account, container)
726         path, node = self._put_object_node(container_path, container_node, name)
727         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
728         
729         # Handle meta.
730         if src_version_id is None:
731             src_version_id = pre_version_id
732         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
733         
734         # Check quota.
735         del_size = self._apply_versioning(account, container, pre_version_id)
736         size_delta = size - del_size
737         if size_delta > 0:
738             account_quota = long(self._get_policy(account_node)['quota'])
739             container_quota = long(self._get_policy(container_node)['quota'])
740             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
741                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
742                 # This must be executed in a transaction, so the version is never created if it fails.
743                 raise QuotaError
744         self._report_size_change(user, account, size_delta, {'action': 'object update'})
745         
746         if permissions is not None:
747             self.permissions.access_set(path, permissions)
748             self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
749         
750         self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
751         return dest_version_id
752     
753     @backend_method
754     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
755         """Create/update an object with the specified size and partial hashes."""
756         
757         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
758         if size == 0: # No such thing as an empty hashmap.
759             hashmap = [self.put_block('')]
760         map = HashMap(self.block_size, self.hash_algorithm)
761         map.extend([binascii.unhexlify(x) for x in hashmap])
762         missing = self.store.block_search(map)
763         if missing:
764             ie = IndexError()
765             ie.data = [binascii.hexlify(x) for x in missing]
766             raise ie
767         
768         hash = map.hash()
769         dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
770         self.store.map_put(hash, map)
771         return dest_version_id
772     
773     @backend_method
774     def update_object_checksum(self, user, account, container, name, version, checksum):
775         """Update an object's checksum."""
776         
777         logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
778         # Update objects with greater version and same hashmap and size (fix metadata updates).
779         self._can_write(user, account, container, name)
780         path, node = self._lookup_object(account, container, name)
781         props = self._get_version(node, version)
782         versions = self.node.node_get_versions(node)
783         for x in versions:
784             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
785                 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
786     
787     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
788         dest_version_ids = []
789         self._can_read(user, src_account, src_container, src_name)
790         path, node = self._lookup_object(src_account, src_container, src_name)
791         # TODO: Will do another fetch of the properties in duplicate version...
792         props = self._get_version(node, src_version) # Check to see if source exists.
793         src_version_id = props[self.SERIAL]
794         hash = props[self.HASH]
795         size = props[self.SIZE]
796         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
797         dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
798         if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
799                 self._delete_object(user, src_account, src_container, src_name)
800         
801         if delimiter:
802             prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
803             src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
804             src_names.sort(key=lambda x: x[2]) # order by nodes
805             paths = [elem[0] for elem in src_names]
806             nodes = [elem[2] for elem in src_names]
807             # TODO: Will do another fetch of the properties in duplicate version...
808             props = self._get_versions(nodes) # Check to see if source exists.
809             
810             for prop, path, node in zip(props, paths, nodes):
811                 src_version_id = prop[self.SERIAL]
812                 hash = prop[self.HASH]
813                 vtype = prop[self.TYPE]
814                 size = prop[self.SIZE]
815                 dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
816                 vdest_name = path.replace(prefix, dest_prefix, 1)
817                 dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
818                 if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
819                         self._delete_object(user, src_account, src_container, path)
820         return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
821     
822     @backend_method
823     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
824         """Copy an object's data and metadata."""
825         
826         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
827         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
828         return dest_version_id
829     
830     @backend_method
831     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
832         """Move an object's data and metadata."""
833         
834         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
835         if user != src_account:
836             raise NotAllowedError
837         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
838         return dest_version_id
839     
840     def _delete_object(self, user, account, container, name, until=None, delimiter=None):
841         if user != account:
842             raise NotAllowedError
843         
844         if until is not None:
845             path = '/'.join((account, container, name))
846             node = self.node.node_lookup(path)
847             if node is None:
848                 return
849             hashes = []
850             size = 0
851             h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
852             hashes += h
853             size += s
854             h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
855             hashes += h
856             size += s
857             for h in hashes:
858                 self.store.map_delete(h)
859             self.node.node_purge(node, until, CLUSTER_DELETED)
860             try:
861                 props = self._get_version(node)
862             except NameError:
863                 self.permissions.access_clear(path)
864             self._report_size_change(user, account, -size, {'action': 'object purge'})
865             return
866         
867         path, node = self._lookup_object(account, container, name)
868         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
869         del_size = self._apply_versioning(account, container, src_version_id)
870         if del_size:
871             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
872         self._report_object_change(user, account, path, details={'action': 'object delete'})
873         self.permissions.access_clear(path)
874         
875         if delimiter:
876             prefix = name + delimiter if not name.endswith(delimiter) else name
877             src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
878             paths = []
879             for t in src_names:
880                 path = '/'.join((account, container, t[0]))
881                 node = t[2]
882                 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
883                 del_size = self._apply_versioning(account, container, src_version_id)
884                 if del_size:
885                     self._report_size_change(user, account, -del_size, {'action': 'object delete'})
886                 self._report_object_change(user, account, path, details={'action': 'object delete'})
887                 paths.append(path)
888             self.permissions.access_clear_bulk(paths)
889     
890     @backend_method
891     def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
892         """Delete/purge an object."""
893         
894         logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
895         self._delete_object(user, account, container, name, until, delimiter)
896     
897     @backend_method
898     def list_versions(self, user, account, container, name):
899         """Return a list of all (version, version_timestamp) tuples for an object."""
900         
901         logger.debug("list_versions: %s %s %s %s", user, account, container, name)
902         self._can_read(user, account, container, name)
903         path, node = self._lookup_object(account, container, name)
904         versions = self.node.node_get_versions(node)
905         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
906     
907     @backend_method
908     def get_uuid(self, user, uuid):
909         """Return the (account, container, name) for the UUID given."""
910         
911         logger.debug("get_uuid: %s %s", user, uuid)
912         info = self.node.latest_uuid(uuid)
913         if info is None:
914             raise NameError
915         path, serial = info
916         account, container, name = path.split('/', 2)
917         self._can_read(user, account, container, name)
918         return (account, container, name)
919     
920     @backend_method
921     def get_public(self, user, public):
922         """Return the (account, container, name) for the public id given."""
923         
924         logger.debug("get_public: %s %s", user, public)
925         if public is None or public < ULTIMATE_ANSWER:
926             raise NameError
927         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
928         if path is None:
929             raise NameError
930         account, container, name = path.split('/', 2)
931         self._can_read(user, account, container, name)
932         return (account, container, name)
933     
934     @backend_method(autocommit=0)
935     def get_block(self, hash):
936         """Return a block's data."""
937         
938         logger.debug("get_block: %s", hash)
939         block = self.store.block_get(binascii.unhexlify(hash))
940         if not block:
941             raise ItemNotExists('Block does not exist')
942         return block
943     
944     @backend_method(autocommit=0)
945     def put_block(self, data):
946         """Store a block and return the hash."""
947         
948         logger.debug("put_block: %s", len(data))
949         return binascii.hexlify(self.store.block_put(data))
950     
951     @backend_method(autocommit=0)
952     def update_block(self, hash, data, offset=0):
953         """Update a known block and return the hash."""
954         
955         logger.debug("update_block: %s %s %s", hash, len(data), offset)
956         if offset == 0 and len(data) == self.block_size:
957             return self.put_block(data)
958         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
959         return binascii.hexlify(h)
960     
961     # Path functions.
962     
963     def _generate_uuid(self):
964         return str(uuidlib.uuid4())
965     
966     def _put_object_node(self, path, parent, name):
967         path = '/'.join((path, name))
968         node = self.node.node_lookup(path)
969         if node is None:
970             node = self.node.node_create(parent, path)
971         return path, node
972     
973     def _put_path(self, user, parent, path):
974         node = self.node.node_create(parent, path)
975         self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
976         return node
977     
978     def _lookup_account(self, account, create=True):
979         node = self.node.node_lookup(account)
980         if node is None and create:
981             node = self._put_path(account, self.ROOTNODE, account) # User is account.
982         return account, node
983     
984     def _lookup_container(self, account, container):
985         path = '/'.join((account, container))
986         node = self.node.node_lookup(path)
987         if node is None:
988             raise ItemNotExists('Container does not exist')
989         return path, node
990     
991     def _lookup_object(self, account, container, name):
992         path = '/'.join((account, container, name))
993         node = self.node.node_lookup(path)
994         if node is None:
995             raise ItemNotExists('Object does not exist')
996         return path, node
997     
998     def _lookup_objects(self, paths):
999         nodes = self.node.node_lookup_bulk(paths)
1000         return paths, nodes
1001     
1002     def _get_properties(self, node, until=None):
1003         """Return properties until the timestamp given."""
1004         
1005         before = until if until is not None else inf
1006         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1007         if props is None and until is not None:
1008             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1009         if props is None:
1010             raise ItemNotExists('Path does not exist')
1011         return props
1012     
1013     def _get_statistics(self, node, until=None):
1014         """Return count, sum of size and latest timestamp of everything under node."""
1015         
1016         if until is None:
1017             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1018         else:
1019             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1020         if stats is None:
1021             stats = (0, 0, 0)
1022         return stats
1023     
1024     def _get_version(self, node, version=None):
1025         if version is None:
1026             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1027             if props is None:
1028                 raise ItemNotExists('Object does not exist')
1029         else:
1030             try:
1031                 version = int(version)
1032             except ValueError:
1033                 raise VersionNotExists('Version does not exist')
1034             props = self.node.version_get_properties(version)
1035             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1036                 raise VersionNotExists('Version does not exist')
1037         return props
1038
1039     def _get_versions(self, nodes):
1040         return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1041     
1042     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1043         """Create a new version of the node."""
1044         
1045         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1046         if props is not None:
1047             src_version_id = props[self.SERIAL]
1048             src_hash = props[self.HASH]
1049             src_size = props[self.SIZE]
1050             src_type = props[self.TYPE]
1051             src_checksum = props[self.CHECKSUM]
1052         else:
1053             src_version_id = None
1054             src_hash = None
1055             src_size = 0
1056             src_type = ''
1057             src_checksum = ''
1058         if size is None: # Set metadata.
1059             hash = src_hash # This way hash can be set to None (account or container).
1060             size = src_size
1061         if type is None:
1062             type = src_type
1063         if checksum is None:
1064             checksum = src_checksum
1065         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
1066         
1067         if src_node is None:
1068             pre_version_id = src_version_id
1069         else:
1070             pre_version_id = None
1071             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1072             if props is not None:
1073                 pre_version_id = props[self.SERIAL]
1074         if pre_version_id is not None:
1075             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1076         
1077         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1078         return pre_version_id, dest_version_id
1079     
1080     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1081         if src_version_id is not None:
1082             self.node.attribute_copy(src_version_id, dest_version_id)
1083         if not replace:
1084             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
1085             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
1086         else:
1087             self.node.attribute_del(dest_version_id, domain)
1088             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
1089     
1090     def _put_metadata(self, user, node, domain, meta, replace=False):
1091         """Create a new version and store metadata."""
1092         
1093         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
1094         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
1095         return src_version_id, dest_version_id
1096     
1097     def _list_limits(self, listing, marker, limit):
1098         start = 0
1099         if marker:
1100             try:
1101                 start = listing.index(marker) + 1
1102             except ValueError:
1103                 pass
1104         if not limit or limit > 10000:
1105             limit = 10000
1106         return start, limit
1107     
1108     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1109         cont_prefix = path + '/'
1110         prefix = cont_prefix + prefix
1111         start = cont_prefix + marker if marker else None
1112         before = until if until is not None else inf
1113         filterq = keys if domain else []
1114         sizeq = size_range
1115         
1116         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1117         objects.extend([(p, None) for p in prefixes] if virtual else [])
1118         objects.sort(key=lambda x: x[0])
1119         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1120         return objects
1121         
1122     # Reporting functions.
1123     
1124     def _report_size_change(self, user, account, size, details={}):
1125         logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1126         account_node = self._lookup_account(account, True)[1]
1127         total = self._get_statistics(account_node)[1]
1128         details.update({'user': user, 'total': total})
1129         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1130     
1131     def _report_object_change(self, user, account, path, details={}):
1132         logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1133         details.update({'user': user, 'filename':path})
1134         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1135     
1136     def _report_sharing_change(self, user, account, path, details={}):
1137         logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1138         details.update({'user': user})
1139         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1140     
1141     # Policy functions.
1142     
1143     def _check_policy(self, policy):
1144         for k in policy.keys():
1145             if policy[k] == '':
1146                 policy[k] = self.default_policy.get(k)
1147         for k, v in policy.iteritems():
1148             if k == 'quota':
1149                 q = int(v) # May raise ValueError.
1150                 if q < 0:
1151                     raise ValueError
1152             elif k == 'versioning':
1153                 if v not in ['auto', 'none']:
1154                     raise ValueError
1155             else:
1156                 raise ValueError
1157     
1158     def _put_policy(self, node, policy, replace):
1159         if replace:
1160             for k, v in self.default_policy.iteritems():
1161                 if k not in policy:
1162                     policy[k] = v
1163         self.node.policy_set(node, policy)
1164     
1165     def _get_policy(self, node):
1166         policy = self.default_policy.copy()
1167         policy.update(self.node.policy_get(node))
1168         return policy
1169     
1170     def _apply_versioning(self, account, container, version_id):
1171         """Delete the provided version if such is the policy.
1172            Return size of object removed.
1173         """
1174         
1175         if version_id is None:
1176             return 0
1177         path, node = self._lookup_container(account, container)
1178         versioning = self._get_policy(node)['versioning']
1179         if versioning != 'auto':
1180             hash, size = self.node.version_remove(version_id)
1181             self.store.map_delete(hash)
1182             return size
1183         return 0
1184     
1185     # Access control functions.
1186     
1187     def _check_groups(self, groups):
1188         # raise ValueError('Bad characters in groups')
1189         pass
1190     
1191     def _check_permissions(self, path, permissions):
1192         # raise ValueError('Bad characters in permissions')
1193         pass
1194     
1195     def _get_formatted_paths(self, paths):
1196         formatted = []
1197         for p in paths:
1198             node = self.node.node_lookup(p)
1199             if node is not None:
1200                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1201             if props is not None:
1202                 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1203                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1204                 formatted.append((p, self.MATCH_EXACT))
1205         return formatted
1206     
1207     def _get_permissions_path(self, account, container, name):
1208         path = '/'.join((account, container, name))
1209         permission_paths = self.permissions.access_inherit(path)
1210         permission_paths.sort()
1211         permission_paths.reverse()
1212         for p in permission_paths:
1213             if p == path:
1214                 return p
1215             else:
1216                 if p.count('/') < 2:
1217                     continue
1218                 node = self.node.node_lookup(p)
1219                 if node is not None:
1220                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1221                 if props is not None:
1222                     if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1223                         return p
1224         return None
1225     
1226     def _can_read(self, user, account, container, name):
1227         if user == account:
1228             return True
1229         path = '/'.join((account, container, name))
1230         if self.permissions.public_get(path) is not None:
1231             return True
1232         path = self._get_permissions_path(account, container, name)
1233         if not path:
1234             raise NotAllowedError
1235         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1236             raise NotAllowedError
1237     
1238     def _can_write(self, user, account, container, name):
1239         if user == account:
1240             return True
1241         path = '/'.join((account, container, name))
1242         path = self._get_permissions_path(account, container, name)
1243         if not path:
1244             raise NotAllowedError
1245         if not self.permissions.access_check(path, self.WRITE, user):
1246             raise NotAllowedError
1247     
1248     def _allowed_accounts(self, user):
1249         allow = set()
1250         for path in self.permissions.access_list_paths(user):
1251             allow.add(path.split('/', 1)[0])
1252         return sorted(allow)
1253     
1254     def _allowed_containers(self, user, account):
1255         allow = set()
1256         for path in self.permissions.access_list_paths(user, account):
1257             allow.add(path.split('/', 2)[1])
1258         return sorted(allow)