Do not include children of public objects in listings
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import hashlib
40 import binascii
41
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43
44 # Stripped-down version of the HashMap class found in tools.
45 class HashMap(list):
46
47     def __init__(self, blocksize, blockhash):
48         super(HashMap, self).__init__()
49         self.blocksize = blocksize
50         self.blockhash = blockhash
51
52     def _hash_raw(self, v):
53         h = hashlib.new(self.blockhash)
54         h.update(v)
55         return h.digest()
56
57     def hash(self):
58         if len(self) == 0:
59             return self._hash_raw('')
60         if len(self) == 1:
61             return self.__getitem__(0)
62
63         h = list(self)
64         s = 2
65         while s < len(h):
66             s = s * 2
67         h += [('\x00' * len(h[0]))] * (s - len(h))
68         while len(h) > 1:
69             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70         return h[0]
71
72 # Default modules and settings.
73 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76 DEFAULT_BLOCK_PATH = 'data/'
77 DEFAULT_BLOCK_UMASK = 0o022
78 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
79 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
80
81 QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
82 QUEUE_CLIENT_ID = 'pithos'
83 QUEUE_INSTANCE_ID = '1'
84
85 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
86
87 inf = float('inf')
88
89 ULTIMATE_ANSWER = 42
90
91
92 logger = logging.getLogger(__name__)
93
94
95 def backend_method(func=None, autocommit=1):
96     if func is None:
97         def fn(func):
98             return backend_method(func, autocommit)
99         return fn
100
101     if not autocommit:
102         return func
103     def fn(self, *args, **kw):
104         self.wrapper.execute()
105         try:
106             self.messages = []
107             ret = func(self, *args, **kw)
108             for m in self.messages:
109                 self.queue.send(*m)
110             self.wrapper.commit()
111             return ret
112         except:
113             self.wrapper.rollback()
114             raise
115     return fn
116
117
118 class ModularBackend(BaseBackend):
119     """A modular backend.
120     
121     Uses modules for SQL functions and storage.
122     """
123     
124     def __init__(self, db_module=None, db_connection=None,
125                  block_module=None, block_path=None, block_umask=None,
126                  queue_module=None, queue_connection=None):
127         db_module = db_module or DEFAULT_DB_MODULE
128         db_connection = db_connection or DEFAULT_DB_CONNECTION
129         block_module = block_module or DEFAULT_BLOCK_MODULE
130         block_path = block_path or DEFAULT_BLOCK_PATH
131         block_umask = block_umask or DEFAULT_BLOCK_UMASK
132         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
133         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
134         
135         self.hash_algorithm = 'sha256'
136         self.block_size = 4 * 1024 * 1024 # 4MB
137         
138         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
139         
140         def load_module(m):
141             __import__(m)
142             return sys.modules[m]
143         
144         self.db_module = load_module(db_module)
145         self.wrapper = self.db_module.DBWrapper(db_connection)
146         params = {'wrapper': self.wrapper}
147         self.permissions = self.db_module.Permissions(**params)
148         for x in ['READ', 'WRITE']:
149             setattr(self, x, getattr(self.db_module, x))
150         self.node = self.db_module.Node(**params)
151         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
152             setattr(self, x, getattr(self.db_module, x))
153         
154         self.block_module = load_module(block_module)
155         params = {'path': block_path,
156                   'block_size': self.block_size,
157                   'hash_algorithm': self.hash_algorithm,
158                   'umask': block_umask}
159         self.store = self.block_module.Store(**params)
160
161         if queue_module and queue_connection:
162             self.queue_module = load_module(queue_module)
163             params = {'exchange': queue_connection,
164                       'client_id': QUEUE_CLIENT_ID}
165             self.queue = self.queue_module.Queue(**params)
166         else:
167             class NoQueue:
168                 def send(self, *args):
169                     pass
170                 
171                 def close(self):
172                     pass
173             
174             self.queue = NoQueue()
175     
176     def close(self):
177         self.wrapper.close()
178         self.queue.close()
179     
180     @backend_method
181     def list_accounts(self, user, marker=None, limit=10000):
182         """Return a list of accounts the user can access."""
183         
184         logger.debug("list_accounts: %s %s %s", user, marker, limit)
185         allowed = self._allowed_accounts(user)
186         start, limit = self._list_limits(allowed, marker, limit)
187         return allowed[start:start + limit]
188     
189     @backend_method
190     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
191         """Return a dictionary with the account metadata for the domain."""
192         
193         logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
194         path, node = self._lookup_account(account, user == account)
195         if user != account:
196             if until or node is None or account not in self._allowed_accounts(user):
197                 raise NotAllowedError
198         try:
199             props = self._get_properties(node, until)
200             mtime = props[self.MTIME]
201         except NameError:
202             props = None
203             mtime = until
204         count, bytes, tstamp = self._get_statistics(node, until)
205         tstamp = max(tstamp, mtime)
206         if until is None:
207             modified = tstamp
208         else:
209             modified = self._get_statistics(node)[2] # Overall last modification.
210             modified = max(modified, mtime)
211         
212         if user != account:
213             meta = {'name': account}
214         else:
215             meta = {}
216             if props is not None and include_user_defined:
217                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
218             if until is not None:
219                 meta.update({'until_timestamp': tstamp})
220             meta.update({'name': account, 'count': count, 'bytes': bytes})
221         meta.update({'modified': modified})
222         return meta
223     
224     @backend_method
225     def update_account_meta(self, user, account, domain, meta, replace=False):
226         """Update the metadata associated with the account for the domain."""
227         
228         logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
229         if user != account:
230             raise NotAllowedError
231         path, node = self._lookup_account(account, True)
232         self._put_metadata(user, node, domain, meta, replace)
233     
234     @backend_method
235     def get_account_groups(self, user, account):
236         """Return a dictionary with the user groups defined for this account."""
237         
238         logger.debug("get_account_groups: %s %s", user, account)
239         if user != account:
240             if account not in self._allowed_accounts(user):
241                 raise NotAllowedError
242             return {}
243         self._lookup_account(account, True)
244         return self.permissions.group_dict(account)
245     
246     @backend_method
247     def update_account_groups(self, user, account, groups, replace=False):
248         """Update the groups associated with the account."""
249         
250         logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
251         if user != account:
252             raise NotAllowedError
253         self._lookup_account(account, True)
254         self._check_groups(groups)
255         if replace:
256             self.permissions.group_destroy(account)
257         for k, v in groups.iteritems():
258             if not replace: # If not already deleted.
259                 self.permissions.group_delete(account, k)
260             if v:
261                 self.permissions.group_addmany(account, k, v)
262     
263     @backend_method
264     def get_account_policy(self, user, account):
265         """Return a dictionary with the account policy."""
266         
267         logger.debug("get_account_policy: %s %s", user, account)
268         if user != account:
269             if account not in self._allowed_accounts(user):
270                 raise NotAllowedError
271             return {}
272         path, node = self._lookup_account(account, True)
273         return self._get_policy(node)
274     
275     @backend_method
276     def update_account_policy(self, user, account, policy, replace=False):
277         """Update the policy associated with the account."""
278         
279         logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
280         if user != account:
281             raise NotAllowedError
282         path, node = self._lookup_account(account, True)
283         self._check_policy(policy)
284         self._put_policy(node, policy, replace)
285     
286     @backend_method
287     def put_account(self, user, account, policy={}):
288         """Create a new account with the given name."""
289         
290         logger.debug("put_account: %s %s %s", user, account, policy)
291         if user != account:
292             raise NotAllowedError
293         node = self.node.node_lookup(account)
294         if node is not None:
295             raise NameError('Account already exists')
296         if policy:
297             self._check_policy(policy)
298         node = self._put_path(user, self.ROOTNODE, account)
299         self._put_policy(node, policy, True)
300     
301     @backend_method
302     def delete_account(self, user, account):
303         """Delete the account with the given name."""
304         
305         logger.debug("delete_account: %s %s", user, account)
306         if user != account:
307             raise NotAllowedError
308         node = self.node.node_lookup(account)
309         if node is None:
310             return
311         if not self.node.node_remove(node):
312             raise IndexError('Account is not empty')
313         self.permissions.group_destroy(account)
314     
315     @backend_method
316     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
317         """Return a list of containers existing under an account."""
318         
319         logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
320         if user != account:
321             if until or account not in self._allowed_accounts(user):
322                 raise NotAllowedError
323             allowed = self._allowed_containers(user, account)
324             start, limit = self._list_limits(allowed, marker, limit)
325             return allowed[start:start + limit]
326         if shared or public:
327             allowed = []
328             if shared:
329                 allowed.extend([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
330             if public:
331                 allowed.extend([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
332             allowed = list(set(allowed))
333             allowed.sort()
334             start, limit = self._list_limits(allowed, marker, limit)
335             return allowed[start:start + limit]
336         node = self.node.node_lookup(account)
337         containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
338         start, limit = self._list_limits([x[0] for x in containers], marker, limit)
339         return containers[start:start + limit]
340     
341     @backend_method
342     def list_container_meta(self, user, account, container, domain, until=None):
343         """Return a list with all the container's object meta keys for the domain."""
344         
345         logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
346         allowed = []
347         if user != account:
348             if until:
349                 raise NotAllowedError
350             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
351             if not allowed:
352                 raise NotAllowedError
353         path, node = self._lookup_container(account, container)
354         before = until if until is not None else inf
355         allowed = self._get_formatted_paths(allowed)
356         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
357     
358     @backend_method
359     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
360         """Return a dictionary with the container metadata for the domain."""
361         
362         logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
363         if user != account:
364             if until or container not in self._allowed_containers(user, account):
365                 raise NotAllowedError
366         path, node = self._lookup_container(account, container)
367         props = self._get_properties(node, until)
368         mtime = props[self.MTIME]
369         count, bytes, tstamp = self._get_statistics(node, until)
370         tstamp = max(tstamp, mtime)
371         if until is None:
372             modified = tstamp
373         else:
374             modified = self._get_statistics(node)[2] # Overall last modification.
375             modified = max(modified, mtime)
376         
377         if user != account:
378             meta = {'name': container}
379         else:
380             meta = {}
381             if include_user_defined:
382                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
383             if until is not None:
384                 meta.update({'until_timestamp': tstamp})
385             meta.update({'name': container, 'count': count, 'bytes': bytes})
386         meta.update({'modified': modified})
387         return meta
388     
389     @backend_method
390     def update_container_meta(self, user, account, container, domain, meta, replace=False):
391         """Update the metadata associated with the container for the domain."""
392         
393         logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
394         if user != account:
395             raise NotAllowedError
396         path, node = self._lookup_container(account, container)
397         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
398         if src_version_id is not None:
399             versioning = self._get_policy(node)['versioning']
400             if versioning != 'auto':
401                 self.node.version_remove(src_version_id)
402     
403     @backend_method
404     def get_container_policy(self, user, account, container):
405         """Return a dictionary with the container policy."""
406         
407         logger.debug("get_container_policy: %s %s %s", user, account, container)
408         if user != account:
409             if container not in self._allowed_containers(user, account):
410                 raise NotAllowedError
411             return {}
412         path, node = self._lookup_container(account, container)
413         return self._get_policy(node)
414     
415     @backend_method
416     def update_container_policy(self, user, account, container, policy, replace=False):
417         """Update the policy associated with the container."""
418         
419         logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
420         if user != account:
421             raise NotAllowedError
422         path, node = self._lookup_container(account, container)
423         self._check_policy(policy)
424         self._put_policy(node, policy, replace)
425     
426     @backend_method
427     def put_container(self, user, account, container, policy={}):
428         """Create a new container with the given name."""
429         
430         logger.debug("put_container: %s %s %s %s", user, account, container, policy)
431         if user != account:
432             raise NotAllowedError
433         try:
434             path, node = self._lookup_container(account, container)
435         except NameError:
436             pass
437         else:
438             raise NameError('Container already exists')
439         if policy:
440             self._check_policy(policy)
441         path = '/'.join((account, container))
442         node = self._put_path(user, self._lookup_account(account, True)[1], path)
443         self._put_policy(node, policy, True)
444     
445     @backend_method
446     def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
447         """Delete/purge the container with the given name."""
448         
449         logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
450         if user != account:
451             raise NotAllowedError
452         path, node = self._lookup_container(account, container)
453         
454         if until is not None:
455             hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
456             for h in hashes:
457                 self.store.map_delete(h)
458             self.node.node_purge_children(node, until, CLUSTER_DELETED)
459             self._report_size_change(user, account, -size, {'action': 'container purge'})
460             return
461         
462         if self._get_statistics(node)[0] > 0:
463             raise IndexError('Container is not empty')
464         hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
465         for h in hashes:
466             self.store.map_delete(h)
467         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
468         self.node.node_remove(node)
469         self._report_size_change(user, account, -size, {'action': 'container delete'})
470     
471     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
472         if user != account and until:
473             raise NotAllowedError
474         if shared and public:
475             # get shared first
476             shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False)
477             objects = []
478             if shared:
479                 path, node = self._lookup_container(account, container)
480                 shared = self._get_formatted_paths(shared)
481                 objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props)
482             
483             # get public
484             objects.extend(self._list_public_object_properties(user, account, container, prefix, all_props))
485             
486             objects.sort(key=lambda x: x[0])
487             start, limit = self._list_limits([x[0] for x in objects], marker, limit)
488             return objects[start:start + limit]
489         elif public:
490             objects = self._list_public_object_properties(user, account, container, prefix, all_props)
491             start, limit = self._list_limits([x[0] for x in objects], marker, limit)
492             return objects[start:start + limit]
493         
494         allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
495         if shared and not allowed:
496             return []
497         path, node = self._lookup_container(account, container)
498         allowed = self._get_formatted_paths(allowed)
499         objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
500         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
501         return objects[start:start + limit]
502     
503     def _list_public_object_properties(self, user, account, container, prefix, all_props):
504         public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True)
505         paths, nodes = self._lookup_objects(public)
506         path = '/'.join((account, container))
507         cont_prefix = path + '/'
508         paths = [x[len(cont_prefix):] for x in paths]
509         props = self.node.version_lookup_bulk(nodes, all_props=all_props)
510         objects = [(path,) + props for path, props in zip(paths, props)]
511         return objects
512         
513     def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
514         objects = []
515         while True:
516             marker = objects[-1] if objects else None
517             limit = 10000
518             l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
519             objects.extend(l)
520             if not l or len(l) < limit:
521                 break
522         return objects
523     
524     def _list_object_permissions(self, user, account, container, prefix, shared, public):
525         allowed = []
526         path = '/'.join((account, container, prefix)).rstrip('/')
527         if user != account:
528             allowed = self.permissions.access_list_paths(user, path)
529             if not allowed:
530                 raise NotAllowedError
531         else:
532             allowed = []
533             if shared:
534                 allowed.extend(self.permissions.access_list_shared(path))
535             if public:
536                 allowed.extend([x[0] for x in self.permissions.public_list(path)])
537             allowed = list(set(allowed))
538             allowed.sort()
539             if not allowed:
540                 return []
541         return allowed
542     
543     @backend_method
544     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
545         """Return a list of object (name, version_id) tuples existing under a container."""
546         
547         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
548         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
549     
550     @backend_method
551     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
552         """Return a list of object metadata dicts existing under a container."""
553         
554         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
555         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
556         objects = []
557         for p in props:
558             if len(p) == 2:
559                 objects.append({'subdir': p[0]})
560             else:
561                 objects.append({'name': p[0],
562                                 'bytes': p[self.SIZE + 1],
563                                 'type': p[self.TYPE + 1],
564                                 'hash': p[self.HASH + 1],
565                                 'version': p[self.SERIAL + 1],
566                                 'version_timestamp': p[self.MTIME + 1],
567                                 'modified': p[self.MTIME + 1] if until is None else None,
568                                 'modified_by': p[self.MUSER + 1],
569                                 'uuid': p[self.UUID + 1],
570                                 'checksum': p[self.CHECKSUM + 1]})
571         return objects
572     
573     @backend_method
574     def list_object_permissions(self, user, account, container, prefix=''):
575         """Return a list of paths that enforce permissions under a container."""
576         
577         logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
578         return self._list_object_permissions(user, account, container, prefix, True, False)
579     
580     @backend_method
581     def list_object_public(self, user, account, container, prefix=''):
582         """Return a dict mapping paths to public ids for objects that are public under a container."""
583         
584         logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
585         public = {}
586         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
587             public[path] = p + ULTIMATE_ANSWER
588         return public
589     
590     @backend_method
591     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
592         """Return a dictionary with the object metadata for the domain."""
593         
594         logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
595         self._can_read(user, account, container, name)
596         path, node = self._lookup_object(account, container, name)
597         props = self._get_version(node, version)
598         if version is None:
599             modified = props[self.MTIME]
600         else:
601             try:
602                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
603             except NameError: # Object may be deleted.
604                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
605                 if del_props is None:
606                     raise NameError('Object does not exist')
607                 modified = del_props[self.MTIME]
608         
609         meta = {}
610         if include_user_defined:
611             meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
612         meta.update({'name': name,
613                      'bytes': props[self.SIZE],
614                      'type': props[self.TYPE],
615                      'hash': props[self.HASH],
616                      'version': props[self.SERIAL],
617                      'version_timestamp': props[self.MTIME],
618                      'modified': modified,
619                      'modified_by': props[self.MUSER],
620                      'uuid': props[self.UUID],
621                      'checksum': props[self.CHECKSUM]})
622         return meta
623     
624     @backend_method
625     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
626         """Update the metadata associated with the object for the domain and return the new version."""
627         
628         logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
629         self._can_write(user, account, container, name)
630         path, node = self._lookup_object(account, container, name)
631         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
632         self._apply_versioning(account, container, src_version_id)
633         return dest_version_id
634     
635     @backend_method
636     def get_object_permissions(self, user, account, container, name):
637         """Return the action allowed on the object, the path
638         from which the object gets its permissions from,
639         along with a dictionary containing the permissions."""
640         
641         logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
642         allowed = 'write'
643         permissions_path = self._get_permissions_path(account, container, name)
644         if user != account:
645             if self.permissions.access_check(permissions_path, self.WRITE, user):
646                 allowed = 'write'
647             elif self.permissions.access_check(permissions_path, self.READ, user):
648                 allowed = 'read'
649             else:
650                 raise NotAllowedError
651         self._lookup_object(account, container, name)
652         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
653     
654     @backend_method
655     def update_object_permissions(self, user, account, container, name, permissions):
656         """Update the permissions associated with the object."""
657         
658         logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
659         if user != account:
660             raise NotAllowedError
661         path = self._lookup_object(account, container, name)[0]
662         self._check_permissions(path, permissions)
663         self.permissions.access_set(path, permissions)
664         self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
665     
666     @backend_method
667     def get_object_public(self, user, account, container, name):
668         """Return the public id of the object if applicable."""
669         
670         logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
671         self._can_read(user, account, container, name)
672         path = self._lookup_object(account, container, name)[0]
673         p = self.permissions.public_get(path)
674         if p is not None:
675             p += ULTIMATE_ANSWER
676         return p
677     
678     @backend_method
679     def update_object_public(self, user, account, container, name, public):
680         """Update the public status of the object."""
681         
682         logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
683         self._can_write(user, account, container, name)
684         path = self._lookup_object(account, container, name)[0]
685         if not public:
686             self.permissions.public_unset(path)
687         else:
688             self.permissions.public_set(path)
689     
690     @backend_method
691     def get_object_hashmap(self, user, account, container, name, version=None):
692         """Return the object's size and a list with partial hashes."""
693         
694         logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
695         self._can_read(user, account, container, name)
696         path, node = self._lookup_object(account, container, name)
697         props = self._get_version(node, version)
698         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
699         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
700     
701     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
702         if permissions is not None and user != account:
703             raise NotAllowedError
704         self._can_write(user, account, container, name)
705         if permissions is not None:
706             path = '/'.join((account, container, name))
707             self._check_permissions(path, permissions)
708         
709         account_path, account_node = self._lookup_account(account, True)
710         container_path, container_node = self._lookup_container(account, container)
711         path, node = self._put_object_node(container_path, container_node, name)
712         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
713         
714         # Handle meta.
715         if src_version_id is None:
716             src_version_id = pre_version_id
717         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
718         
719         # Check quota.
720         del_size = self._apply_versioning(account, container, pre_version_id)
721         size_delta = size - del_size
722         if size_delta > 0:
723             account_quota = long(self._get_policy(account_node)['quota'])
724             container_quota = long(self._get_policy(container_node)['quota'])
725             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
726                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
727                 # This must be executed in a transaction, so the version is never created if it fails.
728                 raise QuotaError
729         self._report_size_change(user, account, size_delta, {'action': 'object update'})
730         
731         if permissions is not None:
732             self.permissions.access_set(path, permissions)
733             self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
734         
735         self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
736         return dest_version_id
737     
738     @backend_method
739     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
740         """Create/update an object with the specified size and partial hashes."""
741         
742         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
743         if size == 0: # No such thing as an empty hashmap.
744             hashmap = [self.put_block('')]
745         map = HashMap(self.block_size, self.hash_algorithm)
746         map.extend([binascii.unhexlify(x) for x in hashmap])
747         missing = self.store.block_search(map)
748         if missing:
749             ie = IndexError()
750             ie.data = [binascii.hexlify(x) for x in missing]
751             raise ie
752         
753         hash = map.hash()
754         dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
755         self.store.map_put(hash, map)
756         return dest_version_id
757     
758     @backend_method
759     def update_object_checksum(self, user, account, container, name, version, checksum):
760         """Update an object's checksum."""
761         
762         logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
763         # Update objects with greater version and same hashmap and size (fix metadata updates).
764         self._can_write(user, account, container, name)
765         path, node = self._lookup_object(account, container, name)
766         props = self._get_version(node, version)
767         versions = self.node.node_get_versions(node)
768         for x in versions:
769             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
770                 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
771     
772     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
773         dest_version_ids = []
774         self._can_read(user, src_account, src_container, src_name)
775         path, node = self._lookup_object(src_account, src_container, src_name)
776         # TODO: Will do another fetch of the properties in duplicate version...
777         props = self._get_version(node, src_version) # Check to see if source exists.
778         src_version_id = props[self.SERIAL]
779         hash = props[self.HASH]
780         size = props[self.SIZE]
781         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
782         dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
783         if is_move:
784                 self._delete_object(user, src_account, src_container, src_name)
785         
786         if delimiter:
787             prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
788             src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
789             paths = [elem[0] for elem in src_names]
790             nodes = [elem[2] for elem in src_names]
791             # TODO: Will do another fetch of the properties in duplicate version...
792             props = self._get_versions(nodes) # Check to see if source exists.
793             
794             for prop, path, node in zip(props, paths, nodes):
795                 src_version_id = prop[self.SERIAL]
796                 hash = prop[self.HASH]
797                 vtype = prop[self.TYPE]
798                 dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
799                 vdest_name = path.replace(prefix, dest_prefix, 1)
800                 dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
801                 if is_move:
802                         self._delete_object(user, src_account, src_container, path)
803         return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
804     
805     @backend_method
806     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
807         """Copy an object's data and metadata."""
808         
809         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
810         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
811         return dest_version_id
812     
813     @backend_method
814     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
815         """Move an object's data and metadata."""
816         
817         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
818         if user != src_account:
819             raise NotAllowedError
820         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
821         return dest_version_id
822     
823     def _delete_object(self, user, account, container, name, until=None, delimiter=None):
824         if user != account:
825             raise NotAllowedError
826         
827         if until is not None:
828             path = '/'.join((account, container, name))
829             node = self.node.node_lookup(path)
830             if node is None:
831                 return
832             hashes = []
833             size = 0
834             h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
835             hashes += h
836             size += s
837             h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
838             hashes += h
839             size += s
840             for h in hashes:
841                 self.store.map_delete(h)
842             self.node.node_purge(node, until, CLUSTER_DELETED)
843             try:
844                 props = self._get_version(node)
845             except NameError:
846                 self.permissions.access_clear(path)
847             self._report_size_change(user, account, -size, {'action': 'object purge'})
848             return
849         
850         path, node = self._lookup_object(account, container, name)
851         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
852         del_size = self._apply_versioning(account, container, src_version_id)
853         if del_size:
854             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
855         self._report_object_change(user, account, path, details={'action': 'object delete'})
856         self.permissions.access_clear(path)
857         
858         if delimiter:
859             prefix = name + delimiter if not name.endswith(delimiter) else name
860             src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
861             paths = []
862             for t in src_names:
863                 path = '/'.join((account, container, t[0]))
864                 node = t[2]
865                 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
866                 del_size = self._apply_versioning(account, container, src_version_id)
867                 if del_size:
868                     self._report_size_change(user, account, -del_size, {'action': 'object delete'})
869                 self._report_object_change(user, account, path, details={'action': 'object delete'})
870                 paths.append(path)
871             self.permissions.access_clear_bulk(paths)
872     
873     @backend_method
874     def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
875         """Delete/purge an object."""
876         
877         logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
878         self._delete_object(user, account, container, name, until, delimiter)
879     
880     @backend_method
881     def list_versions(self, user, account, container, name):
882         """Return a list of all (version, version_timestamp) tuples for an object."""
883         
884         logger.debug("list_versions: %s %s %s %s", user, account, container, name)
885         self._can_read(user, account, container, name)
886         path, node = self._lookup_object(account, container, name)
887         versions = self.node.node_get_versions(node)
888         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
889     
890     @backend_method
891     def get_uuid(self, user, uuid):
892         """Return the (account, container, name) for the UUID given."""
893         
894         logger.debug("get_uuid: %s %s", user, uuid)
895         info = self.node.latest_uuid(uuid)
896         if info is None:
897             raise NameError
898         path, serial = info
899         account, container, name = path.split('/', 2)
900         self._can_read(user, account, container, name)
901         return (account, container, name)
902     
903     @backend_method
904     def get_public(self, user, public):
905         """Return the (account, container, name) for the public id given."""
906         
907         logger.debug("get_public: %s %s", user, public)
908         if public is None or public < ULTIMATE_ANSWER:
909             raise NameError
910         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
911         if path is None:
912             raise NameError
913         account, container, name = path.split('/', 2)
914         self._can_read(user, account, container, name)
915         return (account, container, name)
916     
917     @backend_method(autocommit=0)
918     def get_block(self, hash):
919         """Return a block's data."""
920         
921         logger.debug("get_block: %s", hash)
922         block = self.store.block_get(binascii.unhexlify(hash))
923         if not block:
924             raise NameError('Block does not exist')
925         return block
926     
927     @backend_method(autocommit=0)
928     def put_block(self, data):
929         """Store a block and return the hash."""
930         
931         logger.debug("put_block: %s", len(data))
932         return binascii.hexlify(self.store.block_put(data))
933     
934     @backend_method(autocommit=0)
935     def update_block(self, hash, data, offset=0):
936         """Update a known block and return the hash."""
937         
938         logger.debug("update_block: %s %s %s", hash, len(data), offset)
939         if offset == 0 and len(data) == self.block_size:
940             return self.put_block(data)
941         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
942         return binascii.hexlify(h)
943     
944     # Path functions.
945     
946     def _generate_uuid(self):
947         return str(uuidlib.uuid4())
948     
949     def _put_object_node(self, path, parent, name):
950         path = '/'.join((path, name))
951         node = self.node.node_lookup(path)
952         if node is None:
953             node = self.node.node_create(parent, path)
954         return path, node
955     
956     def _put_path(self, user, parent, path):
957         node = self.node.node_create(parent, path)
958         self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
959         return node
960     
961     def _lookup_account(self, account, create=True):
962         node = self.node.node_lookup(account)
963         if node is None and create:
964             node = self._put_path(account, self.ROOTNODE, account) # User is account.
965         return account, node
966     
967     def _lookup_container(self, account, container):
968         path = '/'.join((account, container))
969         node = self.node.node_lookup(path)
970         if node is None:
971             raise NameError('Container does not exist')
972         return path, node
973     
974     def _lookup_object(self, account, container, name):
975         path = '/'.join((account, container, name))
976         node = self.node.node_lookup(path)
977         if node is None:
978             raise NameError('Object does not exist')
979         return path, node
980     
981     def _lookup_objects(self, paths):
982         nodes = self.node.node_lookup_bulk(paths)
983         if nodes is None:
984             raise NameError('Object does not exist')
985         return paths, nodes
986     
987     def _get_properties(self, node, until=None):
988         """Return properties until the timestamp given."""
989         
990         before = until if until is not None else inf
991         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
992         if props is None and until is not None:
993             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
994         if props is None:
995             raise NameError('Path does not exist')
996         return props
997     
998     def _get_statistics(self, node, until=None):
999         """Return count, sum of size and latest timestamp of everything under node."""
1000         
1001         if until is None:
1002             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1003         else:
1004             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1005         if stats is None:
1006             stats = (0, 0, 0)
1007         return stats
1008     
1009     def _get_version(self, node, version=None):
1010         if version is None:
1011             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1012             if props is None:
1013                 raise NameError('Object does not exist')
1014         else:
1015             try:
1016                 version = int(version)
1017             except ValueError:
1018                 raise IndexError('Version does not exist')
1019             props = self.node.version_get_properties(version)
1020             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1021                 raise IndexError('Version does not exist')
1022         return props
1023
1024     def _get_versions(self, nodes, version=None):
1025         if version is None:
1026             props = self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1027             if not props:
1028                 raise NameError('Object does not exist')
1029         else:
1030             try:
1031                 version = int(version)
1032             except ValueError:
1033                 raise IndexError('Version does not exist')
1034             props = self.node.version_get_properties(version)
1035             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1036                 raise IndexError('Version does not exist')
1037         return props
1038     
1039     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1040         """Create a new version of the node."""
1041         
1042         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1043         if props is not None:
1044             src_version_id = props[self.SERIAL]
1045             src_hash = props[self.HASH]
1046             src_size = props[self.SIZE]
1047             src_type = props[self.TYPE]
1048             src_checksum = props[self.CHECKSUM]
1049         else:
1050             src_version_id = None
1051             src_hash = None
1052             src_size = 0
1053             src_type = ''
1054             src_checksum = ''
1055         if size is None: # Set metadata.
1056             hash = src_hash # This way hash can be set to None (account or container).
1057             size = src_size
1058         if type is None:
1059             type = src_type
1060         if checksum is None:
1061             checksum = src_checksum
1062         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
1063         
1064         if src_node is None:
1065             pre_version_id = src_version_id
1066         else:
1067             pre_version_id = None
1068             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1069             if props is not None:
1070                 pre_version_id = props[self.SERIAL]
1071         if pre_version_id is not None:
1072             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1073         
1074         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1075         return pre_version_id, dest_version_id
1076     
1077     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1078         if src_version_id is not None:
1079             self.node.attribute_copy(src_version_id, dest_version_id)
1080         if not replace:
1081             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
1082             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
1083         else:
1084             self.node.attribute_del(dest_version_id, domain)
1085             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
1086     
1087     def _put_metadata(self, user, node, domain, meta, replace=False):
1088         """Create a new version and store metadata."""
1089         
1090         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
1091         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
1092         return src_version_id, dest_version_id
1093     
1094     def _list_limits(self, listing, marker, limit):
1095         start = 0
1096         if marker:
1097             try:
1098                 start = listing.index(marker) + 1
1099             except ValueError:
1100                 pass
1101         if not limit or limit > 10000:
1102             limit = 10000
1103         return start, limit
1104     
1105     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1106         cont_prefix = path + '/'
1107         prefix = cont_prefix + prefix
1108         start = cont_prefix + marker if marker else None
1109         before = until if until is not None else inf
1110         filterq = keys if domain else []
1111         sizeq = size_range
1112         
1113         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1114         objects.extend([(p, None) for p in prefixes] if virtual else [])
1115         objects.sort(key=lambda x: x[0])
1116         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1117         return objects
1118         
1119     # Reporting functions.
1120     
1121     def _report_size_change(self, user, account, size, details={}):
1122         logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1123         account_node = self._lookup_account(account, True)[1]
1124         total = self._get_statistics(account_node)[1]
1125         details.update({'user': user, 'total': total})
1126         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1127     
1128     def _report_object_change(self, user, account, path, details={}):
1129         logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1130         details.update({'user': user})
1131         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1132     
1133     def _report_sharing_change(self, user, account, path, details={}):
1134         logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1135         details.update({'user': user})
1136         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1137     
1138     # Policy functions.
1139     
1140     def _check_policy(self, policy):
1141         for k in policy.keys():
1142             if policy[k] == '':
1143                 policy[k] = self.default_policy.get(k)
1144         for k, v in policy.iteritems():
1145             if k == 'quota':
1146                 q = int(v) # May raise ValueError.
1147                 if q < 0:
1148                     raise ValueError
1149             elif k == 'versioning':
1150                 if v not in ['auto', 'none']:
1151                     raise ValueError
1152             else:
1153                 raise ValueError
1154     
1155     def _put_policy(self, node, policy, replace):
1156         if replace:
1157             for k, v in self.default_policy.iteritems():
1158                 if k not in policy:
1159                     policy[k] = v
1160         self.node.policy_set(node, policy)
1161     
1162     def _get_policy(self, node):
1163         policy = self.default_policy.copy()
1164         policy.update(self.node.policy_get(node))
1165         return policy
1166     
1167     def _apply_versioning(self, account, container, version_id):
1168         """Delete the provided version if such is the policy.
1169            Return size of object removed.
1170         """
1171         
1172         if version_id is None:
1173             return 0
1174         path, node = self._lookup_container(account, container)
1175         versioning = self._get_policy(node)['versioning']
1176         if versioning != 'auto':
1177             hash, size = self.node.version_remove(version_id)
1178             self.store.map_delete(hash)
1179             return size
1180         return 0
1181     
1182     # Access control functions.
1183     
1184     def _check_groups(self, groups):
1185         # raise ValueError('Bad characters in groups')
1186         pass
1187     
1188     def _check_permissions(self, path, permissions):
1189         # raise ValueError('Bad characters in permissions')
1190         pass
1191     
1192     def _get_formatted_paths(self, paths):
1193         formatted = []
1194         for p in paths:
1195             node = self.node.node_lookup(p)
1196             if node is not None:
1197                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1198             if props is not None:
1199                 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1200                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1201                 formatted.append((p, self.MATCH_EXACT))
1202         return formatted
1203     
1204     def _get_permissions_path(self, account, container, name):
1205         path = '/'.join((account, container, name))
1206         permission_paths = self.permissions.access_inherit(path)
1207         permission_paths.sort()
1208         permission_paths.reverse()
1209         for p in permission_paths:
1210             if p == path:
1211                 return p
1212             else:
1213                 if p.count('/') < 2:
1214                     continue
1215                 node = self.node.node_lookup(p)
1216                 if node is not None:
1217                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1218                 if props is not None:
1219                     if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1220                         return p
1221         return None
1222     
1223     def _can_read(self, user, account, container, name):
1224         if user == account:
1225             return True
1226         path = '/'.join((account, container, name))
1227         if self.permissions.public_get(path) is not None:
1228             return True
1229         path = self._get_permissions_path(account, container, name)
1230         if not path:
1231             raise NotAllowedError
1232         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1233             raise NotAllowedError
1234     
1235     def _can_write(self, user, account, container, name):
1236         if user == account:
1237             return True
1238         path = '/'.join((account, container, name))
1239         path = self._get_permissions_path(account, container, name)
1240         if not path:
1241             raise NotAllowedError
1242         if not self.permissions.access_check(path, self.WRITE, user):
1243             raise NotAllowedError
1244     
1245     def _allowed_accounts(self, user):
1246         allow = set()
1247         for path in self.permissions.access_list_paths(user):
1248             allow.add(path.split('/', 1)[0])
1249         return sorted(allow)
1250     
1251     def _allowed_containers(self, user, account):
1252         allow = set()
1253         for path in self.permissions.access_list_paths(user, account):
1254             allow.add(path.split('/', 2)[1])
1255         return sorted(allow)