Send sharing notifications.
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import hashlib
40 import binascii
41
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43
44 # Stripped-down version of the HashMap class found in tools.
45 class HashMap(list):
46
47     def __init__(self, blocksize, blockhash):
48         super(HashMap, self).__init__()
49         self.blocksize = blocksize
50         self.blockhash = blockhash
51
52     def _hash_raw(self, v):
53         h = hashlib.new(self.blockhash)
54         h.update(v)
55         return h.digest()
56
57     def hash(self):
58         if len(self) == 0:
59             return self._hash_raw('')
60         if len(self) == 1:
61             return self.__getitem__(0)
62
63         h = list(self)
64         s = 2
65         while s < len(h):
66             s = s * 2
67         h += [('\x00' * len(h[0]))] * (s - len(h))
68         while len(h) > 1:
69             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70         return h[0]
71
72 # Default modules and settings.
73 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76 DEFAULT_BLOCK_PATH = 'data/'
77 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
78 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
79
80 QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
81 QUEUE_CLIENT_ID = 'pithos'
82 QUEUE_INSTANCE_ID = '1'
83
84 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
85
86 inf = float('inf')
87
88 ULTIMATE_ANSWER = 42
89
90
91 logger = logging.getLogger(__name__)
92
93
94 def backend_method(func=None, autocommit=1):
95     if func is None:
96         def fn(func):
97             return backend_method(func, autocommit)
98         return fn
99
100     if not autocommit:
101         return func
102     def fn(self, *args, **kw):
103         self.wrapper.execute()
104         try:
105             self.messages = []
106             ret = func(self, *args, **kw)
107             for m in self.messages:
108                 self.queue.send(*m)
109             self.wrapper.commit()
110             return ret
111         except:
112             self.wrapper.rollback()
113             raise
114     return fn
115
116
117 class ModularBackend(BaseBackend):
118     """A modular backend.
119     
120     Uses modules for SQL functions and storage.
121     """
122     
123     def __init__(self, db_module=None, db_connection=None,
124                  block_module=None, block_path=None,
125                  queue_module=None, queue_connection=None):
126         db_module = db_module or DEFAULT_DB_MODULE
127         db_connection = db_connection or DEFAULT_DB_CONNECTION
128         block_module = block_module or DEFAULT_BLOCK_MODULE
129         block_path = block_path or DEFAULT_BLOCK_PATH
130         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
131         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
132         
133         self.hash_algorithm = 'sha256'
134         self.block_size = 4 * 1024 * 1024 # 4MB
135         
136         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
137         
138         def load_module(m):
139             __import__(m)
140             return sys.modules[m]
141         
142         self.db_module = load_module(db_module)
143         self.wrapper = self.db_module.DBWrapper(db_connection)
144         params = {'wrapper': self.wrapper}
145         self.permissions = self.db_module.Permissions(**params)
146         for x in ['READ', 'WRITE']:
147             setattr(self, x, getattr(self.db_module, x))
148         self.node = self.db_module.Node(**params)
149         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
150             setattr(self, x, getattr(self.db_module, x))
151         
152         self.block_module = load_module(block_module)
153         params = {'path': block_path,
154                   'block_size': self.block_size,
155                   'hash_algorithm': self.hash_algorithm}
156         self.store = self.block_module.Store(**params)
157
158         if queue_module and queue_connection:
159             self.queue_module = load_module(queue_module)
160             params = {'exchange': queue_connection,
161                       'client_id': QUEUE_CLIENT_ID}
162             self.queue = self.queue_module.Queue(**params)
163         else:
164             class NoQueue:
165                 def send(self, *args):
166                     pass
167                 
168                 def close(self):
169                     pass
170             
171             self.queue = NoQueue()
172     
173     def close(self):
174         self.wrapper.close()
175         self.queue.close()
176     
177     @backend_method
178     def list_accounts(self, user, marker=None, limit=10000):
179         """Return a list of accounts the user can access."""
180         
181         logger.debug("list_accounts: %s %s %s", user, marker, limit)
182         allowed = self._allowed_accounts(user)
183         start, limit = self._list_limits(allowed, marker, limit)
184         return allowed[start:start + limit]
185     
186     @backend_method
187     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
188         """Return a dictionary with the account metadata for the domain."""
189         
190         logger.debug("get_account_meta: %s %s %s", account, domain, until)
191         path, node = self._lookup_account(account, user == account)
192         if user != account:
193             if until or node is None or account not in self._allowed_accounts(user):
194                 raise NotAllowedError
195         try:
196             props = self._get_properties(node, until)
197             mtime = props[self.MTIME]
198         except NameError:
199             props = None
200             mtime = until
201         count, bytes, tstamp = self._get_statistics(node, until)
202         tstamp = max(tstamp, mtime)
203         if until is None:
204             modified = tstamp
205         else:
206             modified = self._get_statistics(node)[2] # Overall last modification.
207             modified = max(modified, mtime)
208         
209         if user != account:
210             meta = {'name': account}
211         else:
212             meta = {}
213             if props is not None and include_user_defined:
214                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
215             if until is not None:
216                 meta.update({'until_timestamp': tstamp})
217             meta.update({'name': account, 'count': count, 'bytes': bytes})
218         meta.update({'modified': modified})
219         return meta
220     
221     @backend_method
222     def update_account_meta(self, user, account, domain, meta, replace=False):
223         """Update the metadata associated with the account for the domain."""
224         
225         logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
226         if user != account:
227             raise NotAllowedError
228         path, node = self._lookup_account(account, True)
229         self._put_metadata(user, node, domain, meta, replace)
230     
231     @backend_method
232     def get_account_groups(self, user, account):
233         """Return a dictionary with the user groups defined for this account."""
234         
235         logger.debug("get_account_groups: %s", account)
236         if user != account:
237             if account not in self._allowed_accounts(user):
238                 raise NotAllowedError
239             return {}
240         self._lookup_account(account, True)
241         return self.permissions.group_dict(account)
242     
243     @backend_method
244     def update_account_groups(self, user, account, groups, replace=False):
245         """Update the groups associated with the account."""
246         
247         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
248         if user != account:
249             raise NotAllowedError
250         self._lookup_account(account, True)
251         self._check_groups(groups)
252         if replace:
253             self.permissions.group_destroy(account)
254         for k, v in groups.iteritems():
255             if not replace: # If not already deleted.
256                 self.permissions.group_delete(account, k)
257             if v:
258                 self.permissions.group_addmany(account, k, v)
259     
260     @backend_method
261     def get_account_policy(self, user, account):
262         """Return a dictionary with the account policy."""
263         
264         logger.debug("get_account_policy: %s", account)
265         if user != account:
266             if account not in self._allowed_accounts(user):
267                 raise NotAllowedError
268             return {}
269         path, node = self._lookup_account(account, True)
270         return self._get_policy(node)
271     
272     @backend_method
273     def update_account_policy(self, user, account, policy, replace=False):
274         """Update the policy associated with the account."""
275         
276         logger.debug("update_account_policy: %s %s %s", account, policy, replace)
277         if user != account:
278             raise NotAllowedError
279         path, node = self._lookup_account(account, True)
280         self._check_policy(policy)
281         self._put_policy(node, policy, replace)
282     
283     @backend_method
284     def put_account(self, user, account, policy={}):
285         """Create a new account with the given name."""
286         
287         logger.debug("put_account: %s %s", account, policy)
288         if user != account:
289             raise NotAllowedError
290         node = self.node.node_lookup(account)
291         if node is not None:
292             raise NameError('Account already exists')
293         if policy:
294             self._check_policy(policy)
295         node = self._put_path(user, self.ROOTNODE, account)
296         self._put_policy(node, policy, True)
297     
298     @backend_method
299     def delete_account(self, user, account):
300         """Delete the account with the given name."""
301         
302         logger.debug("delete_account: %s", account)
303         if user != account:
304             raise NotAllowedError
305         node = self.node.node_lookup(account)
306         if node is None:
307             return
308         if not self.node.node_remove(node):
309             raise IndexError('Account is not empty')
310         self.permissions.group_destroy(account)
311     
312     @backend_method
313     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
314         """Return a list of containers existing under an account."""
315         
316         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
317         if user != account:
318             if until or account not in self._allowed_accounts(user):
319                 raise NotAllowedError
320             allowed = self._allowed_containers(user, account)
321             start, limit = self._list_limits(allowed, marker, limit)
322             return allowed[start:start + limit]
323         if shared:
324             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
325             allowed = list(set(allowed))
326             start, limit = self._list_limits(allowed, marker, limit)
327             return allowed[start:start + limit]
328         node = self.node.node_lookup(account)
329         return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
330     
331     @backend_method
332     def list_container_meta(self, user, account, container, domain, until=None):
333         """Return a list with all the container's object meta keys for the domain."""
334         
335         logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
336         allowed = []
337         if user != account:
338             if until:
339                 raise NotAllowedError
340             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
341             if not allowed:
342                 raise NotAllowedError
343         path, node = self._lookup_container(account, container)
344         before = until if until is not None else inf
345         allowed = self._get_formatted_paths(allowed)
346         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
347     
348     @backend_method
349     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
350         """Return a dictionary with the container metadata for the domain."""
351         
352         logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
353         if user != account:
354             if until or container not in self._allowed_containers(user, account):
355                 raise NotAllowedError
356         path, node = self._lookup_container(account, container)
357         props = self._get_properties(node, until)
358         mtime = props[self.MTIME]
359         count, bytes, tstamp = self._get_statistics(node, until)
360         tstamp = max(tstamp, mtime)
361         if until is None:
362             modified = tstamp
363         else:
364             modified = self._get_statistics(node)[2] # Overall last modification.
365             modified = max(modified, mtime)
366         
367         if user != account:
368             meta = {'name': container}
369         else:
370             meta = {}
371             if include_user_defined:
372                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
373             if until is not None:
374                 meta.update({'until_timestamp': tstamp})
375             meta.update({'name': container, 'count': count, 'bytes': bytes})
376         meta.update({'modified': modified})
377         return meta
378     
379     @backend_method
380     def update_container_meta(self, user, account, container, domain, meta, replace=False):
381         """Update the metadata associated with the container for the domain."""
382         
383         logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
384         if user != account:
385             raise NotAllowedError
386         path, node = self._lookup_container(account, container)
387         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
388         if src_version_id is not None:
389             versioning = self._get_policy(node)['versioning']
390             if versioning != 'auto':
391                 self.node.version_remove(src_version_id)
392     
393     @backend_method
394     def get_container_policy(self, user, account, container):
395         """Return a dictionary with the container policy."""
396         
397         logger.debug("get_container_policy: %s %s", account, container)
398         if user != account:
399             if container not in self._allowed_containers(user, account):
400                 raise NotAllowedError
401             return {}
402         path, node = self._lookup_container(account, container)
403         return self._get_policy(node)
404     
405     @backend_method
406     def update_container_policy(self, user, account, container, policy, replace=False):
407         """Update the policy associated with the container."""
408         
409         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
410         if user != account:
411             raise NotAllowedError
412         path, node = self._lookup_container(account, container)
413         self._check_policy(policy)
414         self._put_policy(node, policy, replace)
415     
416     @backend_method
417     def put_container(self, user, account, container, policy={}):
418         """Create a new container with the given name."""
419         
420         logger.debug("put_container: %s %s %s", account, container, policy)
421         if user != account:
422             raise NotAllowedError
423         try:
424             path, node = self._lookup_container(account, container)
425         except NameError:
426             pass
427         else:
428             raise NameError('Container already exists')
429         if policy:
430             self._check_policy(policy)
431         path = '/'.join((account, container))
432         node = self._put_path(user, self._lookup_account(account, True)[1], path)
433         self._put_policy(node, policy, True)
434     
435     @backend_method
436     def delete_container(self, user, account, container, until=None):
437         """Delete/purge the container with the given name."""
438         
439         logger.debug("delete_container: %s %s %s", account, container, until)
440         if user != account:
441             raise NotAllowedError
442         path, node = self._lookup_container(account, container)
443         
444         if until is not None:
445             hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
446             for h in hashes:
447                 self.store.map_delete(h)
448             self.node.node_purge_children(node, until, CLUSTER_DELETED)
449             self._report_size_change(user, account, -size, {'action': 'container purge'})
450             return
451         
452         if self._get_statistics(node)[0] > 0:
453             raise IndexError('Container is not empty')
454         hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
455         for h in hashes:
456             self.store.map_delete(h)
457         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
458         self.node.node_remove(node)
459         self._report_size_change(user, account, -size, {'action': 'container delete'})
460     
461     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props):
462         if user != account and until:
463             raise NotAllowedError
464         allowed = self._list_object_permissions(user, account, container, prefix, shared)
465         if shared and not allowed:
466             return []
467         path, node = self._lookup_container(account, container)
468         allowed = self._get_formatted_paths(allowed)
469         return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
470     
471     def _list_object_permissions(self, user, account, container, prefix, shared):
472         allowed = []
473         path = '/'.join((account, container, prefix)).rstrip('/')
474         if user != account:
475             allowed = self.permissions.access_list_paths(user, path)
476             if not allowed:
477                 raise NotAllowedError
478         else:
479             if shared:
480                 allowed = self.permissions.access_list_shared(path)
481                 if not allowed:
482                     return []
483         return allowed
484     
485     @backend_method
486     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
487         """Return a list of object (name, version_id) tuples existing under a container."""
488         
489         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
490         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False)
491     
492     @backend_method
493     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
494         """Return a list of object metadata dicts existing under a container."""
495         
496         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
497         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True)
498         objects = []
499         for p in props:
500             if len(p) == 2:
501                 objects.append({'subdir': p[0]})
502             else:
503                 objects.append({'name': p[0],
504                                 'bytes': p[self.SIZE + 1],
505                                 'type': p[self.TYPE + 1],
506                                 'hash': p[self.HASH + 1],
507                                 'version': p[self.SERIAL + 1],
508                                 'version_timestamp': p[self.MTIME + 1],
509                                 'modified': p[self.MTIME + 1] if until is None else None,
510                                 'modified_by': p[self.MUSER + 1],
511                                 'uuid': p[self.UUID + 1],
512                                 'checksum': p[self.CHECKSUM + 1]})
513         return objects
514     
515     @backend_method
516     def list_object_permissions(self, user, account, container, prefix=''):
517         """Return a list of paths that enforce permissions under a container."""
518         
519         logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
520         return self._list_object_permissions(user, account, container, prefix, True)
521     
522     @backend_method
523     def list_object_public(self, user, account, container, prefix=''):
524         """Return a dict mapping paths to public ids for objects that are public under a container."""
525         
526         logger.debug("list_object_public: %s %s %s", account, container, prefix)
527         public = {}
528         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
529             public[path] = p + ULTIMATE_ANSWER
530         return public
531     
532     @backend_method
533     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
534         """Return a dictionary with the object metadata for the domain."""
535         
536         logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
537         self._can_read(user, account, container, name)
538         path, node = self._lookup_object(account, container, name)
539         props = self._get_version(node, version)
540         if version is None:
541             modified = props[self.MTIME]
542         else:
543             try:
544                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
545             except NameError: # Object may be deleted.
546                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
547                 if del_props is None:
548                     raise NameError('Object does not exist')
549                 modified = del_props[self.MTIME]
550         
551         meta = {}
552         if include_user_defined:
553             meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
554         meta.update({'name': name,
555                      'bytes': props[self.SIZE],
556                      'type': props[self.TYPE],
557                      'hash': props[self.HASH],
558                      'version': props[self.SERIAL],
559                      'version_timestamp': props[self.MTIME],
560                      'modified': modified,
561                      'modified_by': props[self.MUSER],
562                      'uuid': props[self.UUID],
563                      'checksum': props[self.CHECKSUM]})
564         return meta
565     
566     @backend_method
567     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
568         """Update the metadata associated with the object for the domain and return the new version."""
569         
570         logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
571         self._can_write(user, account, container, name)
572         path, node = self._lookup_object(account, container, name)
573         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
574         self._apply_versioning(account, container, src_version_id)
575         return dest_version_id
576     
577     @backend_method
578     def get_object_permissions(self, user, account, container, name):
579         """Return the action allowed on the object, the path
580         from which the object gets its permissions from,
581         along with a dictionary containing the permissions."""
582         
583         logger.debug("get_object_permissions: %s %s %s", account, container, name)
584         allowed = 'write'
585         permissions_path = self._get_permissions_path(account, container, name)
586         if user != account:
587             if self.permissions.access_check(permissions_path, self.WRITE, user):
588                 allowed = 'write'
589             elif self.permissions.access_check(permissions_path, self.READ, user):
590                 allowed = 'read'
591             else:
592                 raise NotAllowedError
593         self._lookup_object(account, container, name)
594         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
595     
596     @backend_method
597     def update_object_permissions(self, user, account, container, name, permissions):
598         """Update the permissions associated with the object."""
599         
600         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
601         if user != account:
602             raise NotAllowedError
603         path = self._lookup_object(account, container, name)[0]
604         self._check_permissions(path, permissions)
605         self.permissions.access_set(path, permissions)
606         self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
607     
608     @backend_method
609     def get_object_public(self, user, account, container, name):
610         """Return the public id of the object if applicable."""
611         
612         logger.debug("get_object_public: %s %s %s", account, container, name)
613         self._can_read(user, account, container, name)
614         path = self._lookup_object(account, container, name)[0]
615         p = self.permissions.public_get(path)
616         if p is not None:
617             p += ULTIMATE_ANSWER
618         return p
619     
620     @backend_method
621     def update_object_public(self, user, account, container, name, public):
622         """Update the public status of the object."""
623         
624         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
625         self._can_write(user, account, container, name)
626         path = self._lookup_object(account, container, name)[0]
627         if not public:
628             self.permissions.public_unset(path)
629         else:
630             self.permissions.public_set(path)
631     
632     @backend_method
633     def get_object_hashmap(self, user, account, container, name, version=None):
634         """Return the object's size and a list with partial hashes."""
635         
636         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
637         self._can_read(user, account, container, name)
638         path, node = self._lookup_object(account, container, name)
639         props = self._get_version(node, version)
640         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
641         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
642     
643     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
644         if permissions is not None and user != account:
645             raise NotAllowedError
646         self._can_write(user, account, container, name)
647         if permissions is not None:
648             path = '/'.join((account, container, name))
649             self._check_permissions(path, permissions)
650         
651         account_path, account_node = self._lookup_account(account, True)
652         container_path, container_node = self._lookup_container(account, container)
653         path, node = self._put_object_node(container_path, container_node, name)
654         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
655         
656         # Handle meta.
657         if src_version_id is None:
658             src_version_id = pre_version_id
659         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
660         
661         # Check quota.
662         del_size = self._apply_versioning(account, container, pre_version_id)
663         size_delta = size - del_size
664         if size_delta > 0:
665             account_quota = long(self._get_policy(account_node)['quota'])
666             container_quota = long(self._get_policy(container_node)['quota'])
667             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
668                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
669                 # This must be executed in a transaction, so the version is never created if it fails.
670                 raise QuotaError
671         self._report_size_change(user, account, size_delta, {'action': 'object update'})
672         
673         if permissions is not None:
674             self.permissions.access_set(path, permissions)
675             self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
676         
677         self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
678         return dest_version_id
679     
680     @backend_method
681     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
682         """Create/update an object with the specified size and partial hashes."""
683         
684         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
685         if size == 0: # No such thing as an empty hashmap.
686             hashmap = [self.put_block('')]
687         map = HashMap(self.block_size, self.hash_algorithm)
688         map.extend([binascii.unhexlify(x) for x in hashmap])
689         missing = self.store.block_search(map)
690         if missing:
691             ie = IndexError()
692             ie.data = [binascii.hexlify(x) for x in missing]
693             raise ie
694         
695         hash = map.hash()
696         dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
697         self.store.map_put(hash, map)
698         return dest_version_id
699     
700     @backend_method
701     def update_object_checksum(self, user, account, container, name, version, checksum):
702         """Update an object's checksum."""
703         
704         logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
705         # Update objects with greater version and same hashmap and size (fix metadata updates).
706         self._can_write(user, account, container, name)
707         path, node = self._lookup_object(account, container, name)
708         props = self._get_version(node, version)
709         versions = self.node.node_get_versions(node)
710         for x in versions:
711             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
712                 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
713     
714     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
715         self._can_read(user, src_account, src_container, src_name)
716         path, node = self._lookup_object(src_account, src_container, src_name)
717         # TODO: Will do another fetch of the properties in duplicate version...
718         props = self._get_version(node, src_version) # Check to see if source exists.
719         src_version_id = props[self.SERIAL]
720         hash = props[self.HASH]
721         size = props[self.SIZE]
722         
723         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
724         dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
725         return dest_version_id
726     
727     @backend_method
728     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
729         """Copy an object's data and metadata."""
730         
731         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
732         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
733         return dest_version_id
734     
735     @backend_method
736     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
737         """Move an object's data and metadata."""
738         
739         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
740         if user != src_account:
741             raise NotAllowedError
742         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
743         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
744             self._delete_object(user, src_account, src_container, src_name)
745         return dest_version_id
746     
747     def _delete_object(self, user, account, container, name, until=None):
748         if user != account:
749             raise NotAllowedError
750         
751         if until is not None:
752             path = '/'.join((account, container, name))
753             node = self.node.node_lookup(path)
754             if node is None:
755                 return
756             hashes = []
757             size = 0
758             h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
759             hashes += h
760             size += s
761             h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
762             hashes += h
763             size += s
764             for h in hashes:
765                 self.store.map_delete(h)
766             self.node.node_purge(node, until, CLUSTER_DELETED)
767             try:
768                 props = self._get_version(node)
769             except NameError:
770                 self.permissions.access_clear(path)
771             self._report_size_change(user, account, -size, {'action': 'object purge'})
772             return
773         
774         path, node = self._lookup_object(account, container, name)
775         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
776         del_size = self._apply_versioning(account, container, src_version_id)
777         if del_size:
778             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
779         self._report_object_change(user, account, path, details={'action': 'object delete'})
780         self.permissions.access_clear(path)
781     
782     @backend_method
783     def delete_object(self, user, account, container, name, until=None):
784         """Delete/purge an object."""
785         
786         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
787         self._delete_object(user, account, container, name, until)
788     
789     @backend_method
790     def list_versions(self, user, account, container, name):
791         """Return a list of all (version, version_timestamp) tuples for an object."""
792         
793         logger.debug("list_versions: %s %s %s", account, container, name)
794         self._can_read(user, account, container, name)
795         path, node = self._lookup_object(account, container, name)
796         versions = self.node.node_get_versions(node)
797         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
798     
799     @backend_method
800     def get_uuid(self, user, uuid):
801         """Return the (account, container, name) for the UUID given."""
802         
803         logger.debug("get_uuid: %s", uuid)
804         info = self.node.latest_uuid(uuid)
805         if info is None:
806             raise NameError
807         path, serial = info
808         account, container, name = path.split('/', 2)
809         self._can_read(user, account, container, name)
810         return (account, container, name)
811     
812     @backend_method
813     def get_public(self, user, public):
814         """Return the (account, container, name) for the public id given."""
815         
816         logger.debug("get_public: %s", public)
817         if public is None or public < ULTIMATE_ANSWER:
818             raise NameError
819         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
820         if path is None:
821             raise NameError
822         account, container, name = path.split('/', 2)
823         self._can_read(user, account, container, name)
824         return (account, container, name)
825     
826     @backend_method(autocommit=0)
827     def get_block(self, hash):
828         """Return a block's data."""
829         
830         logger.debug("get_block: %s", hash)
831         block = self.store.block_get(binascii.unhexlify(hash))
832         if not block:
833             raise NameError('Block does not exist')
834         return block
835     
836     @backend_method(autocommit=0)
837     def put_block(self, data):
838         """Store a block and return the hash."""
839         
840         logger.debug("put_block: %s", len(data))
841         return binascii.hexlify(self.store.block_put(data))
842     
843     @backend_method(autocommit=0)
844     def update_block(self, hash, data, offset=0):
845         """Update a known block and return the hash."""
846         
847         logger.debug("update_block: %s %s %s", hash, len(data), offset)
848         if offset == 0 and len(data) == self.block_size:
849             return self.put_block(data)
850         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
851         return binascii.hexlify(h)
852     
853     # Path functions.
854     
855     def _generate_uuid(self):
856         return str(uuidlib.uuid4())
857     
858     def _put_object_node(self, path, parent, name):
859         path = '/'.join((path, name))
860         node = self.node.node_lookup(path)
861         if node is None:
862             node = self.node.node_create(parent, path)
863         return path, node
864     
865     def _put_path(self, user, parent, path):
866         node = self.node.node_create(parent, path)
867         self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
868         return node
869     
870     def _lookup_account(self, account, create=True):
871         node = self.node.node_lookup(account)
872         if node is None and create:
873             node = self._put_path(account, self.ROOTNODE, account) # User is account.
874         return account, node
875     
876     def _lookup_container(self, account, container):
877         path = '/'.join((account, container))
878         node = self.node.node_lookup(path)
879         if node is None:
880             raise NameError('Container does not exist')
881         return path, node
882     
883     def _lookup_object(self, account, container, name):
884         path = '/'.join((account, container, name))
885         node = self.node.node_lookup(path)
886         if node is None:
887             raise NameError('Object does not exist')
888         return path, node
889     
890     def _get_properties(self, node, until=None):
891         """Return properties until the timestamp given."""
892         
893         before = until if until is not None else inf
894         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
895         if props is None and until is not None:
896             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
897         if props is None:
898             raise NameError('Path does not exist')
899         return props
900     
901     def _get_statistics(self, node, until=None):
902         """Return count, sum of size and latest timestamp of everything under node."""
903         
904         if until is None:
905             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
906         else:
907             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
908         if stats is None:
909             stats = (0, 0, 0)
910         return stats
911     
912     def _get_version(self, node, version=None):
913         if version is None:
914             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
915             if props is None:
916                 raise NameError('Object does not exist')
917         else:
918             try:
919                 version = int(version)
920             except ValueError:
921                 raise IndexError('Version does not exist')
922             props = self.node.version_get_properties(version)
923             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
924                 raise IndexError('Version does not exist')
925         return props
926     
927     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
928         """Create a new version of the node."""
929         
930         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
931         if props is not None:
932             src_version_id = props[self.SERIAL]
933             src_hash = props[self.HASH]
934             src_size = props[self.SIZE]
935             src_type = props[self.TYPE]
936             src_checksum = props[self.CHECKSUM]
937         else:
938             src_version_id = None
939             src_hash = None
940             src_size = 0
941             src_type = ''
942             src_checksum = ''
943         if size is None: # Set metadata.
944             hash = src_hash # This way hash can be set to None (account or container).
945             size = src_size
946         if type is None:
947             type = src_type
948         if checksum is None:
949             checksum = src_checksum
950         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
951         
952         if src_node is None:
953             pre_version_id = src_version_id
954         else:
955             pre_version_id = None
956             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
957             if props is not None:
958                 pre_version_id = props[self.SERIAL]
959         if pre_version_id is not None:
960             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
961         
962         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
963         return pre_version_id, dest_version_id
964     
965     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
966         if src_version_id is not None:
967             self.node.attribute_copy(src_version_id, dest_version_id)
968         if not replace:
969             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
970             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
971         else:
972             self.node.attribute_del(dest_version_id, domain)
973             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
974     
975     def _put_metadata(self, user, node, domain, meta, replace=False):
976         """Create a new version and store metadata."""
977         
978         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
979         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
980         return src_version_id, dest_version_id
981     
982     def _list_limits(self, listing, marker, limit):
983         start = 0
984         if marker:
985             try:
986                 start = listing.index(marker) + 1
987             except ValueError:
988                 pass
989         if not limit or limit > 10000:
990             limit = 10000
991         return start, limit
992     
993     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
994         cont_prefix = path + '/'
995         prefix = cont_prefix + prefix
996         start = cont_prefix + marker if marker else None
997         before = until if until is not None else inf
998         filterq = keys if domain else []
999         sizeq = size_range
1000         
1001         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1002         objects.extend([(p, None) for p in prefixes] if virtual else [])
1003         objects.sort(key=lambda x: x[0])
1004         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1005         
1006         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
1007         return objects[start:start + limit]
1008     
1009     # Reporting functions.
1010     
1011     def _report_size_change(self, user, account, size, details={}):
1012         logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1013         account_node = self._lookup_account(account, True)[1]
1014         total = self._get_statistics(account_node)[1]
1015         details.update({'user': user, 'total': total})
1016         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1017     
1018     def _report_object_change(self, user, account, path, details={}):
1019         logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1020         details.update({'user': user})
1021         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1022     
1023     def _report_sharing_change(self, user, account, path, details={}):
1024         logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1025         details.update({'user': user})
1026         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1027     
1028     # Policy functions.
1029     
1030     def _check_policy(self, policy):
1031         for k in policy.keys():
1032             if policy[k] == '':
1033                 policy[k] = self.default_policy.get(k)
1034         for k, v in policy.iteritems():
1035             if k == 'quota':
1036                 q = int(v) # May raise ValueError.
1037                 if q < 0:
1038                     raise ValueError
1039             elif k == 'versioning':
1040                 if v not in ['auto', 'none']:
1041                     raise ValueError
1042             else:
1043                 raise ValueError
1044     
1045     def _put_policy(self, node, policy, replace):
1046         if replace:
1047             for k, v in self.default_policy.iteritems():
1048                 if k not in policy:
1049                     policy[k] = v
1050         self.node.policy_set(node, policy)
1051     
1052     def _get_policy(self, node):
1053         policy = self.default_policy.copy()
1054         policy.update(self.node.policy_get(node))
1055         return policy
1056     
1057     def _apply_versioning(self, account, container, version_id):
1058         """Delete the provided version if such is the policy.
1059            Return size of object removed.
1060         """
1061         
1062         if version_id is None:
1063             return 0
1064         path, node = self._lookup_container(account, container)
1065         versioning = self._get_policy(node)['versioning']
1066         if versioning != 'auto':
1067             hash, size = self.node.version_remove(version_id)
1068             self.store.map_delete(hash)
1069             return size
1070         return 0
1071     
1072     # Access control functions.
1073     
1074     def _check_groups(self, groups):
1075         # raise ValueError('Bad characters in groups')
1076         pass
1077     
1078     def _check_permissions(self, path, permissions):
1079         # raise ValueError('Bad characters in permissions')
1080         pass
1081     
1082     def _get_formatted_paths(self, paths):
1083         formatted = []
1084         for p in paths:
1085             node = self.node.node_lookup(p)
1086             if node is not None:
1087                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1088             if props is not None:
1089                 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1090                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1091                 formatted.append((p, self.MATCH_EXACT))
1092         return formatted
1093     
1094     def _get_permissions_path(self, account, container, name):
1095         path = '/'.join((account, container, name))
1096         permission_paths = self.permissions.access_inherit(path)
1097         permission_paths.sort()
1098         permission_paths.reverse()
1099         for p in permission_paths:
1100             if p == path:
1101                 return p
1102             else:
1103                 if p.count('/') < 2:
1104                     continue
1105                 node = self.node.node_lookup(p)
1106                 if node is not None:
1107                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1108                 if props is not None:
1109                     if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1110                         return p
1111         return None
1112     
1113     def _can_read(self, user, account, container, name):
1114         if user == account:
1115             return True
1116         path = '/'.join((account, container, name))
1117         if self.permissions.public_get(path) is not None:
1118             return True
1119         path = self._get_permissions_path(account, container, name)
1120         if not path:
1121             raise NotAllowedError
1122         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1123             raise NotAllowedError
1124     
1125     def _can_write(self, user, account, container, name):
1126         if user == account:
1127             return True
1128         path = '/'.join((account, container, name))
1129         path = self._get_permissions_path(account, container, name)
1130         if not path:
1131             raise NotAllowedError
1132         if not self.permissions.access_check(path, self.WRITE, user):
1133             raise NotAllowedError
1134     
1135     def _allowed_accounts(self, user):
1136         allow = set()
1137         for path in self.permissions.access_list_paths(user):
1138             allow.add(path.split('/', 1)[0])
1139         return sorted(allow)
1140     
1141     def _allowed_containers(self, user, account):
1142         allow = set()
1143         for path in self.permissions.access_list_paths(user, account):
1144             allow.add(path.split('/', 2)[1])
1145         return sorted(allow)