change backend to raise custom exceptions
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import hashlib
40 import binascii
41
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43     AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
44
45 # Stripped-down version of the HashMap class found in tools.
46 class HashMap(list):
47
48     def __init__(self, blocksize, blockhash):
49         super(HashMap, self).__init__()
50         self.blocksize = blocksize
51         self.blockhash = blockhash
52
53     def _hash_raw(self, v):
54         h = hashlib.new(self.blockhash)
55         h.update(v)
56         return h.digest()
57
58     def hash(self):
59         if len(self) == 0:
60             return self._hash_raw('')
61         if len(self) == 1:
62             return self.__getitem__(0)
63
64         h = list(self)
65         s = 2
66         while s < len(h):
67             s = s * 2
68         h += [('\x00' * len(h[0]))] * (s - len(h))
69         while len(h) > 1:
70             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
71         return h[0]
72
73 # Default modules and settings.
74 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
75 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
76 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
77 DEFAULT_BLOCK_PATH = 'data/'
78 DEFAULT_BLOCK_UMASK = 0o022
79 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
80 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
81
82 QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
83 QUEUE_CLIENT_ID = 'pithos'
84 QUEUE_INSTANCE_ID = '1'
85
86 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
87
88 inf = float('inf')
89
90 ULTIMATE_ANSWER = 42
91
92
93 logger = logging.getLogger(__name__)
94
95
96 def backend_method(func=None, autocommit=1):
97     if func is None:
98         def fn(func):
99             return backend_method(func, autocommit)
100         return fn
101
102     if not autocommit:
103         return func
104     def fn(self, *args, **kw):
105         self.wrapper.execute()
106         try:
107             self.messages = []
108             ret = func(self, *args, **kw)
109             for m in self.messages:
110                 self.queue.send(*m)
111             self.wrapper.commit()
112             return ret
113         except:
114             self.wrapper.rollback()
115             raise
116     return fn
117
118
119 class ModularBackend(BaseBackend):
120     """A modular backend.
121     
122     Uses modules for SQL functions and storage.
123     """
124     
125     def __init__(self, db_module=None, db_connection=None,
126                  block_module=None, block_path=None, block_umask=None,
127                  queue_module=None, queue_connection=None):
128         db_module = db_module or DEFAULT_DB_MODULE
129         db_connection = db_connection or DEFAULT_DB_CONNECTION
130         block_module = block_module or DEFAULT_BLOCK_MODULE
131         block_path = block_path or DEFAULT_BLOCK_PATH
132         block_umask = block_umask or DEFAULT_BLOCK_UMASK
133         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
134         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
135         
136         self.hash_algorithm = 'sha256'
137         self.block_size = 4 * 1024 * 1024 # 4MB
138         
139         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
140         
141         def load_module(m):
142             __import__(m)
143             return sys.modules[m]
144         
145         self.db_module = load_module(db_module)
146         self.wrapper = self.db_module.DBWrapper(db_connection)
147         params = {'wrapper': self.wrapper}
148         self.permissions = self.db_module.Permissions(**params)
149         for x in ['READ', 'WRITE']:
150             setattr(self, x, getattr(self.db_module, x))
151         self.node = self.db_module.Node(**params)
152         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
153             setattr(self, x, getattr(self.db_module, x))
154         
155         self.block_module = load_module(block_module)
156         params = {'path': block_path,
157                   'block_size': self.block_size,
158                   'hash_algorithm': self.hash_algorithm,
159                   'umask': block_umask}
160         self.store = self.block_module.Store(**params)
161
162         if queue_module and queue_connection:
163             self.queue_module = load_module(queue_module)
164             params = {'exchange': queue_connection,
165                       'client_id': QUEUE_CLIENT_ID}
166             self.queue = self.queue_module.Queue(**params)
167         else:
168             class NoQueue:
169                 def send(self, *args):
170                     pass
171                 
172                 def close(self):
173                     pass
174             
175             self.queue = NoQueue()
176     
177     def close(self):
178         self.wrapper.close()
179         self.queue.close()
180     
181     @backend_method
182     def list_accounts(self, user, marker=None, limit=10000):
183         """Return a list of accounts the user can access."""
184         
185         logger.debug("list_accounts: %s %s %s", user, marker, limit)
186         allowed = self._allowed_accounts(user)
187         start, limit = self._list_limits(allowed, marker, limit)
188         return allowed[start:start + limit]
189     
190     @backend_method
191     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
192         """Return a dictionary with the account metadata for the domain."""
193         
194         logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
195         path, node = self._lookup_account(account, user == account)
196         if user != account:
197             if until or node is None or account not in self._allowed_accounts(user):
198                 raise NotAllowedError
199         try:
200             props = self._get_properties(node, until)
201             mtime = props[self.MTIME]
202         except NameError:
203             props = None
204             mtime = until
205         count, bytes, tstamp = self._get_statistics(node, until)
206         tstamp = max(tstamp, mtime)
207         if until is None:
208             modified = tstamp
209         else:
210             modified = self._get_statistics(node)[2] # Overall last modification.
211             modified = max(modified, mtime)
212         
213         if user != account:
214             meta = {'name': account}
215         else:
216             meta = {}
217             if props is not None and include_user_defined:
218                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
219             if until is not None:
220                 meta.update({'until_timestamp': tstamp})
221             meta.update({'name': account, 'count': count, 'bytes': bytes})
222         meta.update({'modified': modified})
223         return meta
224     
225     @backend_method
226     def update_account_meta(self, user, account, domain, meta, replace=False):
227         """Update the metadata associated with the account for the domain."""
228         
229         logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
230         if user != account:
231             raise NotAllowedError
232         path, node = self._lookup_account(account, True)
233         self._put_metadata(user, node, domain, meta, replace)
234     
235     @backend_method
236     def get_account_groups(self, user, account):
237         """Return a dictionary with the user groups defined for this account."""
238         
239         logger.debug("get_account_groups: %s %s", user, account)
240         if user != account:
241             if account not in self._allowed_accounts(user):
242                 raise NotAllowedError
243             return {}
244         self._lookup_account(account, True)
245         return self.permissions.group_dict(account)
246     
247     @backend_method
248     def update_account_groups(self, user, account, groups, replace=False):
249         """Update the groups associated with the account."""
250         
251         logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
252         if user != account:
253             raise NotAllowedError
254         self._lookup_account(account, True)
255         self._check_groups(groups)
256         if replace:
257             self.permissions.group_destroy(account)
258         for k, v in groups.iteritems():
259             if not replace: # If not already deleted.
260                 self.permissions.group_delete(account, k)
261             if v:
262                 self.permissions.group_addmany(account, k, v)
263     
264     @backend_method
265     def get_account_policy(self, user, account):
266         """Return a dictionary with the account policy."""
267         
268         logger.debug("get_account_policy: %s %s", user, account)
269         if user != account:
270             if account not in self._allowed_accounts(user):
271                 raise NotAllowedError
272             return {}
273         path, node = self._lookup_account(account, True)
274         return self._get_policy(node)
275     
276     @backend_method
277     def update_account_policy(self, user, account, policy, replace=False):
278         """Update the policy associated with the account."""
279         
280         logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
281         if user != account:
282             raise NotAllowedError
283         path, node = self._lookup_account(account, True)
284         self._check_policy(policy)
285         self._put_policy(node, policy, replace)
286     
287     @backend_method
288     def put_account(self, user, account, policy={}):
289         """Create a new account with the given name."""
290         
291         logger.debug("put_account: %s %s %s", user, account, policy)
292         if user != account:
293             raise NotAllowedError
294         node = self.node.node_lookup(account)
295         if node is not None:
296             raise AccountExists('Account already exists')
297         if policy:
298             self._check_policy(policy)
299         node = self._put_path(user, self.ROOTNODE, account)
300         self._put_policy(node, policy, True)
301     
302     @backend_method
303     def delete_account(self, user, account):
304         """Delete the account with the given name."""
305         
306         logger.debug("delete_account: %s %s", user, account)
307         if user != account:
308             raise NotAllowedError
309         node = self.node.node_lookup(account)
310         if node is None:
311             return
312         if not self.node.node_remove(node):
313             raise AccountNotEmpty('Account is not empty')
314         self.permissions.group_destroy(account)
315     
316     @backend_method
317     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
318         """Return a list of containers existing under an account."""
319         
320         logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
321         if user != account:
322             if until or account not in self._allowed_accounts(user):
323                 raise NotAllowedError
324             allowed = self._allowed_containers(user, account)
325             start, limit = self._list_limits(allowed, marker, limit)
326             return allowed[start:start + limit]
327         if shared or public:
328             allowed = set()
329             if shared:
330                 allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
331             if public:
332                 allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
333             allowed = sorted(allowed)
334             start, limit = self._list_limits(allowed, marker, limit)
335             return allowed[start:start + limit]
336         node = self.node.node_lookup(account)
337         containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
338         start, limit = self._list_limits([x[0] for x in containers], marker, limit)
339         return containers[start:start + limit]
340     
341     @backend_method
342     def list_container_meta(self, user, account, container, domain, until=None):
343         """Return a list with all the container's object meta keys for the domain."""
344         
345         logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
346         allowed = []
347         if user != account:
348             if until:
349                 raise NotAllowedError
350             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
351             if not allowed:
352                 raise NotAllowedError
353         path, node = self._lookup_container(account, container)
354         before = until if until is not None else inf
355         allowed = self._get_formatted_paths(allowed)
356         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
357     
358     @backend_method
359     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
360         """Return a dictionary with the container metadata for the domain."""
361         
362         logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
363         if user != account:
364             if until or container not in self._allowed_containers(user, account):
365                 raise NotAllowedError
366         path, node = self._lookup_container(account, container)
367         props = self._get_properties(node, until)
368         mtime = props[self.MTIME]
369         count, bytes, tstamp = self._get_statistics(node, until)
370         tstamp = max(tstamp, mtime)
371         if until is None:
372             modified = tstamp
373         else:
374             modified = self._get_statistics(node)[2] # Overall last modification.
375             modified = max(modified, mtime)
376         
377         if user != account:
378             meta = {'name': container}
379         else:
380             meta = {}
381             if include_user_defined:
382                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
383             if until is not None:
384                 meta.update({'until_timestamp': tstamp})
385             meta.update({'name': container, 'count': count, 'bytes': bytes})
386         meta.update({'modified': modified})
387         return meta
388     
389     @backend_method
390     def update_container_meta(self, user, account, container, domain, meta, replace=False):
391         """Update the metadata associated with the container for the domain."""
392         
393         logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
394         if user != account:
395             raise NotAllowedError
396         path, node = self._lookup_container(account, container)
397         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
398         if src_version_id is not None:
399             versioning = self._get_policy(node)['versioning']
400             if versioning != 'auto':
401                 self.node.version_remove(src_version_id)
402     
403     @backend_method
404     def get_container_policy(self, user, account, container):
405         """Return a dictionary with the container policy."""
406         
407         logger.debug("get_container_policy: %s %s %s", user, account, container)
408         if user != account:
409             if container not in self._allowed_containers(user, account):
410                 raise NotAllowedError
411             return {}
412         path, node = self._lookup_container(account, container)
413         return self._get_policy(node)
414     
415     @backend_method
416     def update_container_policy(self, user, account, container, policy, replace=False):
417         """Update the policy associated with the container."""
418         
419         logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
420         if user != account:
421             raise NotAllowedError
422         path, node = self._lookup_container(account, container)
423         self._check_policy(policy)
424         self._put_policy(node, policy, replace)
425     
426     @backend_method
427     def put_container(self, user, account, container, policy={}):
428         """Create a new container with the given name."""
429         
430         logger.debug("put_container: %s %s %s %s", user, account, container, policy)
431         if user != account:
432             raise NotAllowedError
433         try:
434             path, node = self._lookup_container(account, container)
435         except NameError:
436             pass
437         else:
438             raise ContainerExists('Container already exists')
439         if policy:
440             self._check_policy(policy)
441         path = '/'.join((account, container))
442         node = self._put_path(user, self._lookup_account(account, True)[1], path)
443         self._put_policy(node, policy, True)
444     
445     @backend_method
446     def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
447         """Delete/purge the container with the given name."""
448         
449         logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
450         if user != account:
451             raise NotAllowedError
452         path, node = self._lookup_container(account, container)
453         
454         if until is not None:
455             hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
456             for h in hashes:
457                 self.store.map_delete(h)
458             self.node.node_purge_children(node, until, CLUSTER_DELETED)
459             self._report_size_change(user, account, -size, {'action': 'container purge'})
460             return
461         
462         if self._get_statistics(node)[0] > 0:
463             raise ContainerNotEmpty('Container is not empty')
464         hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
465         for h in hashes:
466             self.store.map_delete(h)
467         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
468         self.node.node_remove(node)
469         self._report_size_change(user, account, -size, {'action': 'container delete'})
470     
471     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
472         if user != account and until:
473             raise NotAllowedError
474         if shared and public:
475             # get shared first
476             shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False)
477             objects = []
478             if shared:
479                 path, node = self._lookup_container(account, container)
480                 shared = self._get_formatted_paths(shared)
481                 objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props)
482             
483             # get public
484             objects.extend(self._list_public_object_properties(user, account, container, prefix, all_props))
485             
486             objects.sort(key=lambda x: x[0])
487             start, limit = self._list_limits([x[0] for x in objects], marker, limit)
488             return objects[start:start + limit]
489         elif public:
490             objects = self._list_public_object_properties(user, account, container, prefix, all_props)
491             start, limit = self._list_limits([x[0] for x in objects], marker, limit)
492             return objects[start:start + limit]
493         
494         allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
495         if shared and not allowed:
496             return []
497         path, node = self._lookup_container(account, container)
498         allowed = self._get_formatted_paths(allowed)
499         objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
500         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
501         return objects[start:start + limit]
502     
503     def _list_public_object_properties(self, user, account, container, prefix, all_props):
504         public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True)
505         paths, nodes = self._lookup_objects(public)
506         path = '/'.join((account, container))
507         cont_prefix = path + '/'
508         paths = [x[len(cont_prefix):] for x in paths]
509         props = self.node.version_lookup_bulk(nodes, all_props=all_props)
510         objects = [(path,) + props for path, props in zip(paths, props)]
511         return objects
512         
513     def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
514         objects = []
515         while True:
516             marker = objects[-1] if objects else None
517             limit = 10000
518             l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
519             objects.extend(l)
520             if not l or len(l) < limit:
521                 break
522         return objects
523     
524     def _list_object_permissions(self, user, account, container, prefix, shared, public):
525         allowed = []
526         path = '/'.join((account, container, prefix)).rstrip('/')
527         if user != account:
528             allowed = self.permissions.access_list_paths(user, path)
529             if not allowed:
530                 raise NotAllowedError
531         else:
532             allowed = set()
533             if shared:
534                 allowed.update(self.permissions.access_list_shared(path))
535             if public:
536                 allowed.update([x[0] for x in self.permissions.public_list(path)])
537             allowed = sorted(allowed)
538             if not allowed:
539                 return []
540         return allowed
541     
542     @backend_method
543     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
544         """Return a list of object (name, version_id) tuples existing under a container."""
545         
546         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
547         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
548     
549     @backend_method
550     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
551         """Return a list of object metadata dicts existing under a container."""
552         
553         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
554         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
555         objects = []
556         for p in props:
557             if len(p) == 2:
558                 objects.append({'subdir': p[0]})
559             else:
560                 objects.append({'name': p[0],
561                                 'bytes': p[self.SIZE + 1],
562                                 'type': p[self.TYPE + 1],
563                                 'hash': p[self.HASH + 1],
564                                 'version': p[self.SERIAL + 1],
565                                 'version_timestamp': p[self.MTIME + 1],
566                                 'modified': p[self.MTIME + 1] if until is None else None,
567                                 'modified_by': p[self.MUSER + 1],
568                                 'uuid': p[self.UUID + 1],
569                                 'checksum': p[self.CHECKSUM + 1]})
570         return objects
571     
572     @backend_method
573     def list_object_permissions(self, user, account, container, prefix=''):
574         """Return a list of paths that enforce permissions under a container."""
575         
576         logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
577         return self._list_object_permissions(user, account, container, prefix, True, False)
578     
579     @backend_method
580     def list_object_public(self, user, account, container, prefix=''):
581         """Return a dict mapping paths to public ids for objects that are public under a container."""
582         
583         logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
584         public = {}
585         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
586             public[path] = p + ULTIMATE_ANSWER
587         return public
588     
589     @backend_method
590     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
591         """Return a dictionary with the object metadata for the domain."""
592         
593         logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
594         self._can_read(user, account, container, name)
595         path, node = self._lookup_object(account, container, name)
596         props = self._get_version(node, version)
597         if version is None:
598             modified = props[self.MTIME]
599         else:
600             try:
601                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
602             except NameError: # Object may be deleted.
603                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
604                 if del_props is None:
605                     raise ItemNotExists('Object does not exist')
606                 modified = del_props[self.MTIME]
607         
608         meta = {}
609         if include_user_defined:
610             meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
611         meta.update({'name': name,
612                      'bytes': props[self.SIZE],
613                      'type': props[self.TYPE],
614                      'hash': props[self.HASH],
615                      'version': props[self.SERIAL],
616                      'version_timestamp': props[self.MTIME],
617                      'modified': modified,
618                      'modified_by': props[self.MUSER],
619                      'uuid': props[self.UUID],
620                      'checksum': props[self.CHECKSUM]})
621         return meta
622     
623     @backend_method
624     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
625         """Update the metadata associated with the object for the domain and return the new version."""
626         
627         logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
628         self._can_write(user, account, container, name)
629         path, node = self._lookup_object(account, container, name)
630         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
631         self._apply_versioning(account, container, src_version_id)
632         return dest_version_id
633     
634     @backend_method
635     def get_object_permissions(self, user, account, container, name):
636         """Return the action allowed on the object, the path
637         from which the object gets its permissions from,
638         along with a dictionary containing the permissions."""
639         
640         logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
641         allowed = 'write'
642         permissions_path = self._get_permissions_path(account, container, name)
643         if user != account:
644             if self.permissions.access_check(permissions_path, self.WRITE, user):
645                 allowed = 'write'
646             elif self.permissions.access_check(permissions_path, self.READ, user):
647                 allowed = 'read'
648             else:
649                 raise NotAllowedError
650         self._lookup_object(account, container, name)
651         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
652     
653     @backend_method
654     def update_object_permissions(self, user, account, container, name, permissions):
655         """Update the permissions associated with the object."""
656         
657         logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
658         if user != account:
659             raise NotAllowedError
660         path = self._lookup_object(account, container, name)[0]
661         self._check_permissions(path, permissions)
662         self.permissions.access_set(path, permissions)
663         self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
664     
665     @backend_method
666     def get_object_public(self, user, account, container, name):
667         """Return the public id of the object if applicable."""
668         
669         logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
670         self._can_read(user, account, container, name)
671         path = self._lookup_object(account, container, name)[0]
672         p = self.permissions.public_get(path)
673         if p is not None:
674             p += ULTIMATE_ANSWER
675         return p
676     
677     @backend_method
678     def update_object_public(self, user, account, container, name, public):
679         """Update the public status of the object."""
680         
681         logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
682         self._can_write(user, account, container, name)
683         path = self._lookup_object(account, container, name)[0]
684         if not public:
685             self.permissions.public_unset(path)
686         else:
687             self.permissions.public_set(path)
688     
689     @backend_method
690     def get_object_hashmap(self, user, account, container, name, version=None):
691         """Return the object's size and a list with partial hashes."""
692         
693         logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
694         self._can_read(user, account, container, name)
695         path, node = self._lookup_object(account, container, name)
696         props = self._get_version(node, version)
697         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
698         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
699     
700     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
701         if permissions is not None and user != account:
702             raise NotAllowedError
703         self._can_write(user, account, container, name)
704         if permissions is not None:
705             path = '/'.join((account, container, name))
706             self._check_permissions(path, permissions)
707         
708         account_path, account_node = self._lookup_account(account, True)
709         container_path, container_node = self._lookup_container(account, container)
710         path, node = self._put_object_node(container_path, container_node, name)
711         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
712         
713         # Handle meta.
714         if src_version_id is None:
715             src_version_id = pre_version_id
716         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
717         
718         # Check quota.
719         del_size = self._apply_versioning(account, container, pre_version_id)
720         size_delta = size - del_size
721         if size_delta > 0:
722             account_quota = long(self._get_policy(account_node)['quota'])
723             container_quota = long(self._get_policy(container_node)['quota'])
724             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
725                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
726                 # This must be executed in a transaction, so the version is never created if it fails.
727                 raise QuotaError
728         self._report_size_change(user, account, size_delta, {'action': 'object update'})
729         
730         if permissions is not None:
731             self.permissions.access_set(path, permissions)
732             self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
733         
734         self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
735         return dest_version_id
736     
737     @backend_method
738     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
739         """Create/update an object with the specified size and partial hashes."""
740         
741         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
742         if size == 0: # No such thing as an empty hashmap.
743             hashmap = [self.put_block('')]
744         map = HashMap(self.block_size, self.hash_algorithm)
745         map.extend([binascii.unhexlify(x) for x in hashmap])
746         missing = self.store.block_search(map)
747         if missing:
748             ie = IndexError()
749             ie.data = [binascii.hexlify(x) for x in missing]
750             raise ie
751         
752         hash = map.hash()
753         dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
754         self.store.map_put(hash, map)
755         return dest_version_id
756     
757     @backend_method
758     def update_object_checksum(self, user, account, container, name, version, checksum):
759         """Update an object's checksum."""
760         
761         logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
762         # Update objects with greater version and same hashmap and size (fix metadata updates).
763         self._can_write(user, account, container, name)
764         path, node = self._lookup_object(account, container, name)
765         props = self._get_version(node, version)
766         versions = self.node.node_get_versions(node)
767         for x in versions:
768             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
769                 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
770     
771     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
772         dest_version_ids = []
773         self._can_read(user, src_account, src_container, src_name)
774         path, node = self._lookup_object(src_account, src_container, src_name)
775         # TODO: Will do another fetch of the properties in duplicate version...
776         props = self._get_version(node, src_version) # Check to see if source exists.
777         src_version_id = props[self.SERIAL]
778         hash = props[self.HASH]
779         size = props[self.SIZE]
780         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
781         dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
782         if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
783                 self._delete_object(user, src_account, src_container, src_name)
784         
785         if delimiter:
786             prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
787             src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
788             paths = [elem[0] for elem in src_names]
789             nodes = [elem[2] for elem in src_names]
790             # TODO: Will do another fetch of the properties in duplicate version...
791             props = self._get_versions(nodes) # Check to see if source exists.
792             
793             for prop, path, node in zip(props, paths, nodes):
794                 src_version_id = prop[self.SERIAL]
795                 hash = prop[self.HASH]
796                 vtype = prop[self.TYPE]
797                 dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
798                 vdest_name = path.replace(prefix, dest_prefix, 1)
799                 dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
800                 if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
801                         self._delete_object(user, src_account, src_container, path)
802         return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
803     
804     @backend_method
805     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
806         """Copy an object's data and metadata."""
807         
808         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
809         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
810         return dest_version_id
811     
812     @backend_method
813     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
814         """Move an object's data and metadata."""
815         
816         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
817         if user != src_account:
818             raise NotAllowedError
819         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
820         return dest_version_id
821     
822     def _delete_object(self, user, account, container, name, until=None, delimiter=None):
823         if user != account:
824             raise NotAllowedError
825         
826         if until is not None:
827             path = '/'.join((account, container, name))
828             node = self.node.node_lookup(path)
829             if node is None:
830                 return
831             hashes = []
832             size = 0
833             h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
834             hashes += h
835             size += s
836             h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
837             hashes += h
838             size += s
839             for h in hashes:
840                 self.store.map_delete(h)
841             self.node.node_purge(node, until, CLUSTER_DELETED)
842             try:
843                 props = self._get_version(node)
844             except NameError:
845                 self.permissions.access_clear(path)
846             self._report_size_change(user, account, -size, {'action': 'object purge'})
847             return
848         
849         path, node = self._lookup_object(account, container, name)
850         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
851         del_size = self._apply_versioning(account, container, src_version_id)
852         if del_size:
853             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
854         self._report_object_change(user, account, path, details={'action': 'object delete'})
855         self.permissions.access_clear(path)
856         
857         if delimiter:
858             prefix = name + delimiter if not name.endswith(delimiter) else name
859             src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
860             paths = []
861             for t in src_names:
862                 path = '/'.join((account, container, t[0]))
863                 node = t[2]
864                 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
865                 del_size = self._apply_versioning(account, container, src_version_id)
866                 if del_size:
867                     self._report_size_change(user, account, -del_size, {'action': 'object delete'})
868                 self._report_object_change(user, account, path, details={'action': 'object delete'})
869                 paths.append(path)
870             self.permissions.access_clear_bulk(paths)
871     
872     @backend_method
873     def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
874         """Delete/purge an object."""
875         
876         logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
877         self._delete_object(user, account, container, name, until, delimiter)
878     
879     @backend_method
880     def list_versions(self, user, account, container, name):
881         """Return a list of all (version, version_timestamp) tuples for an object."""
882         
883         logger.debug("list_versions: %s %s %s %s", user, account, container, name)
884         self._can_read(user, account, container, name)
885         path, node = self._lookup_object(account, container, name)
886         versions = self.node.node_get_versions(node)
887         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
888     
889     @backend_method
890     def get_uuid(self, user, uuid):
891         """Return the (account, container, name) for the UUID given."""
892         
893         logger.debug("get_uuid: %s %s", user, uuid)
894         info = self.node.latest_uuid(uuid)
895         if info is None:
896             raise NameError
897         path, serial = info
898         account, container, name = path.split('/', 2)
899         self._can_read(user, account, container, name)
900         return (account, container, name)
901     
902     @backend_method
903     def get_public(self, user, public):
904         """Return the (account, container, name) for the public id given."""
905         
906         logger.debug("get_public: %s %s", user, public)
907         if public is None or public < ULTIMATE_ANSWER:
908             raise NameError
909         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
910         if path is None:
911             raise NameError
912         account, container, name = path.split('/', 2)
913         self._can_read(user, account, container, name)
914         return (account, container, name)
915     
916     @backend_method(autocommit=0)
917     def get_block(self, hash):
918         """Return a block's data."""
919         
920         logger.debug("get_block: %s", hash)
921         block = self.store.block_get(binascii.unhexlify(hash))
922         if not block:
923             raise ItemNotExists('Block does not exist')
924         return block
925     
926     @backend_method(autocommit=0)
927     def put_block(self, data):
928         """Store a block and return the hash."""
929         
930         logger.debug("put_block: %s", len(data))
931         return binascii.hexlify(self.store.block_put(data))
932     
933     @backend_method(autocommit=0)
934     def update_block(self, hash, data, offset=0):
935         """Update a known block and return the hash."""
936         
937         logger.debug("update_block: %s %s %s", hash, len(data), offset)
938         if offset == 0 and len(data) == self.block_size:
939             return self.put_block(data)
940         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
941         return binascii.hexlify(h)
942     
943     # Path functions.
944     
945     def _generate_uuid(self):
946         return str(uuidlib.uuid4())
947     
948     def _put_object_node(self, path, parent, name):
949         path = '/'.join((path, name))
950         node = self.node.node_lookup(path)
951         if node is None:
952             node = self.node.node_create(parent, path)
953         return path, node
954     
955     def _put_path(self, user, parent, path):
956         node = self.node.node_create(parent, path)
957         self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
958         return node
959     
960     def _lookup_account(self, account, create=True):
961         node = self.node.node_lookup(account)
962         if node is None and create:
963             node = self._put_path(account, self.ROOTNODE, account) # User is account.
964         return account, node
965     
966     def _lookup_container(self, account, container):
967         path = '/'.join((account, container))
968         node = self.node.node_lookup(path)
969         if node is None:
970             raise ItemNotExists('Container does not exist')
971         return path, node
972     
973     def _lookup_object(self, account, container, name):
974         path = '/'.join((account, container, name))
975         node = self.node.node_lookup(path)
976         if node is None:
977             raise ItemNotExists('Object does not exist')
978         return path, node
979     
980     def _lookup_objects(self, paths):
981         nodes = self.node.node_lookup_bulk(paths)
982         return paths, nodes
983     
984     def _get_properties(self, node, until=None):
985         """Return properties until the timestamp given."""
986         
987         before = until if until is not None else inf
988         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
989         if props is None and until is not None:
990             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
991         if props is None:
992             raise ItemNotExists('Path does not exist')
993         return props
994     
995     def _get_statistics(self, node, until=None):
996         """Return count, sum of size and latest timestamp of everything under node."""
997         
998         if until is None:
999             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1000         else:
1001             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1002         if stats is None:
1003             stats = (0, 0, 0)
1004         return stats
1005     
1006     def _get_version(self, node, version=None):
1007         if version is None:
1008             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1009             if props is None:
1010                 raise ItemNotExists('Object does not exist')
1011         else:
1012             try:
1013                 version = int(version)
1014             except ValueError:
1015                 raise VersionNotExists('Version does not exist')
1016             props = self.node.version_get_properties(version)
1017             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1018                 raise VersionNotExists('Version does not exist')
1019         return props
1020
1021     def _get_versions(self, nodes):
1022         return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1023     
1024     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1025         """Create a new version of the node."""
1026         
1027         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1028         if props is not None:
1029             src_version_id = props[self.SERIAL]
1030             src_hash = props[self.HASH]
1031             src_size = props[self.SIZE]
1032             src_type = props[self.TYPE]
1033             src_checksum = props[self.CHECKSUM]
1034         else:
1035             src_version_id = None
1036             src_hash = None
1037             src_size = 0
1038             src_type = ''
1039             src_checksum = ''
1040         if size is None: # Set metadata.
1041             hash = src_hash # This way hash can be set to None (account or container).
1042             size = src_size
1043         if type is None:
1044             type = src_type
1045         if checksum is None:
1046             checksum = src_checksum
1047         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
1048         
1049         if src_node is None:
1050             pre_version_id = src_version_id
1051         else:
1052             pre_version_id = None
1053             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1054             if props is not None:
1055                 pre_version_id = props[self.SERIAL]
1056         if pre_version_id is not None:
1057             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1058         
1059         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1060         return pre_version_id, dest_version_id
1061     
1062     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1063         if src_version_id is not None:
1064             self.node.attribute_copy(src_version_id, dest_version_id)
1065         if not replace:
1066             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
1067             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
1068         else:
1069             self.node.attribute_del(dest_version_id, domain)
1070             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
1071     
1072     def _put_metadata(self, user, node, domain, meta, replace=False):
1073         """Create a new version and store metadata."""
1074         
1075         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
1076         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
1077         return src_version_id, dest_version_id
1078     
1079     def _list_limits(self, listing, marker, limit):
1080         start = 0
1081         if marker:
1082             try:
1083                 start = listing.index(marker) + 1
1084             except ValueError:
1085                 pass
1086         if not limit or limit > 10000:
1087             limit = 10000
1088         return start, limit
1089     
1090     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1091         cont_prefix = path + '/'
1092         prefix = cont_prefix + prefix
1093         start = cont_prefix + marker if marker else None
1094         before = until if until is not None else inf
1095         filterq = keys if domain else []
1096         sizeq = size_range
1097         
1098         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1099         objects.extend([(p, None) for p in prefixes] if virtual else [])
1100         objects.sort(key=lambda x: x[0])
1101         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1102         return objects
1103         
1104     # Reporting functions.
1105     
1106     def _report_size_change(self, user, account, size, details={}):
1107         logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1108         account_node = self._lookup_account(account, True)[1]
1109         total = self._get_statistics(account_node)[1]
1110         details.update({'user': user, 'total': total})
1111         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1112     
1113     def _report_object_change(self, user, account, path, details={}):
1114         logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1115         details.update({'user': user})
1116         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1117     
1118     def _report_sharing_change(self, user, account, path, details={}):
1119         logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1120         details.update({'user': user})
1121         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1122     
1123     # Policy functions.
1124     
1125     def _check_policy(self, policy):
1126         for k in policy.keys():
1127             if policy[k] == '':
1128                 policy[k] = self.default_policy.get(k)
1129         for k, v in policy.iteritems():
1130             if k == 'quota':
1131                 q = int(v) # May raise ValueError.
1132                 if q < 0:
1133                     raise ValueError
1134             elif k == 'versioning':
1135                 if v not in ['auto', 'none']:
1136                     raise ValueError
1137             else:
1138                 raise ValueError
1139     
1140     def _put_policy(self, node, policy, replace):
1141         if replace:
1142             for k, v in self.default_policy.iteritems():
1143                 if k not in policy:
1144                     policy[k] = v
1145         self.node.policy_set(node, policy)
1146     
1147     def _get_policy(self, node):
1148         policy = self.default_policy.copy()
1149         policy.update(self.node.policy_get(node))
1150         return policy
1151     
1152     def _apply_versioning(self, account, container, version_id):
1153         """Delete the provided version if such is the policy.
1154            Return size of object removed.
1155         """
1156         
1157         if version_id is None:
1158             return 0
1159         path, node = self._lookup_container(account, container)
1160         versioning = self._get_policy(node)['versioning']
1161         if versioning != 'auto':
1162             hash, size = self.node.version_remove(version_id)
1163             self.store.map_delete(hash)
1164             return size
1165         return 0
1166     
1167     # Access control functions.
1168     
1169     def _check_groups(self, groups):
1170         # raise ValueError('Bad characters in groups')
1171         pass
1172     
1173     def _check_permissions(self, path, permissions):
1174         # raise ValueError('Bad characters in permissions')
1175         pass
1176     
1177     def _get_formatted_paths(self, paths):
1178         formatted = []
1179         for p in paths:
1180             node = self.node.node_lookup(p)
1181             if node is not None:
1182                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1183             if props is not None:
1184                 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1185                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1186                 formatted.append((p, self.MATCH_EXACT))
1187         return formatted
1188     
1189     def _get_permissions_path(self, account, container, name):
1190         path = '/'.join((account, container, name))
1191         permission_paths = self.permissions.access_inherit(path)
1192         permission_paths.sort()
1193         permission_paths.reverse()
1194         for p in permission_paths:
1195             if p == path:
1196                 return p
1197             else:
1198                 if p.count('/') < 2:
1199                     continue
1200                 node = self.node.node_lookup(p)
1201                 if node is not None:
1202                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1203                 if props is not None:
1204                     if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1205                         return p
1206         return None
1207     
1208     def _can_read(self, user, account, container, name):
1209         if user == account:
1210             return True
1211         path = '/'.join((account, container, name))
1212         if self.permissions.public_get(path) is not None:
1213             return True
1214         path = self._get_permissions_path(account, container, name)
1215         if not path:
1216             raise NotAllowedError
1217         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1218             raise NotAllowedError
1219     
1220     def _can_write(self, user, account, container, name):
1221         if user == account:
1222             return True
1223         path = '/'.join((account, container, name))
1224         path = self._get_permissions_path(account, container, name)
1225         if not path:
1226             raise NotAllowedError
1227         if not self.permissions.access_check(path, self.WRITE, user):
1228             raise NotAllowedError
1229     
1230     def _allowed_accounts(self, user):
1231         allow = set()
1232         for path in self.permissions.access_list_paths(user):
1233             allow.add(path.split('/', 1)[0])
1234         return sorted(allow)
1235     
1236     def _allowed_containers(self, user, account):
1237         allow = set()
1238         for path in self.permissions.access_list_paths(user, account):
1239             allow.add(path.split('/', 2)[1])
1240         return sorted(allow)