390198a4b11c058b6d31ae639f5612dc75b96ce2
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import hashlib
40 import binascii
41
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43     AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
44
45 # Stripped-down version of the HashMap class found in tools.
46
47
48 class HashMap(list):
49
50     def __init__(self, blocksize, blockhash):
51         super(HashMap, self).__init__()
52         self.blocksize = blocksize
53         self.blockhash = blockhash
54
55     def _hash_raw(self, v):
56         h = hashlib.new(self.blockhash)
57         h.update(v)
58         return h.digest()
59
60     def hash(self):
61         if len(self) == 0:
62             return self._hash_raw('')
63         if len(self) == 1:
64             return self.__getitem__(0)
65
66         h = list(self)
67         s = 2
68         while s < len(h):
69             s = s * 2
70         h += [('\x00' * len(h[0]))] * (s - len(h))
71         while len(h) > 1:
72             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
73         return h[0]
74
75 # Default modules and settings.
76 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
77 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
78 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
79 DEFAULT_BLOCK_PATH = 'data/'
80 DEFAULT_BLOCK_UMASK = 0o022
81 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
82 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
83
84 QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
85 QUEUE_CLIENT_ID = 'pithos'
86 QUEUE_INSTANCE_ID = '1'
87
88 (CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED) = range(3)
89
90 inf = float('inf')
91
92 ULTIMATE_ANSWER = 42
93
94
95 logger = logging.getLogger(__name__)
96
97
98 def backend_method(func=None, autocommit=1):
99     if func is None:
100         def fn(func):
101             return backend_method(func, autocommit)
102         return fn
103
104     if not autocommit:
105         return func
106
107     def fn(self, *args, **kw):
108         self.wrapper.execute()
109         try:
110             self.messages = []
111             ret = func(self, *args, **kw)
112             for m in self.messages:
113                 self.queue.send(*m)
114             self.wrapper.commit()
115             return ret
116         except:
117             self.wrapper.rollback()
118             raise
119     return fn
120
121
122 class ModularBackend(BaseBackend):
123     """A modular backend.
124
125     Uses modules for SQL functions and storage.
126     """
127
128     def __init__(self, db_module=None, db_connection=None,
129                  block_module=None, block_path=None, block_umask=None,
130                  queue_module=None, queue_connection=None):
131         db_module = db_module or DEFAULT_DB_MODULE
132         db_connection = db_connection or DEFAULT_DB_CONNECTION
133         block_module = block_module or DEFAULT_BLOCK_MODULE
134         block_path = block_path or DEFAULT_BLOCK_PATH
135         block_umask = block_umask or DEFAULT_BLOCK_UMASK
136         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
137         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
138
139         self.hash_algorithm = 'sha256'
140         self.block_size = 4 * 1024 * 1024  # 4MB
141
142         self.default_policy = {'quota': DEFAULT_QUOTA,
143                                'versioning': DEFAULT_VERSIONING}
144
145         def load_module(m):
146             __import__(m)
147             return sys.modules[m]
148
149         self.db_module = load_module(db_module)
150         self.wrapper = self.db_module.DBWrapper(db_connection)
151         params = {'wrapper': self.wrapper}
152         self.permissions = self.db_module.Permissions(**params)
153         self.config = seld.db_module.Config(**params)
154         self.config = seld.db_module.QuotaholderSync(**params)
155         for x in ['READ', 'WRITE']:
156             setattr(self, x, getattr(self.db_module, x))
157         self.node = self.db_module.Node(**params)
158         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
159             setattr(self, x, getattr(self.db_module, x))
160
161         self.block_module = load_module(block_module)
162         params = {'path': block_path,
163                   'block_size': self.block_size,
164                   'hash_algorithm': self.hash_algorithm,
165                   'umask': block_umask}
166         self.store = self.block_module.Store(**params)
167
168         if queue_module and queue_connection:
169             self.queue_module = load_module(queue_module)
170             params = {'exchange': queue_connection,
171                       'client_id': QUEUE_CLIENT_ID}
172             self.queue = self.queue_module.Queue(**params)
173         else:
174             class NoQueue:
175                 def send(self, *args):
176                     pass
177
178                 def close(self):
179                     pass
180
181             self.queue = NoQueue()
182
183     def close(self):
184         self.wrapper.close()
185         self.queue.close()
186
187     @backend_method
188     def list_accounts(self, user, marker=None, limit=10000):
189         """Return a list of accounts the user can access."""
190
191         logger.debug("list_accounts: %s %s %s", user, marker, limit)
192         allowed = self._allowed_accounts(user)
193         start, limit = self._list_limits(allowed, marker, limit)
194         return allowed[start:start + limit]
195
196     @backend_method
197     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
198         """Return a dictionary with the account metadata for the domain."""
199
200         logger.debug(
201             "get_account_meta: %s %s %s %s", user, account, domain, until)
202         path, node = self._lookup_account(account, user == account)
203         if user != account:
204             if until or node is None or account not in self._allowed_accounts(user):
205                 raise NotAllowedError
206         try:
207             props = self._get_properties(node, until)
208             mtime = props[self.MTIME]
209         except NameError:
210             props = None
211             mtime = until
212         count, bytes, tstamp = self._get_statistics(node, until)
213         tstamp = max(tstamp, mtime)
214         if until is None:
215             modified = tstamp
216         else:
217             modified = self._get_statistics(
218                 node)[2]  # Overall last modification.
219             modified = max(modified, mtime)
220
221         if user != account:
222             meta = {'name': account}
223         else:
224             meta = {}
225             if props is not None and include_user_defined:
226                 meta.update(
227                     dict(self.node.attribute_get(props[self.SERIAL], domain)))
228             if until is not None:
229                 meta.update({'until_timestamp': tstamp})
230             meta.update({'name': account, 'count': count, 'bytes': bytes})
231         meta.update({'modified': modified})
232         return meta
233
234     @backend_method
235     def update_account_meta(self, user, account, domain, meta, replace=False):
236         """Update the metadata associated with the account for the domain."""
237
238         logger.debug("update_account_meta: %s %s %s %s %s", user,
239                      account, domain, meta, replace)
240         if user != account:
241             raise NotAllowedError
242         path, node = self._lookup_account(account, True)
243         self._put_metadata(user, node, domain, meta, replace)
244
245     @backend_method
246     def get_account_groups(self, user, account):
247         """Return a dictionary with the user groups defined for this account."""
248
249         logger.debug("get_account_groups: %s %s", user, account)
250         if user != account:
251             if account not in self._allowed_accounts(user):
252                 raise NotAllowedError
253             return {}
254         self._lookup_account(account, True)
255         return self.permissions.group_dict(account)
256
257     @backend_method
258     def update_account_groups(self, user, account, groups, replace=False):
259         """Update the groups associated with the account."""
260
261         logger.debug("update_account_groups: %s %s %s %s", user,
262                      account, groups, replace)
263         if user != account:
264             raise NotAllowedError
265         self._lookup_account(account, True)
266         self._check_groups(groups)
267         if replace:
268             self.permissions.group_destroy(account)
269         for k, v in groups.iteritems():
270             if not replace:  # If not already deleted.
271                 self.permissions.group_delete(account, k)
272             if v:
273                 self.permissions.group_addmany(account, k, v)
274
275     @backend_method
276     def get_account_policy(self, user, account):
277         """Return a dictionary with the account policy."""
278
279         logger.debug("get_account_policy: %s %s", user, account)
280         if user != account:
281             if account not in self._allowed_accounts(user):
282                 raise NotAllowedError
283             return {}
284         path, node = self._lookup_account(account, True)
285         return self._get_policy(node)
286
287     @backend_method
288     def update_account_policy(self, user, account, policy, replace=False):
289         """Update the policy associated with the account."""
290
291         logger.debug("update_account_policy: %s %s %s %s", user,
292                      account, policy, replace)
293         if user != account:
294             raise NotAllowedError
295         path, node = self._lookup_account(account, True)
296         self._check_policy(policy)
297         self._put_policy(node, policy, replace)
298
299     @backend_method
300     def put_account(self, user, account, policy={}):
301         """Create a new account with the given name."""
302
303         logger.debug("put_account: %s %s %s", user, account, policy)
304         if user != account:
305             raise NotAllowedError
306         node = self.node.node_lookup(account)
307         if node is not None:
308             raise AccountExists('Account already exists')
309         if policy:
310             self._check_policy(policy)
311         node = self._put_path(user, self.ROOTNODE, account)
312         self._put_policy(node, policy, True)
313
314     @backend_method
315     def delete_account(self, user, account):
316         """Delete the account with the given name."""
317
318         logger.debug("delete_account: %s %s", user, account)
319         if user != account:
320             raise NotAllowedError
321         node = self.node.node_lookup(account)
322         if node is None:
323             return
324         if not self.node.node_remove(node):
325             raise AccountNotEmpty('Account is not empty')
326         self.permissions.group_destroy(account)
327
328     @backend_method
329     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
330         """Return a list of containers existing under an account."""
331
332         logger.debug("list_containers: %s %s %s %s %s %s %s", user,
333                      account, marker, limit, shared, until, public)
334         if user != account:
335             if until or account not in self._allowed_accounts(user):
336                 raise NotAllowedError
337             allowed = self._allowed_containers(user, account)
338             start, limit = self._list_limits(allowed, marker, limit)
339             return allowed[start:start + limit]
340         if shared or public:
341             allowed = set()
342             if shared:
343                 allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
344             if public:
345                 allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
346             allowed = sorted(allowed)
347             start, limit = self._list_limits(allowed, marker, limit)
348             return allowed[start:start + limit]
349         node = self.node.node_lookup(account)
350         containers = [x[0] for x in self._list_object_properties(
351             node, account, '', '/', marker, limit, False, None, [], until)]
352         start, limit = self._list_limits(
353             [x[0] for x in containers], marker, limit)
354         return containers[start:start + limit]
355
356     @backend_method
357     def list_container_meta(self, user, account, container, domain, until=None):
358         """Return a list with all the container's object meta keys for the domain."""
359
360         logger.debug("list_container_meta: %s %s %s %s %s", user,
361                      account, container, domain, until)
362         allowed = []
363         if user != account:
364             if until:
365                 raise NotAllowedError
366             allowed = self.permissions.access_list_paths(
367                 user, '/'.join((account, container)))
368             if not allowed:
369                 raise NotAllowedError
370         path, node = self._lookup_container(account, container)
371         before = until if until is not None else inf
372         allowed = self._get_formatted_paths(allowed)
373         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
374
375     @backend_method
376     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
377         """Return a dictionary with the container metadata for the domain."""
378
379         logger.debug("get_container_meta: %s %s %s %s %s", user,
380                      account, container, domain, until)
381         if user != account:
382             if until or container not in self._allowed_containers(user, account):
383                 raise NotAllowedError
384         path, node = self._lookup_container(account, container)
385         props = self._get_properties(node, until)
386         mtime = props[self.MTIME]
387         count, bytes, tstamp = self._get_statistics(node, until)
388         tstamp = max(tstamp, mtime)
389         if until is None:
390             modified = tstamp
391         else:
392             modified = self._get_statistics(
393                 node)[2]  # Overall last modification.
394             modified = max(modified, mtime)
395
396         if user != account:
397             meta = {'name': container}
398         else:
399             meta = {}
400             if include_user_defined:
401                 meta.update(
402                     dict(self.node.attribute_get(props[self.SERIAL], domain)))
403             if until is not None:
404                 meta.update({'until_timestamp': tstamp})
405             meta.update({'name': container, 'count': count, 'bytes': bytes})
406         meta.update({'modified': modified})
407         return meta
408
409     @backend_method
410     def update_container_meta(self, user, account, container, domain, meta, replace=False):
411         """Update the metadata associated with the container for the domain."""
412
413         logger.debug("update_container_meta: %s %s %s %s %s %s",
414                      user, account, container, domain, meta, replace)
415         if user != account:
416             raise NotAllowedError
417         path, node = self._lookup_container(account, container)
418         src_version_id, dest_version_id = self._put_metadata(
419             user, node, domain, meta, replace)
420         if src_version_id is not None:
421             versioning = self._get_policy(node)['versioning']
422             if versioning != 'auto':
423                 self.node.version_remove(src_version_id)
424
425     @backend_method
426     def get_container_policy(self, user, account, container):
427         """Return a dictionary with the container policy."""
428
429         logger.debug(
430             "get_container_policy: %s %s %s", user, account, container)
431         if user != account:
432             if container not in self._allowed_containers(user, account):
433                 raise NotAllowedError
434             return {}
435         path, node = self._lookup_container(account, container)
436         return self._get_policy(node)
437
438     @backend_method
439     def update_container_policy(self, user, account, container, policy, replace=False):
440         """Update the policy associated with the container."""
441
442         logger.debug("update_container_policy: %s %s %s %s %s",
443                      user, account, container, policy, replace)
444         if user != account:
445             raise NotAllowedError
446         path, node = self._lookup_container(account, container)
447         self._check_policy(policy)
448         self._put_policy(node, policy, replace)
449
450     @backend_method
451     def put_container(self, user, account, container, policy={}):
452         """Create a new container with the given name."""
453
454         logger.debug(
455             "put_container: %s %s %s %s", user, account, container, policy)
456         if user != account:
457             raise NotAllowedError
458         try:
459             path, node = self._lookup_container(account, container)
460         except NameError:
461             pass
462         else:
463             raise ContainerExists('Container already exists')
464         if policy:
465             self._check_policy(policy)
466         path = '/'.join((account, container))
467         node = self._put_path(
468             user, self._lookup_account(account, True)[1], path)
469         self._put_policy(node, policy, True)
470
471     @backend_method
472     def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
473         """Delete/purge the container with the given name."""
474
475         logger.debug("delete_container: %s %s %s %s %s %s", user,
476                      account, container, until, prefix, delimiter)
477         if user != account:
478             raise NotAllowedError
479         path, node = self._lookup_container(account, container)
480
481         if until is not None:
482             hashes, size = self.node.node_purge_children(
483                 node, until, CLUSTER_HISTORY)
484             for h in hashes:
485                 self.store.map_delete(h)
486             self.node.node_purge_children(node, until, CLUSTER_DELETED)
487             self._report_size_change(user, account, -size, {'action':
488                                      'container purge', 'path': path})
489             return
490
491         if not delimiter:
492             if self._get_statistics(node)[0] > 0:
493                 raise ContainerNotEmpty('Container is not empty')
494             hashes, size = self.node.node_purge_children(
495                 node, inf, CLUSTER_HISTORY)
496             for h in hashes:
497                 self.store.map_delete(h)
498             self.node.node_purge_children(node, inf, CLUSTER_DELETED)
499             self.node.node_remove(node)
500             self._report_size_change(user, account, -size, {'action':
501                                      'container delete', 'path': path})
502         else:
503                 # remove only contents
504             src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
505             paths = []
506             for t in src_names:
507                 path = '/'.join((account, container, t[0]))
508                 node = t[2]
509                 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
510                 del_size = self._apply_versioning(
511                     account, container, src_version_id)
512                 if del_size:
513                     self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
514                 self._report_object_change(
515                     user, account, path, details={'action': 'object delete'})
516                 paths.append(path)
517             self.permissions.access_clear_bulk(paths)
518
519     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
520         if user != account and until:
521             raise NotAllowedError
522         if shared and public:
523             # get shared first
524             shared = self._list_object_permissions(
525                 user, account, container, prefix, shared=True, public=False)
526             objects = set()
527             if shared:
528                 path, node = self._lookup_container(account, container)
529                 shared = self._get_formatted_paths(shared)
530                 objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
531
532             # get public
533             objects |= set(self._list_public_object_properties(
534                 user, account, container, prefix, all_props))
535             objects = list(objects)
536
537             objects.sort(key=lambda x: x[0])
538             start, limit = self._list_limits(
539                 [x[0] for x in objects], marker, limit)
540             return objects[start:start + limit]
541         elif public:
542             objects = self._list_public_object_properties(
543                 user, account, container, prefix, all_props)
544             start, limit = self._list_limits(
545                 [x[0] for x in objects], marker, limit)
546             return objects[start:start + limit]
547
548         allowed = self._list_object_permissions(
549             user, account, container, prefix, shared, public)
550         if shared and not allowed:
551             return []
552         path, node = self._lookup_container(account, container)
553         allowed = self._get_formatted_paths(allowed)
554         objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
555         start, limit = self._list_limits(
556             [x[0] for x in objects], marker, limit)
557         return objects[start:start + limit]
558
559     def _list_public_object_properties(self, user, account, container, prefix, all_props):
560         public = self._list_object_permissions(
561             user, account, container, prefix, shared=False, public=True)
562         paths, nodes = self._lookup_objects(public)
563         path = '/'.join((account, container))
564         cont_prefix = path + '/'
565         paths = [x[len(cont_prefix):] for x in paths]
566         props = self.node.version_lookup_bulk(nodes, all_props=all_props)
567         objects = [(path,) + props for path, props in zip(paths, props)]
568         return objects
569
570     def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
571         objects = []
572         while True:
573             marker = objects[-1] if objects else None
574             limit = 10000
575             l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
576             objects.extend(l)
577             if not l or len(l) < limit:
578                 break
579         return objects
580
581     def _list_object_permissions(self, user, account, container, prefix, shared, public):
582         allowed = []
583         path = '/'.join((account, container, prefix)).rstrip('/')
584         if user != account:
585             allowed = self.permissions.access_list_paths(user, path)
586             if not allowed:
587                 raise NotAllowedError
588         else:
589             allowed = set()
590             if shared:
591                 allowed.update(self.permissions.access_list_shared(path))
592             if public:
593                 allowed.update(
594                     [x[0] for x in self.permissions.public_list(path)])
595             allowed = sorted(allowed)
596             if not allowed:
597                 return []
598         return allowed
599
600     @backend_method
601     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
602         """Return a list of object (name, version_id) tuples existing under a container."""
603
604         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
605         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
606
607     @backend_method
608     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
609         """Return a list of object metadata dicts existing under a container."""
610
611         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
612         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
613         objects = []
614         for p in props:
615             if len(p) == 2:
616                 objects.append({'subdir': p[0]})
617             else:
618                 objects.append({'name': p[0],
619                                 'bytes': p[self.SIZE + 1],
620                                 'type': p[self.TYPE + 1],
621                                 'hash': p[self.HASH + 1],
622                                 'version': p[self.SERIAL + 1],
623                                 'version_timestamp': p[self.MTIME + 1],
624                                 'modified': p[self.MTIME + 1] if until is None else None,
625                                 'modified_by': p[self.MUSER + 1],
626                                 'uuid': p[self.UUID + 1],
627                                 'checksum': p[self.CHECKSUM + 1]})
628         return objects
629
630     @backend_method
631     def list_object_permissions(self, user, account, container, prefix=''):
632         """Return a list of paths that enforce permissions under a container."""
633
634         logger.debug("list_object_permissions: %s %s %s %s", user,
635                      account, container, prefix)
636         return self._list_object_permissions(user, account, container, prefix, True, False)
637
638     @backend_method
639     def list_object_public(self, user, account, container, prefix=''):
640         """Return a dict mapping paths to public ids for objects that are public under a container."""
641
642         logger.debug("list_object_public: %s %s %s %s", user,
643                      account, container, prefix)
644         public = {}
645         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
646             public[path] = p + ULTIMATE_ANSWER
647         return public
648
649     @backend_method
650     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
651         """Return a dictionary with the object metadata for the domain."""
652
653         logger.debug("get_object_meta: %s %s %s %s %s %s", user,
654                      account, container, name, domain, version)
655         self._can_read(user, account, container, name)
656         path, node = self._lookup_object(account, container, name)
657         props = self._get_version(node, version)
658         if version is None:
659             modified = props[self.MTIME]
660         else:
661             try:
662                 modified = self._get_version(
663                     node)[self.MTIME]  # Overall last modification.
664             except NameError:  # Object may be deleted.
665                 del_props = self.node.version_lookup(
666                     node, inf, CLUSTER_DELETED)
667                 if del_props is None:
668                     raise ItemNotExists('Object does not exist')
669                 modified = del_props[self.MTIME]
670
671         meta = {}
672         if include_user_defined:
673             meta.update(
674                 dict(self.node.attribute_get(props[self.SERIAL], domain)))
675         meta.update({'name': name,
676                      'bytes': props[self.SIZE],
677                      'type': props[self.TYPE],
678                      'hash': props[self.HASH],
679                      'version': props[self.SERIAL],
680                      'version_timestamp': props[self.MTIME],
681                      'modified': modified,
682                      'modified_by': props[self.MUSER],
683                      'uuid': props[self.UUID],
684                      'checksum': props[self.CHECKSUM]})
685         return meta
686
687     @backend_method
688     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
689         """Update the metadata associated with the object for the domain and return the new version."""
690
691         logger.debug("update_object_meta: %s %s %s %s %s %s %s",
692                      user, account, container, name, domain, meta, replace)
693         self._can_write(user, account, container, name)
694         path, node = self._lookup_object(account, container, name)
695         src_version_id, dest_version_id = self._put_metadata(
696             user, node, domain, meta, replace)
697         self._apply_versioning(account, container, src_version_id)
698         return dest_version_id
699
700     @backend_method
701     def get_object_permissions(self, user, account, container, name):
702         """Return the action allowed on the object, the path
703         from which the object gets its permissions from,
704         along with a dictionary containing the permissions."""
705
706         logger.debug("get_object_permissions: %s %s %s %s", user,
707                      account, container, name)
708         allowed = 'write'
709         permissions_path = self._get_permissions_path(account, container, name)
710         if user != account:
711             if self.permissions.access_check(permissions_path, self.WRITE, user):
712                 allowed = 'write'
713             elif self.permissions.access_check(permissions_path, self.READ, user):
714                 allowed = 'read'
715             else:
716                 raise NotAllowedError
717         self._lookup_object(account, container, name)
718         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
719
720     @backend_method
721     def update_object_permissions(self, user, account, container, name, permissions):
722         """Update the permissions associated with the object."""
723
724         logger.debug("update_object_permissions: %s %s %s %s %s",
725                      user, account, container, name, permissions)
726         if user != account:
727             raise NotAllowedError
728         path = self._lookup_object(account, container, name)[0]
729         self._check_permissions(path, permissions)
730         self.permissions.access_set(path, permissions)
731         self._report_sharing_change(user, account, path, {'members':
732                                     self.permissions.access_members(path)})
733
734     @backend_method
735     def get_object_public(self, user, account, container, name):
736         """Return the public id of the object if applicable."""
737
738         logger.debug(
739             "get_object_public: %s %s %s %s", user, account, container, name)
740         self._can_read(user, account, container, name)
741         path = self._lookup_object(account, container, name)[0]
742         p = self.permissions.public_get(path)
743         if p is not None:
744             p += ULTIMATE_ANSWER
745         return p
746
747     @backend_method
748     def update_object_public(self, user, account, container, name, public):
749         """Update the public status of the object."""
750
751         logger.debug("update_object_public: %s %s %s %s %s", user,
752                      account, container, name, public)
753         self._can_write(user, account, container, name)
754         path = self._lookup_object(account, container, name)[0]
755         if not public:
756             self.permissions.public_unset(path)
757         else:
758             self.permissions.public_set(path)
759
760     @backend_method
761     def get_object_hashmap(self, user, account, container, name, version=None):
762         """Return the object's size and a list with partial hashes."""
763
764         logger.debug("get_object_hashmap: %s %s %s %s %s", user,
765                      account, container, name, version)
766         self._can_read(user, account, container, name)
767         path, node = self._lookup_object(account, container, name)
768         props = self._get_version(node, version)
769         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
770         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
771
772     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
773         if permissions is not None and user != account:
774             raise NotAllowedError
775         self._can_write(user, account, container, name)
776         if permissions is not None:
777             path = '/'.join((account, container, name))
778             self._check_permissions(path, permissions)
779
780         account_path, account_node = self._lookup_account(account, True)
781         container_path, container_node = self._lookup_container(
782             account, container)
783         path, node = self._put_object_node(
784             container_path, container_node, name)
785         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
786
787         # Handle meta.
788         if src_version_id is None:
789             src_version_id = pre_version_id
790         self._put_metadata_duplicate(
791             src_version_id, dest_version_id, domain, meta, replace_meta)
792
793         # Check quota.
794         del_size = self._apply_versioning(account, container, pre_version_id)
795         size_delta = size - del_size
796         if size_delta > 0:
797             account_quota = long(self._get_policy(account_node)['quota'])
798             container_quota = long(self._get_policy(container_node)['quota'])
799             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
800                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
801                 # This must be executed in a transaction, so the version is never created if it fails.
802                 raise QuotaError
803         self._report_size_change(user, account, size_delta, {
804                                  'action': 'object update', 'path': path})
805
806         if permissions is not None:
807             self.permissions.access_set(path, permissions)
808             self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
809
810         self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
811         return dest_version_id
812
813     @backend_method
814     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
815         """Create/update an object with the specified size and partial hashes."""
816
817         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
818                      account, container, name, size, type, hashmap, checksum)
819         if size == 0:  # No such thing as an empty hashmap.
820             hashmap = [self.put_block('')]
821         map = HashMap(self.block_size, self.hash_algorithm)
822         map.extend([binascii.unhexlify(x) for x in hashmap])
823         missing = self.store.block_search(map)
824         if missing:
825             ie = IndexError()
826             ie.data = [binascii.hexlify(x) for x in missing]
827             raise ie
828
829         hash = map.hash()
830         dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
831         self.store.map_put(hash, map)
832         return dest_version_id
833
834     @backend_method
835     def update_object_checksum(self, user, account, container, name, version, checksum):
836         """Update an object's checksum."""
837
838         logger.debug("update_object_checksum: %s %s %s %s %s %s",
839                      user, account, container, name, version, checksum)
840         # Update objects with greater version and same hashmap and size (fix metadata updates).
841         self._can_write(user, account, container, name)
842         path, node = self._lookup_object(account, container, name)
843         props = self._get_version(node, version)
844         versions = self.node.node_get_versions(node)
845         for x in versions:
846             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
847                 self.node.version_put_property(
848                     x[self.SERIAL], 'checksum', checksum)
849
850     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
851         dest_version_ids = []
852         self._can_read(user, src_account, src_container, src_name)
853         path, node = self._lookup_object(src_account, src_container, src_name)
854         # TODO: Will do another fetch of the properties in duplicate version...
855         props = self._get_version(
856             node, src_version)  # Check to see if source exists.
857         src_version_id = props[self.SERIAL]
858         hash = props[self.HASH]
859         size = props[self.SIZE]
860         is_copy = not is_move and (src_account, src_container, src_name) != (
861             dest_account, dest_container, dest_name)  # New uuid.
862         dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
863         if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
864             self._delete_object(user, src_account, src_container, src_name)
865
866         if delimiter:
867             prefix = src_name + \
868                 delimiter if not src_name.endswith(delimiter) else src_name
869             src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
870             src_names.sort(key=lambda x: x[2])  # order by nodes
871             paths = [elem[0] for elem in src_names]
872             nodes = [elem[2] for elem in src_names]
873             # TODO: Will do another fetch of the properties in duplicate version...
874             props = self._get_versions(nodes)  # Check to see if source exists.
875
876             for prop, path, node in zip(props, paths, nodes):
877                 src_version_id = prop[self.SERIAL]
878                 hash = prop[self.HASH]
879                 vtype = prop[self.TYPE]
880                 size = prop[self.SIZE]
881                 dest_prefix = dest_name + delimiter if not dest_name.endswith(
882                     delimiter) else dest_name
883                 vdest_name = path.replace(prefix, dest_prefix, 1)
884                 dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
885                 if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
886                     self._delete_object(user, src_account, src_container, path)
887         return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
888
889     @backend_method
890     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
891         """Copy an object's data and metadata."""
892
893         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
894         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
895         return dest_version_id
896
897     @backend_method
898     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
899         """Move an object's data and metadata."""
900
901         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
902         if user != src_account:
903             raise NotAllowedError
904         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
905         return dest_version_id
906
907     def _delete_object(self, user, account, container, name, until=None, delimiter=None):
908         if user != account:
909             raise NotAllowedError
910
911         if until is not None:
912             path = '/'.join((account, container, name))
913             node = self.node.node_lookup(path)
914             if node is None:
915                 return
916             hashes = []
917             size = 0
918             h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
919             hashes += h
920             size += s
921             h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
922             hashes += h
923             size += s
924             for h in hashes:
925                 self.store.map_delete(h)
926             self.node.node_purge(node, until, CLUSTER_DELETED)
927             try:
928                 props = self._get_version(node)
929             except NameError:
930                 self.permissions.access_clear(path)
931             self._report_size_change(user, account, -size, {
932                                      'action': 'object purge', 'path': path})
933             return
934
935         path, node = self._lookup_object(account, container, name)
936         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
937         del_size = self._apply_versioning(account, container, src_version_id)
938         if del_size:
939             self._report_size_change(user, account, -del_size, {
940                                      'action': 'object delete', 'path': path})
941         self._report_object_change(
942             user, account, path, details={'action': 'object delete'})
943         self.permissions.access_clear(path)
944
945         if delimiter:
946             prefix = name + delimiter if not name.endswith(delimiter) else name
947             src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
948             paths = []
949             for t in src_names:
950                 path = '/'.join((account, container, t[0]))
951                 node = t[2]
952                 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
953                 del_size = self._apply_versioning(
954                     account, container, src_version_id)
955                 if del_size:
956                     self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
957                 self._report_object_change(
958                     user, account, path, details={'action': 'object delete'})
959                 paths.append(path)
960             self.permissions.access_clear_bulk(paths)
961
962     @backend_method
963     def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
964         """Delete/purge an object."""
965
966         logger.debug("delete_object: %s %s %s %s %s %s %s", user,
967                      account, container, name, until, prefix, delimiter)
968         self._delete_object(user, account, container, name, until, delimiter)
969
970     @backend_method
971     def list_versions(self, user, account, container, name):
972         """Return a list of all (version, version_timestamp) tuples for an object."""
973
974         logger.debug(
975             "list_versions: %s %s %s %s", user, account, container, name)
976         self._can_read(user, account, container, name)
977         path, node = self._lookup_object(account, container, name)
978         versions = self.node.node_get_versions(node)
979         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
980
981     @backend_method
982     def get_uuid(self, user, uuid):
983         """Return the (account, container, name) for the UUID given."""
984
985         logger.debug("get_uuid: %s %s", user, uuid)
986         info = self.node.latest_uuid(uuid)
987         if info is None:
988             raise NameError
989         path, serial = info
990         account, container, name = path.split('/', 2)
991         self._can_read(user, account, container, name)
992         return (account, container, name)
993
994     @backend_method
995     def get_public(self, user, public):
996         """Return the (account, container, name) for the public id given."""
997
998         logger.debug("get_public: %s %s", user, public)
999         if public is None or public < ULTIMATE_ANSWER:
1000             raise NameError
1001         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
1002         if path is None:
1003             raise NameError
1004         account, container, name = path.split('/', 2)
1005         self._can_read(user, account, container, name)
1006         return (account, container, name)
1007
1008     @backend_method(autocommit=0)
1009     def get_block(self, hash):
1010         """Return a block's data."""
1011
1012         logger.debug("get_block: %s", hash)
1013         block = self.store.block_get(binascii.unhexlify(hash))
1014         if not block:
1015             raise ItemNotExists('Block does not exist')
1016         return block
1017
1018     @backend_method(autocommit=0)
1019     def put_block(self, data):
1020         """Store a block and return the hash."""
1021
1022         logger.debug("put_block: %s", len(data))
1023         return binascii.hexlify(self.store.block_put(data))
1024
1025     @backend_method(autocommit=0)
1026     def update_block(self, hash, data, offset=0):
1027         """Update a known block and return the hash."""
1028
1029         logger.debug("update_block: %s %s %s", hash, len(data), offset)
1030         if offset == 0 and len(data) == self.block_size:
1031             return self.put_block(data)
1032         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
1033         return binascii.hexlify(h)
1034
1035     # Path functions.
1036
1037     def _generate_uuid(self):
1038         return str(uuidlib.uuid4())
1039
1040     def _put_object_node(self, path, parent, name):
1041         path = '/'.join((path, name))
1042         node = self.node.node_lookup(path)
1043         if node is None:
1044             node = self.node.node_create(parent, path)
1045         return path, node
1046
1047     def _put_path(self, user, parent, path):
1048         node = self.node.node_create(parent, path)
1049         self.node.version_create(node, None, 0, '', None, user,
1050                                  self._generate_uuid(), '', CLUSTER_NORMAL)
1051         return node
1052
1053     def _lookup_account(self, account, create=True):
1054         node = self.node.node_lookup(account)
1055         if node is None and create:
1056             node = self._put_path(
1057                 account, self.ROOTNODE, account)  # User is account.
1058         return account, node
1059
1060     def _lookup_container(self, account, container):
1061         path = '/'.join((account, container))
1062         node = self.node.node_lookup(path)
1063         if node is None:
1064             raise ItemNotExists('Container does not exist')
1065         return path, node
1066
1067     def _lookup_object(self, account, container, name):
1068         path = '/'.join((account, container, name))
1069         node = self.node.node_lookup(path)
1070         if node is None:
1071             raise ItemNotExists('Object does not exist')
1072         return path, node
1073
1074     def _lookup_objects(self, paths):
1075         nodes = self.node.node_lookup_bulk(paths)
1076         return paths, nodes
1077
1078     def _get_properties(self, node, until=None):
1079         """Return properties until the timestamp given."""
1080
1081         before = until if until is not None else inf
1082         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1083         if props is None and until is not None:
1084             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1085         if props is None:
1086             raise ItemNotExists('Path does not exist')
1087         return props
1088
1089     def _get_statistics(self, node, until=None):
1090         """Return count, sum of size and latest timestamp of everything under node."""
1091
1092         if until is None:
1093             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1094         else:
1095             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1096         if stats is None:
1097             stats = (0, 0, 0)
1098         return stats
1099
1100     def _get_version(self, node, version=None):
1101         if version is None:
1102             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1103             if props is None:
1104                 raise ItemNotExists('Object does not exist')
1105         else:
1106             try:
1107                 version = int(version)
1108             except ValueError:
1109                 raise VersionNotExists('Version does not exist')
1110             props = self.node.version_get_properties(version)
1111             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1112                 raise VersionNotExists('Version does not exist')
1113         return props
1114
1115     def _get_versions(self, nodes):
1116         return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1117
1118     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1119         """Create a new version of the node."""
1120
1121         props = self.node.version_lookup(
1122             node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1123         if props is not None:
1124             src_version_id = props[self.SERIAL]
1125             src_hash = props[self.HASH]
1126             src_size = props[self.SIZE]
1127             src_type = props[self.TYPE]
1128             src_checksum = props[self.CHECKSUM]
1129         else:
1130             src_version_id = None
1131             src_hash = None
1132             src_size = 0
1133             src_type = ''
1134             src_checksum = ''
1135         if size is None:  # Set metadata.
1136             hash = src_hash  # This way hash can be set to None (account or container).
1137             size = src_size
1138         if type is None:
1139             type = src_type
1140         if checksum is None:
1141             checksum = src_checksum
1142         uuid = self._generate_uuid(
1143         ) if (is_copy or src_version_id is None) else props[self.UUID]
1144
1145         if src_node is None:
1146             pre_version_id = src_version_id
1147         else:
1148             pre_version_id = None
1149             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1150             if props is not None:
1151                 pre_version_id = props[self.SERIAL]
1152         if pre_version_id is not None:
1153             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1154
1155         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1156         return pre_version_id, dest_version_id
1157
1158     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1159         if src_version_id is not None:
1160             self.node.attribute_copy(src_version_id, dest_version_id)
1161         if not replace:
1162             self.node.attribute_del(dest_version_id, domain, (
1163                 k for k, v in meta.iteritems() if v == ''))
1164             self.node.attribute_set(dest_version_id, domain, (
1165                 (k, v) for k, v in meta.iteritems() if v != ''))
1166         else:
1167             self.node.attribute_del(dest_version_id, domain)
1168             self.node.attribute_set(dest_version_id, domain, ((
1169                 k, v) for k, v in meta.iteritems()))
1170
1171     def _put_metadata(self, user, node, domain, meta, replace=False):
1172         """Create a new version and store metadata."""
1173
1174         src_version_id, dest_version_id = self._put_version_duplicate(
1175             user, node)
1176         self._put_metadata_duplicate(
1177             src_version_id, dest_version_id, domain, meta, replace)
1178         return src_version_id, dest_version_id
1179
1180     def _list_limits(self, listing, marker, limit):
1181         start = 0
1182         if marker:
1183             try:
1184                 start = listing.index(marker) + 1
1185             except ValueError:
1186                 pass
1187         if not limit or limit > 10000:
1188             limit = 10000
1189         return start, limit
1190
1191     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1192         cont_prefix = path + '/'
1193         prefix = cont_prefix + prefix
1194         start = cont_prefix + marker if marker else None
1195         before = until if until is not None else inf
1196         filterq = keys if domain else []
1197         sizeq = size_range
1198
1199         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1200         objects.extend([(p, None) for p in prefixes] if virtual else [])
1201         objects.sort(key=lambda x: x[0])
1202         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1203         return objects
1204
1205     # Reporting functions.
1206
1207     def _report_size_change(self, user, account, size, details={}):
1208         account_node = self._lookup_account(account, True)[1]
1209         total = self._get_statistics(account_node)[1]
1210         details.update({'user': user, 'total': total})
1211         logger.debug(
1212             "_report_size_change: %s %s %s %s", user, account, size, details)
1213         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
1214                                                   account,
1215                                                   QUEUE_INSTANCE_ID,
1216                                                   'diskspace',
1217                                                   float(size),
1218                                                   details))
1219
1220     def _report_object_change(self, user, account, path, details={}):
1221         details.update({'user': user})
1222         logger.debug("_report_object_change: %s %s %s %s", user,
1223                      account, path, details)
1224         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % (
1225             'object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1226
1227     def _report_sharing_change(self, user, account, path, details={}):
1228         logger.debug("_report_permissions_change: %s %s %s %s",
1229                      user, account, path, details)
1230         details.update({'user': user})
1231         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account,
1232                              QUEUE_INSTANCE_ID, 'sharing', path, details))
1233
1234     # Policy functions.
1235
1236     def _check_policy(self, policy):
1237         for k in policy.keys():
1238             if policy[k] == '':
1239                 policy[k] = self.default_policy.get(k)
1240         for k, v in policy.iteritems():
1241             if k == 'quota':
1242                 q = int(v)  # May raise ValueError.
1243                 if q < 0:
1244                     raise ValueError
1245             elif k == 'versioning':
1246                 if v not in ['auto', 'none']:
1247                     raise ValueError
1248             else:
1249                 raise ValueError
1250
1251     def _put_policy(self, node, policy, replace):
1252         if replace:
1253             for k, v in self.default_policy.iteritems():
1254                 if k not in policy:
1255                     policy[k] = v
1256         self.node.policy_set(node, policy)
1257
1258     def _get_policy(self, node):
1259         policy = self.default_policy.copy()
1260         policy.update(self.node.policy_get(node))
1261         return policy
1262
1263     def _apply_versioning(self, account, container, version_id):
1264         """Delete the provided version if such is the policy.
1265            Return size of object removed.
1266         """
1267
1268         if version_id is None:
1269             return 0
1270         path, node = self._lookup_container(account, container)
1271         versioning = self._get_policy(node)['versioning']
1272         if versioning != 'auto':
1273             hash, size = self.node.version_remove(version_id)
1274             self.store.map_delete(hash)
1275             return size
1276         return 0
1277
1278     # Access control functions.
1279
1280     def _check_groups(self, groups):
1281         # raise ValueError('Bad characters in groups')
1282         pass
1283
1284     def _check_permissions(self, path, permissions):
1285         # raise ValueError('Bad characters in permissions')
1286         pass
1287
1288     def _get_formatted_paths(self, paths):
1289         formatted = []
1290         for p in paths:
1291             node = self.node.node_lookup(p)
1292             props = None
1293             if node is not None:
1294                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1295             if props is not None:
1296                 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1297                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1298                 formatted.append((p, self.MATCH_EXACT))
1299         return formatted
1300
1301     def _get_permissions_path(self, account, container, name):
1302         path = '/'.join((account, container, name))
1303         permission_paths = self.permissions.access_inherit(path)
1304         permission_paths.sort()
1305         permission_paths.reverse()
1306         for p in permission_paths:
1307             if p == path:
1308                 return p
1309             else:
1310                 if p.count('/') < 2:
1311                     continue
1312                 node = self.node.node_lookup(p)
1313                 if node is not None:
1314                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1315                 if props is not None:
1316                     if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1317                         return p
1318         return None
1319
1320     def _can_read(self, user, account, container, name):
1321         if user == account:
1322             return True
1323         path = '/'.join((account, container, name))
1324         if self.permissions.public_get(path) is not None:
1325             return True
1326         path = self._get_permissions_path(account, container, name)
1327         if not path:
1328             raise NotAllowedError
1329         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1330             raise NotAllowedError
1331
1332     def _can_write(self, user, account, container, name):
1333         if user == account:
1334             return True
1335         path = '/'.join((account, container, name))
1336         path = self._get_permissions_path(account, container, name)
1337         if not path:
1338             raise NotAllowedError
1339         if not self.permissions.access_check(path, self.WRITE, user):
1340             raise NotAllowedError
1341
1342     def _allowed_accounts(self, user):
1343         allow = set()
1344         for path in self.permissions.access_list_paths(user):
1345             allow.add(path.split('/', 1)[0])
1346         return sorted(allow)
1347
1348     def _allowed_containers(self, user, account):
1349         allow = set()
1350         for path in self.permissions.access_list_paths(user, account):
1351             allow.add(path.split('/', 2)[1])
1352         return sorted(allow)