Fix size & type of copied files
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import hashlib
40 import binascii
41
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43     AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
44
45 # Stripped-down version of the HashMap class found in tools.
46 class HashMap(list):
47
48     def __init__(self, blocksize, blockhash):
49         super(HashMap, self).__init__()
50         self.blocksize = blocksize
51         self.blockhash = blockhash
52
53     def _hash_raw(self, v):
54         h = hashlib.new(self.blockhash)
55         h.update(v)
56         return h.digest()
57
58     def hash(self):
59         if len(self) == 0:
60             return self._hash_raw('')
61         if len(self) == 1:
62             return self.__getitem__(0)
63
64         h = list(self)
65         s = 2
66         while s < len(h):
67             s = s * 2
68         h += [('\x00' * len(h[0]))] * (s - len(h))
69         while len(h) > 1:
70             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
71         return h[0]
72
73 # Default modules and settings.
74 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
75 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
76 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
77 DEFAULT_BLOCK_PATH = 'data/'
78 DEFAULT_BLOCK_UMASK = 0o022
79 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
80 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
81
82 QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
83 QUEUE_CLIENT_ID = 'pithos'
84 QUEUE_INSTANCE_ID = '1'
85
86 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
87
88 inf = float('inf')
89
90 ULTIMATE_ANSWER = 42
91
92
93 logger = logging.getLogger(__name__)
94
95
96 def backend_method(func=None, autocommit=1):
97     if func is None:
98         def fn(func):
99             return backend_method(func, autocommit)
100         return fn
101
102     if not autocommit:
103         return func
104     def fn(self, *args, **kw):
105         self.wrapper.execute()
106         try:
107             self.messages = []
108             ret = func(self, *args, **kw)
109             for m in self.messages:
110                 self.queue.send(*m)
111             self.wrapper.commit()
112             return ret
113         except:
114             self.wrapper.rollback()
115             raise
116     return fn
117
118
119 class ModularBackend(BaseBackend):
120     """A modular backend.
121     
122     Uses modules for SQL functions and storage.
123     """
124     
125     def __init__(self, db_module=None, db_connection=None,
126                  block_module=None, block_path=None, block_umask=None,
127                  queue_module=None, queue_connection=None):
128         db_module = db_module or DEFAULT_DB_MODULE
129         db_connection = db_connection or DEFAULT_DB_CONNECTION
130         block_module = block_module or DEFAULT_BLOCK_MODULE
131         block_path = block_path or DEFAULT_BLOCK_PATH
132         block_umask = block_umask or DEFAULT_BLOCK_UMASK
133         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
134         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
135         
136         self.hash_algorithm = 'sha256'
137         self.block_size = 4 * 1024 * 1024 # 4MB
138         
139         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
140         
141         def load_module(m):
142             __import__(m)
143             return sys.modules[m]
144         
145         self.db_module = load_module(db_module)
146         self.wrapper = self.db_module.DBWrapper(db_connection)
147         params = {'wrapper': self.wrapper}
148         self.permissions = self.db_module.Permissions(**params)
149         for x in ['READ', 'WRITE']:
150             setattr(self, x, getattr(self.db_module, x))
151         self.node = self.db_module.Node(**params)
152         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
153             setattr(self, x, getattr(self.db_module, x))
154         
155         self.block_module = load_module(block_module)
156         params = {'path': block_path,
157                   'block_size': self.block_size,
158                   'hash_algorithm': self.hash_algorithm,
159                   'umask': block_umask}
160         self.store = self.block_module.Store(**params)
161
162         if queue_module and queue_connection:
163             self.queue_module = load_module(queue_module)
164             params = {'exchange': queue_connection,
165                       'client_id': QUEUE_CLIENT_ID}
166             self.queue = self.queue_module.Queue(**params)
167         else:
168             class NoQueue:
169                 def send(self, *args):
170                     pass
171                 
172                 def close(self):
173                     pass
174             
175             self.queue = NoQueue()
176     
177     def close(self):
178         self.wrapper.close()
179         self.queue.close()
180     
181     @backend_method
182     def list_accounts(self, user, marker=None, limit=10000):
183         """Return a list of accounts the user can access."""
184         
185         logger.debug("list_accounts: %s %s %s", user, marker, limit)
186         allowed = self._allowed_accounts(user)
187         start, limit = self._list_limits(allowed, marker, limit)
188         return allowed[start:start + limit]
189     
190     @backend_method
191     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
192         """Return a dictionary with the account metadata for the domain."""
193         
194         logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
195         path, node = self._lookup_account(account, user == account)
196         if user != account:
197             if until or node is None or account not in self._allowed_accounts(user):
198                 raise NotAllowedError
199         try:
200             props = self._get_properties(node, until)
201             mtime = props[self.MTIME]
202         except NameError:
203             props = None
204             mtime = until
205         count, bytes, tstamp = self._get_statistics(node, until)
206         tstamp = max(tstamp, mtime)
207         if until is None:
208             modified = tstamp
209         else:
210             modified = self._get_statistics(node)[2] # Overall last modification.
211             modified = max(modified, mtime)
212         
213         if user != account:
214             meta = {'name': account}
215         else:
216             meta = {}
217             if props is not None and include_user_defined:
218                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
219             if until is not None:
220                 meta.update({'until_timestamp': tstamp})
221             meta.update({'name': account, 'count': count, 'bytes': bytes})
222         meta.update({'modified': modified})
223         return meta
224     
225     @backend_method
226     def update_account_meta(self, user, account, domain, meta, replace=False):
227         """Update the metadata associated with the account for the domain."""
228         
229         logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
230         if user != account:
231             raise NotAllowedError
232         path, node = self._lookup_account(account, True)
233         self._put_metadata(user, node, domain, meta, replace)
234     
235     @backend_method
236     def get_account_groups(self, user, account):
237         """Return a dictionary with the user groups defined for this account."""
238         
239         logger.debug("get_account_groups: %s %s", user, account)
240         if user != account:
241             if account not in self._allowed_accounts(user):
242                 raise NotAllowedError
243             return {}
244         self._lookup_account(account, True)
245         return self.permissions.group_dict(account)
246     
247     @backend_method
248     def update_account_groups(self, user, account, groups, replace=False):
249         """Update the groups associated with the account."""
250         
251         logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
252         if user != account:
253             raise NotAllowedError
254         self._lookup_account(account, True)
255         self._check_groups(groups)
256         if replace:
257             self.permissions.group_destroy(account)
258         for k, v in groups.iteritems():
259             if not replace: # If not already deleted.
260                 self.permissions.group_delete(account, k)
261             if v:
262                 self.permissions.group_addmany(account, k, v)
263     
264     @backend_method
265     def get_account_policy(self, user, account):
266         """Return a dictionary with the account policy."""
267         
268         logger.debug("get_account_policy: %s %s", user, account)
269         if user != account:
270             if account not in self._allowed_accounts(user):
271                 raise NotAllowedError
272             return {}
273         path, node = self._lookup_account(account, True)
274         return self._get_policy(node)
275     
276     @backend_method
277     def update_account_policy(self, user, account, policy, replace=False):
278         """Update the policy associated with the account."""
279         
280         logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
281         if user != account:
282             raise NotAllowedError
283         path, node = self._lookup_account(account, True)
284         self._check_policy(policy)
285         self._put_policy(node, policy, replace)
286     
287     @backend_method
288     def put_account(self, user, account, policy={}):
289         """Create a new account with the given name."""
290         
291         logger.debug("put_account: %s %s %s", user, account, policy)
292         if user != account:
293             raise NotAllowedError
294         node = self.node.node_lookup(account)
295         if node is not None:
296             raise AccountExists('Account already exists')
297         if policy:
298             self._check_policy(policy)
299         node = self._put_path(user, self.ROOTNODE, account)
300         self._put_policy(node, policy, True)
301     
302     @backend_method
303     def delete_account(self, user, account):
304         """Delete the account with the given name."""
305         
306         logger.debug("delete_account: %s %s", user, account)
307         if user != account:
308             raise NotAllowedError
309         node = self.node.node_lookup(account)
310         if node is None:
311             return
312         if not self.node.node_remove(node):
313             raise AccountNotEmpty('Account is not empty')
314         self.permissions.group_destroy(account)
315     
316     @backend_method
317     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
318         """Return a list of containers existing under an account."""
319         
320         logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
321         if user != account:
322             if until or account not in self._allowed_accounts(user):
323                 raise NotAllowedError
324             allowed = self._allowed_containers(user, account)
325             start, limit = self._list_limits(allowed, marker, limit)
326             return allowed[start:start + limit]
327         if shared or public:
328             allowed = set()
329             if shared:
330                 allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
331             if public:
332                 allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
333             allowed = sorted(allowed)
334             start, limit = self._list_limits(allowed, marker, limit)
335             return allowed[start:start + limit]
336         node = self.node.node_lookup(account)
337         containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
338         start, limit = self._list_limits([x[0] for x in containers], marker, limit)
339         return containers[start:start + limit]
340     
341     @backend_method
342     def list_container_meta(self, user, account, container, domain, until=None):
343         """Return a list with all the container's object meta keys for the domain."""
344         
345         logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
346         allowed = []
347         if user != account:
348             if until:
349                 raise NotAllowedError
350             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
351             if not allowed:
352                 raise NotAllowedError
353         path, node = self._lookup_container(account, container)
354         before = until if until is not None else inf
355         allowed = self._get_formatted_paths(allowed)
356         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
357     
358     @backend_method
359     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
360         """Return a dictionary with the container metadata for the domain."""
361         
362         logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
363         if user != account:
364             if until or container not in self._allowed_containers(user, account):
365                 raise NotAllowedError
366         path, node = self._lookup_container(account, container)
367         props = self._get_properties(node, until)
368         mtime = props[self.MTIME]
369         count, bytes, tstamp = self._get_statistics(node, until)
370         tstamp = max(tstamp, mtime)
371         if until is None:
372             modified = tstamp
373         else:
374             modified = self._get_statistics(node)[2] # Overall last modification.
375             modified = max(modified, mtime)
376         
377         if user != account:
378             meta = {'name': container}
379         else:
380             meta = {}
381             if include_user_defined:
382                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
383             if until is not None:
384                 meta.update({'until_timestamp': tstamp})
385             meta.update({'name': container, 'count': count, 'bytes': bytes})
386         meta.update({'modified': modified})
387         return meta
388     
389     @backend_method
390     def update_container_meta(self, user, account, container, domain, meta, replace=False):
391         """Update the metadata associated with the container for the domain."""
392         
393         logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
394         if user != account:
395             raise NotAllowedError
396         path, node = self._lookup_container(account, container)
397         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
398         if src_version_id is not None:
399             versioning = self._get_policy(node)['versioning']
400             if versioning != 'auto':
401                 self.node.version_remove(src_version_id)
402     
403     @backend_method
404     def get_container_policy(self, user, account, container):
405         """Return a dictionary with the container policy."""
406         
407         logger.debug("get_container_policy: %s %s %s", user, account, container)
408         if user != account:
409             if container not in self._allowed_containers(user, account):
410                 raise NotAllowedError
411             return {}
412         path, node = self._lookup_container(account, container)
413         return self._get_policy(node)
414     
415     @backend_method
416     def update_container_policy(self, user, account, container, policy, replace=False):
417         """Update the policy associated with the container."""
418         
419         logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
420         if user != account:
421             raise NotAllowedError
422         path, node = self._lookup_container(account, container)
423         self._check_policy(policy)
424         self._put_policy(node, policy, replace)
425     
426     @backend_method
427     def put_container(self, user, account, container, policy={}):
428         """Create a new container with the given name."""
429         
430         logger.debug("put_container: %s %s %s %s", user, account, container, policy)
431         if user != account:
432             raise NotAllowedError
433         try:
434             path, node = self._lookup_container(account, container)
435         except NameError:
436             pass
437         else:
438             raise ContainerExists('Container already exists')
439         if policy:
440             self._check_policy(policy)
441         path = '/'.join((account, container))
442         node = self._put_path(user, self._lookup_account(account, True)[1], path)
443         self._put_policy(node, policy, True)
444     
445     @backend_method
446     def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
447         """Delete/purge the container with the given name."""
448         
449         logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
450         if user != account:
451             raise NotAllowedError
452         path, node = self._lookup_container(account, container)
453         
454         if until is not None:
455             hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
456             for h in hashes:
457                 self.store.map_delete(h)
458             self.node.node_purge_children(node, until, CLUSTER_DELETED)
459             self._report_size_change(user, account, -size, {'action': 'container purge'})
460             return
461         
462         if self._get_statistics(node)[0] > 0:
463             raise ContainerNotEmpty('Container is not empty')
464         hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
465         for h in hashes:
466             self.store.map_delete(h)
467         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
468         self.node.node_remove(node)
469         self._report_size_change(user, account, -size, {'action': 'container delete'})
470     
471     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
472         if user != account and until:
473             raise NotAllowedError
474         if shared and public:
475             # get shared first
476             shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False)
477             objects = []
478             if shared:
479                 path, node = self._lookup_container(account, container)
480                 shared = self._get_formatted_paths(shared)
481                 objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props)
482             
483             # get public
484             objects.extend(self._list_public_object_properties(user, account, container, prefix, all_props))
485             
486             objects.sort(key=lambda x: x[0])
487             start, limit = self._list_limits([x[0] for x in objects], marker, limit)
488             return objects[start:start + limit]
489         elif public:
490             objects = self._list_public_object_properties(user, account, container, prefix, all_props)
491             start, limit = self._list_limits([x[0] for x in objects], marker, limit)
492             return objects[start:start + limit]
493         
494         allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
495         if shared and not allowed:
496             return []
497         path, node = self._lookup_container(account, container)
498         allowed = self._get_formatted_paths(allowed)
499         objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
500         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
501         return objects[start:start + limit]
502     
503     def _list_public_object_properties(self, user, account, container, prefix, all_props):
504         public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True)
505         paths, nodes = self._lookup_objects(public)
506         path = '/'.join((account, container))
507         cont_prefix = path + '/'
508         paths = [x[len(cont_prefix):] for x in paths]
509         props = self.node.version_lookup_bulk(nodes, all_props=all_props)
510         objects = [(path,) + props for path, props in zip(paths, props)]
511         return objects
512         
513     def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
514         objects = []
515         while True:
516             marker = objects[-1] if objects else None
517             limit = 10000
518             l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
519             objects.extend(l)
520             if not l or len(l) < limit:
521                 break
522         return objects
523     
524     def _list_object_permissions(self, user, account, container, prefix, shared, public):
525         allowed = []
526         path = '/'.join((account, container, prefix)).rstrip('/')
527         if user != account:
528             allowed = self.permissions.access_list_paths(user, path)
529             if not allowed:
530                 raise NotAllowedError
531         else:
532             allowed = set()
533             if shared:
534                 allowed.update(self.permissions.access_list_shared(path))
535             if public:
536                 allowed.update([x[0] for x in self.permissions.public_list(path)])
537             allowed = sorted(allowed)
538             if not allowed:
539                 return []
540         return allowed
541     
542     @backend_method
543     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
544         """Return a list of object (name, version_id) tuples existing under a container."""
545         
546         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
547         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
548     
549     @backend_method
550     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
551         """Return a list of object metadata dicts existing under a container."""
552         
553         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
554         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
555         objects = []
556         for p in props:
557             if len(p) == 2:
558                 objects.append({'subdir': p[0]})
559             else:
560                 objects.append({'name': p[0],
561                                 'bytes': p[self.SIZE + 1],
562                                 'type': p[self.TYPE + 1],
563                                 'hash': p[self.HASH + 1],
564                                 'version': p[self.SERIAL + 1],
565                                 'version_timestamp': p[self.MTIME + 1],
566                                 'modified': p[self.MTIME + 1] if until is None else None,
567                                 'modified_by': p[self.MUSER + 1],
568                                 'uuid': p[self.UUID + 1],
569                                 'checksum': p[self.CHECKSUM + 1]})
570         return objects
571     
572     @backend_method
573     def list_object_permissions(self, user, account, container, prefix=''):
574         """Return a list of paths that enforce permissions under a container."""
575         
576         logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
577         return self._list_object_permissions(user, account, container, prefix, True, False)
578     
579     @backend_method
580     def list_object_public(self, user, account, container, prefix=''):
581         """Return a dict mapping paths to public ids for objects that are public under a container."""
582         
583         logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
584         public = {}
585         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
586             public[path] = p + ULTIMATE_ANSWER
587         return public
588     
589     @backend_method
590     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
591         """Return a dictionary with the object metadata for the domain."""
592         
593         logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
594         self._can_read(user, account, container, name)
595         path, node = self._lookup_object(account, container, name)
596         props = self._get_version(node, version)
597         if version is None:
598             modified = props[self.MTIME]
599         else:
600             try:
601                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
602             except NameError: # Object may be deleted.
603                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
604                 if del_props is None:
605                     raise ItemNotExists('Object does not exist')
606                 modified = del_props[self.MTIME]
607         
608         meta = {}
609         if include_user_defined:
610             meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
611         meta.update({'name': name,
612                      'bytes': props[self.SIZE],
613                      'type': props[self.TYPE],
614                      'hash': props[self.HASH],
615                      'version': props[self.SERIAL],
616                      'version_timestamp': props[self.MTIME],
617                      'modified': modified,
618                      'modified_by': props[self.MUSER],
619                      'uuid': props[self.UUID],
620                      'checksum': props[self.CHECKSUM]})
621         return meta
622     
623     @backend_method
624     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
625         """Update the metadata associated with the object for the domain and return the new version."""
626         
627         logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
628         self._can_write(user, account, container, name)
629         path, node = self._lookup_object(account, container, name)
630         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
631         self._apply_versioning(account, container, src_version_id)
632         return dest_version_id
633     
634     @backend_method
635     def get_object_permissions(self, user, account, container, name):
636         """Return the action allowed on the object, the path
637         from which the object gets its permissions from,
638         along with a dictionary containing the permissions."""
639         
640         logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
641         allowed = 'write'
642         permissions_path = self._get_permissions_path(account, container, name)
643         if user != account:
644             if self.permissions.access_check(permissions_path, self.WRITE, user):
645                 allowed = 'write'
646             elif self.permissions.access_check(permissions_path, self.READ, user):
647                 allowed = 'read'
648             else:
649                 raise NotAllowedError
650         self._lookup_object(account, container, name)
651         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
652     
653     @backend_method
654     def update_object_permissions(self, user, account, container, name, permissions):
655         """Update the permissions associated with the object."""
656         
657         logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
658         if user != account:
659             raise NotAllowedError
660         path = self._lookup_object(account, container, name)[0]
661         self._check_permissions(path, permissions)
662         self.permissions.access_set(path, permissions)
663         self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
664     
665     @backend_method
666     def get_object_public(self, user, account, container, name):
667         """Return the public id of the object if applicable."""
668         
669         logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
670         self._can_read(user, account, container, name)
671         path = self._lookup_object(account, container, name)[0]
672         p = self.permissions.public_get(path)
673         if p is not None:
674             p += ULTIMATE_ANSWER
675         return p
676     
677     @backend_method
678     def update_object_public(self, user, account, container, name, public):
679         """Update the public status of the object."""
680         
681         logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
682         self._can_write(user, account, container, name)
683         path = self._lookup_object(account, container, name)[0]
684         if not public:
685             self.permissions.public_unset(path)
686         else:
687             self.permissions.public_set(path)
688     
689     @backend_method
690     def get_object_hashmap(self, user, account, container, name, version=None):
691         """Return the object's size and a list with partial hashes."""
692         
693         logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
694         self._can_read(user, account, container, name)
695         path, node = self._lookup_object(account, container, name)
696         props = self._get_version(node, version)
697         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
698         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
699     
700     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
701         if permissions is not None and user != account:
702             raise NotAllowedError
703         self._can_write(user, account, container, name)
704         if permissions is not None:
705             path = '/'.join((account, container, name))
706             self._check_permissions(path, permissions)
707         
708         account_path, account_node = self._lookup_account(account, True)
709         container_path, container_node = self._lookup_container(account, container)
710         path, node = self._put_object_node(container_path, container_node, name)
711         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
712         
713         # Handle meta.
714         if src_version_id is None:
715             src_version_id = pre_version_id
716         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
717         
718         # Check quota.
719         del_size = self._apply_versioning(account, container, pre_version_id)
720         size_delta = size - del_size
721         if size_delta > 0:
722             account_quota = long(self._get_policy(account_node)['quota'])
723             container_quota = long(self._get_policy(container_node)['quota'])
724             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
725                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
726                 # This must be executed in a transaction, so the version is never created if it fails.
727                 raise QuotaError
728         self._report_size_change(user, account, size_delta, {'action': 'object update'})
729         
730         if permissions is not None:
731             self.permissions.access_set(path, permissions)
732             self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
733         
734         self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
735         return dest_version_id
736     
737     @backend_method
738     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
739         """Create/update an object with the specified size and partial hashes."""
740         
741         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
742         if size == 0: # No such thing as an empty hashmap.
743             hashmap = [self.put_block('')]
744         map = HashMap(self.block_size, self.hash_algorithm)
745         map.extend([binascii.unhexlify(x) for x in hashmap])
746         missing = self.store.block_search(map)
747         if missing:
748             ie = IndexError()
749             ie.data = [binascii.hexlify(x) for x in missing]
750             raise ie
751         
752         hash = map.hash()
753         dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
754         self.store.map_put(hash, map)
755         return dest_version_id
756     
757     @backend_method
758     def update_object_checksum(self, user, account, container, name, version, checksum):
759         """Update an object's checksum."""
760         
761         logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
762         # Update objects with greater version and same hashmap and size (fix metadata updates).
763         self._can_write(user, account, container, name)
764         path, node = self._lookup_object(account, container, name)
765         props = self._get_version(node, version)
766         versions = self.node.node_get_versions(node)
767         for x in versions:
768             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
769                 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
770     
771     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
772         dest_version_ids = []
773         self._can_read(user, src_account, src_container, src_name)
774         path, node = self._lookup_object(src_account, src_container, src_name)
775         # TODO: Will do another fetch of the properties in duplicate version...
776         props = self._get_version(node, src_version) # Check to see if source exists.
777         src_version_id = props[self.SERIAL]
778         hash = props[self.HASH]
779         size = props[self.SIZE]
780         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
781         dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
782         if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
783                 self._delete_object(user, src_account, src_container, src_name)
784         
785         if delimiter:
786             prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
787             src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
788             src_names.sort(key=lambda x: x[2]) # order by nodes
789             paths = [elem[0] for elem in src_names]
790             nodes = [elem[2] for elem in src_names]
791             # TODO: Will do another fetch of the properties in duplicate version...
792             props = self._get_versions(nodes) # Check to see if source exists.
793             
794             for prop, path, node in zip(props, paths, nodes):
795                 src_version_id = prop[self.SERIAL]
796                 hash = prop[self.HASH]
797                 vtype = prop[self.TYPE]
798                 size = prop[self.SIZE]
799                 dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
800                 vdest_name = path.replace(prefix, dest_prefix, 1)
801                 dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
802                 if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
803                         self._delete_object(user, src_account, src_container, path)
804         return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
805     
806     @backend_method
807     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
808         """Copy an object's data and metadata."""
809         
810         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
811         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
812         return dest_version_id
813     
814     @backend_method
815     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
816         """Move an object's data and metadata."""
817         
818         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
819         if user != src_account:
820             raise NotAllowedError
821         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
822         return dest_version_id
823     
824     def _delete_object(self, user, account, container, name, until=None, delimiter=None):
825         if user != account:
826             raise NotAllowedError
827         
828         if until is not None:
829             path = '/'.join((account, container, name))
830             node = self.node.node_lookup(path)
831             if node is None:
832                 return
833             hashes = []
834             size = 0
835             h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
836             hashes += h
837             size += s
838             h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
839             hashes += h
840             size += s
841             for h in hashes:
842                 self.store.map_delete(h)
843             self.node.node_purge(node, until, CLUSTER_DELETED)
844             try:
845                 props = self._get_version(node)
846             except NameError:
847                 self.permissions.access_clear(path)
848             self._report_size_change(user, account, -size, {'action': 'object purge'})
849             return
850         
851         path, node = self._lookup_object(account, container, name)
852         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
853         del_size = self._apply_versioning(account, container, src_version_id)
854         if del_size:
855             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
856         self._report_object_change(user, account, path, details={'action': 'object delete'})
857         self.permissions.access_clear(path)
858         
859         if delimiter:
860             prefix = name + delimiter if not name.endswith(delimiter) else name
861             src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
862             paths = []
863             for t in src_names:
864                 path = '/'.join((account, container, t[0]))
865                 node = t[2]
866                 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
867                 del_size = self._apply_versioning(account, container, src_version_id)
868                 if del_size:
869                     self._report_size_change(user, account, -del_size, {'action': 'object delete'})
870                 self._report_object_change(user, account, path, details={'action': 'object delete'})
871                 paths.append(path)
872             self.permissions.access_clear_bulk(paths)
873     
874     @backend_method
875     def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
876         """Delete/purge an object."""
877         
878         logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
879         self._delete_object(user, account, container, name, until, delimiter)
880     
881     @backend_method
882     def list_versions(self, user, account, container, name):
883         """Return a list of all (version, version_timestamp) tuples for an object."""
884         
885         logger.debug("list_versions: %s %s %s %s", user, account, container, name)
886         self._can_read(user, account, container, name)
887         path, node = self._lookup_object(account, container, name)
888         versions = self.node.node_get_versions(node)
889         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
890     
891     @backend_method
892     def get_uuid(self, user, uuid):
893         """Return the (account, container, name) for the UUID given."""
894         
895         logger.debug("get_uuid: %s %s", user, uuid)
896         info = self.node.latest_uuid(uuid)
897         if info is None:
898             raise NameError
899         path, serial = info
900         account, container, name = path.split('/', 2)
901         self._can_read(user, account, container, name)
902         return (account, container, name)
903     
904     @backend_method
905     def get_public(self, user, public):
906         """Return the (account, container, name) for the public id given."""
907         
908         logger.debug("get_public: %s %s", user, public)
909         if public is None or public < ULTIMATE_ANSWER:
910             raise NameError
911         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
912         if path is None:
913             raise NameError
914         account, container, name = path.split('/', 2)
915         self._can_read(user, account, container, name)
916         return (account, container, name)
917     
918     @backend_method(autocommit=0)
919     def get_block(self, hash):
920         """Return a block's data."""
921         
922         logger.debug("get_block: %s", hash)
923         block = self.store.block_get(binascii.unhexlify(hash))
924         if not block:
925             raise ItemNotExists('Block does not exist')
926         return block
927     
928     @backend_method(autocommit=0)
929     def put_block(self, data):
930         """Store a block and return the hash."""
931         
932         logger.debug("put_block: %s", len(data))
933         return binascii.hexlify(self.store.block_put(data))
934     
935     @backend_method(autocommit=0)
936     def update_block(self, hash, data, offset=0):
937         """Update a known block and return the hash."""
938         
939         logger.debug("update_block: %s %s %s", hash, len(data), offset)
940         if offset == 0 and len(data) == self.block_size:
941             return self.put_block(data)
942         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
943         return binascii.hexlify(h)
944     
945     # Path functions.
946     
947     def _generate_uuid(self):
948         return str(uuidlib.uuid4())
949     
950     def _put_object_node(self, path, parent, name):
951         path = '/'.join((path, name))
952         node = self.node.node_lookup(path)
953         if node is None:
954             node = self.node.node_create(parent, path)
955         return path, node
956     
957     def _put_path(self, user, parent, path):
958         node = self.node.node_create(parent, path)
959         self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
960         return node
961     
962     def _lookup_account(self, account, create=True):
963         node = self.node.node_lookup(account)
964         if node is None and create:
965             node = self._put_path(account, self.ROOTNODE, account) # User is account.
966         return account, node
967     
968     def _lookup_container(self, account, container):
969         path = '/'.join((account, container))
970         node = self.node.node_lookup(path)
971         if node is None:
972             raise ItemNotExists('Container does not exist')
973         return path, node
974     
975     def _lookup_object(self, account, container, name):
976         path = '/'.join((account, container, name))
977         node = self.node.node_lookup(path)
978         if node is None:
979             raise ItemNotExists('Object does not exist')
980         return path, node
981     
982     def _lookup_objects(self, paths):
983         nodes = self.node.node_lookup_bulk(paths)
984         return paths, nodes
985     
986     def _get_properties(self, node, until=None):
987         """Return properties until the timestamp given."""
988         
989         before = until if until is not None else inf
990         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
991         if props is None and until is not None:
992             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
993         if props is None:
994             raise ItemNotExists('Path does not exist')
995         return props
996     
997     def _get_statistics(self, node, until=None):
998         """Return count, sum of size and latest timestamp of everything under node."""
999         
1000         if until is None:
1001             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1002         else:
1003             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1004         if stats is None:
1005             stats = (0, 0, 0)
1006         return stats
1007     
1008     def _get_version(self, node, version=None):
1009         if version is None:
1010             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1011             if props is None:
1012                 raise ItemNotExists('Object does not exist')
1013         else:
1014             try:
1015                 version = int(version)
1016             except ValueError:
1017                 raise VersionNotExists('Version does not exist')
1018             props = self.node.version_get_properties(version)
1019             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1020                 raise VersionNotExists('Version does not exist')
1021         return props
1022
1023     def _get_versions(self, nodes):
1024         return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1025     
1026     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1027         """Create a new version of the node."""
1028         
1029         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1030         if props is not None:
1031             src_version_id = props[self.SERIAL]
1032             src_hash = props[self.HASH]
1033             src_size = props[self.SIZE]
1034             src_type = props[self.TYPE]
1035             src_checksum = props[self.CHECKSUM]
1036         else:
1037             src_version_id = None
1038             src_hash = None
1039             src_size = 0
1040             src_type = ''
1041             src_checksum = ''
1042         if size is None: # Set metadata.
1043             hash = src_hash # This way hash can be set to None (account or container).
1044             size = src_size
1045         if type is None:
1046             type = src_type
1047         if checksum is None:
1048             checksum = src_checksum
1049         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
1050         
1051         if src_node is None:
1052             pre_version_id = src_version_id
1053         else:
1054             pre_version_id = None
1055             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1056             if props is not None:
1057                 pre_version_id = props[self.SERIAL]
1058         if pre_version_id is not None:
1059             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1060         
1061         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1062         return pre_version_id, dest_version_id
1063     
1064     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1065         if src_version_id is not None:
1066             self.node.attribute_copy(src_version_id, dest_version_id)
1067         if not replace:
1068             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
1069             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
1070         else:
1071             self.node.attribute_del(dest_version_id, domain)
1072             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
1073     
1074     def _put_metadata(self, user, node, domain, meta, replace=False):
1075         """Create a new version and store metadata."""
1076         
1077         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
1078         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
1079         return src_version_id, dest_version_id
1080     
1081     def _list_limits(self, listing, marker, limit):
1082         start = 0
1083         if marker:
1084             try:
1085                 start = listing.index(marker) + 1
1086             except ValueError:
1087                 pass
1088         if not limit or limit > 10000:
1089             limit = 10000
1090         return start, limit
1091     
1092     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1093         cont_prefix = path + '/'
1094         prefix = cont_prefix + prefix
1095         start = cont_prefix + marker if marker else None
1096         before = until if until is not None else inf
1097         filterq = keys if domain else []
1098         sizeq = size_range
1099         
1100         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1101         objects.extend([(p, None) for p in prefixes] if virtual else [])
1102         objects.sort(key=lambda x: x[0])
1103         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1104         return objects
1105         
1106     # Reporting functions.
1107     
1108     def _report_size_change(self, user, account, size, details={}):
1109         logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1110         account_node = self._lookup_account(account, True)[1]
1111         total = self._get_statistics(account_node)[1]
1112         details.update({'user': user, 'total': total})
1113         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1114     
1115     def _report_object_change(self, user, account, path, details={}):
1116         logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1117         details.update({'user': user})
1118         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1119     
1120     def _report_sharing_change(self, user, account, path, details={}):
1121         logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1122         details.update({'user': user})
1123         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1124     
1125     # Policy functions.
1126     
1127     def _check_policy(self, policy):
1128         for k in policy.keys():
1129             if policy[k] == '':
1130                 policy[k] = self.default_policy.get(k)
1131         for k, v in policy.iteritems():
1132             if k == 'quota':
1133                 q = int(v) # May raise ValueError.
1134                 if q < 0:
1135                     raise ValueError
1136             elif k == 'versioning':
1137                 if v not in ['auto', 'none']:
1138                     raise ValueError
1139             else:
1140                 raise ValueError
1141     
1142     def _put_policy(self, node, policy, replace):
1143         if replace:
1144             for k, v in self.default_policy.iteritems():
1145                 if k not in policy:
1146                     policy[k] = v
1147         self.node.policy_set(node, policy)
1148     
1149     def _get_policy(self, node):
1150         policy = self.default_policy.copy()
1151         policy.update(self.node.policy_get(node))
1152         return policy
1153     
1154     def _apply_versioning(self, account, container, version_id):
1155         """Delete the provided version if such is the policy.
1156            Return size of object removed.
1157         """
1158         
1159         if version_id is None:
1160             return 0
1161         path, node = self._lookup_container(account, container)
1162         versioning = self._get_policy(node)['versioning']
1163         if versioning != 'auto':
1164             hash, size = self.node.version_remove(version_id)
1165             self.store.map_delete(hash)
1166             return size
1167         return 0
1168     
1169     # Access control functions.
1170     
1171     def _check_groups(self, groups):
1172         # raise ValueError('Bad characters in groups')
1173         pass
1174     
1175     def _check_permissions(self, path, permissions):
1176         # raise ValueError('Bad characters in permissions')
1177         pass
1178     
1179     def _get_formatted_paths(self, paths):
1180         formatted = []
1181         for p in paths:
1182             node = self.node.node_lookup(p)
1183             if node is not None:
1184                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1185             if props is not None:
1186                 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1187                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1188                 formatted.append((p, self.MATCH_EXACT))
1189         return formatted
1190     
1191     def _get_permissions_path(self, account, container, name):
1192         path = '/'.join((account, container, name))
1193         permission_paths = self.permissions.access_inherit(path)
1194         permission_paths.sort()
1195         permission_paths.reverse()
1196         for p in permission_paths:
1197             if p == path:
1198                 return p
1199             else:
1200                 if p.count('/') < 2:
1201                     continue
1202                 node = self.node.node_lookup(p)
1203                 if node is not None:
1204                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1205                 if props is not None:
1206                     if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1207                         return p
1208         return None
1209     
1210     def _can_read(self, user, account, container, name):
1211         if user == account:
1212             return True
1213         path = '/'.join((account, container, name))
1214         if self.permissions.public_get(path) is not None:
1215             return True
1216         path = self._get_permissions_path(account, container, name)
1217         if not path:
1218             raise NotAllowedError
1219         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1220             raise NotAllowedError
1221     
1222     def _can_write(self, user, account, container, name):
1223         if user == account:
1224             return True
1225         path = '/'.join((account, container, name))
1226         path = self._get_permissions_path(account, container, name)
1227         if not path:
1228             raise NotAllowedError
1229         if not self.permissions.access_check(path, self.WRITE, user):
1230             raise NotAllowedError
1231     
1232     def _allowed_accounts(self, user):
1233         allow = set()
1234         for path in self.permissions.access_list_paths(user):
1235             allow.add(path.split('/', 1)[0])
1236         return sorted(allow)
1237     
1238     def _allowed_containers(self, user, account):
1239         allow = set()
1240         for path in self.permissions.access_list_paths(user, account):
1241             allow.add(path.split('/', 2)[1])
1242         return sorted(allow)