change API calls to include public objects in shared by me
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import hashlib
40 import binascii
41
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43
44 # Stripped-down version of the HashMap class found in tools.
45 class HashMap(list):
46
47     def __init__(self, blocksize, blockhash):
48         super(HashMap, self).__init__()
49         self.blocksize = blocksize
50         self.blockhash = blockhash
51
52     def _hash_raw(self, v):
53         h = hashlib.new(self.blockhash)
54         h.update(v)
55         return h.digest()
56
57     def hash(self):
58         if len(self) == 0:
59             return self._hash_raw('')
60         if len(self) == 1:
61             return self.__getitem__(0)
62
63         h = list(self)
64         s = 2
65         while s < len(h):
66             s = s * 2
67         h += [('\x00' * len(h[0]))] * (s - len(h))
68         while len(h) > 1:
69             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70         return h[0]
71
72 # Default modules and settings.
73 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76 DEFAULT_BLOCK_PATH = 'data/'
77 DEFAULT_BLOCK_UMASK = 0o022
78 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
79 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
80
81 QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
82 QUEUE_CLIENT_ID = 'pithos'
83 QUEUE_INSTANCE_ID = '1'
84
85 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
86
87 inf = float('inf')
88
89 ULTIMATE_ANSWER = 42
90
91
92 logger = logging.getLogger(__name__)
93
94
95 def backend_method(func=None, autocommit=1):
96     if func is None:
97         def fn(func):
98             return backend_method(func, autocommit)
99         return fn
100
101     if not autocommit:
102         return func
103     def fn(self, *args, **kw):
104         self.wrapper.execute()
105         try:
106             self.messages = []
107             ret = func(self, *args, **kw)
108             for m in self.messages:
109                 self.queue.send(*m)
110             self.wrapper.commit()
111             return ret
112         except:
113             self.wrapper.rollback()
114             raise
115     return fn
116
117
118 class ModularBackend(BaseBackend):
119     """A modular backend.
120     
121     Uses modules for SQL functions and storage.
122     """
123     
124     def __init__(self, db_module=None, db_connection=None,
125                  block_module=None, block_path=None, block_umask=None,
126                  queue_module=None, queue_connection=None):
127         db_module = db_module or DEFAULT_DB_MODULE
128         db_connection = db_connection or DEFAULT_DB_CONNECTION
129         block_module = block_module or DEFAULT_BLOCK_MODULE
130         block_path = block_path or DEFAULT_BLOCK_PATH
131         block_umask = block_umask or DEFAULT_BLOCK_UMASK
132         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
133         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
134         
135         self.hash_algorithm = 'sha256'
136         self.block_size = 4 * 1024 * 1024 # 4MB
137         
138         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
139         
140         def load_module(m):
141             __import__(m)
142             return sys.modules[m]
143         
144         self.db_module = load_module(db_module)
145         self.wrapper = self.db_module.DBWrapper(db_connection)
146         params = {'wrapper': self.wrapper}
147         self.permissions = self.db_module.Permissions(**params)
148         for x in ['READ', 'WRITE']:
149             setattr(self, x, getattr(self.db_module, x))
150         self.node = self.db_module.Node(**params)
151         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
152             setattr(self, x, getattr(self.db_module, x))
153         
154         self.block_module = load_module(block_module)
155         params = {'path': block_path,
156                   'block_size': self.block_size,
157                   'hash_algorithm': self.hash_algorithm,
158                   'umask': block_umask}
159         self.store = self.block_module.Store(**params)
160
161         if queue_module and queue_connection:
162             self.queue_module = load_module(queue_module)
163             params = {'exchange': queue_connection,
164                       'client_id': QUEUE_CLIENT_ID}
165             self.queue = self.queue_module.Queue(**params)
166         else:
167             class NoQueue:
168                 def send(self, *args):
169                     pass
170                 
171                 def close(self):
172                     pass
173             
174             self.queue = NoQueue()
175     
176     def close(self):
177         self.wrapper.close()
178         self.queue.close()
179     
180     @backend_method
181     def list_accounts(self, user, marker=None, limit=10000):
182         """Return a list of accounts the user can access."""
183         
184         logger.debug("list_accounts: %s %s %s", user, marker, limit)
185         allowed = self._allowed_accounts(user)
186         start, limit = self._list_limits(allowed, marker, limit)
187         return allowed[start:start + limit]
188     
189     @backend_method
190     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
191         """Return a dictionary with the account metadata for the domain."""
192         
193         logger.debug("get_account_meta: %s %s %s", account, domain, until)
194         path, node = self._lookup_account(account, user == account)
195         if user != account:
196             if until or node is None or account not in self._allowed_accounts(user):
197                 raise NotAllowedError
198         try:
199             props = self._get_properties(node, until)
200             mtime = props[self.MTIME]
201         except NameError:
202             props = None
203             mtime = until
204         count, bytes, tstamp = self._get_statistics(node, until)
205         tstamp = max(tstamp, mtime)
206         if until is None:
207             modified = tstamp
208         else:
209             modified = self._get_statistics(node)[2] # Overall last modification.
210             modified = max(modified, mtime)
211         
212         if user != account:
213             meta = {'name': account}
214         else:
215             meta = {}
216             if props is not None and include_user_defined:
217                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
218             if until is not None:
219                 meta.update({'until_timestamp': tstamp})
220             meta.update({'name': account, 'count': count, 'bytes': bytes})
221         meta.update({'modified': modified})
222         return meta
223     
224     @backend_method
225     def update_account_meta(self, user, account, domain, meta, replace=False):
226         """Update the metadata associated with the account for the domain."""
227         
228         logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
229         if user != account:
230             raise NotAllowedError
231         path, node = self._lookup_account(account, True)
232         self._put_metadata(user, node, domain, meta, replace)
233     
234     @backend_method
235     def get_account_groups(self, user, account):
236         """Return a dictionary with the user groups defined for this account."""
237         
238         logger.debug("get_account_groups: %s", account)
239         if user != account:
240             if account not in self._allowed_accounts(user):
241                 raise NotAllowedError
242             return {}
243         self._lookup_account(account, True)
244         return self.permissions.group_dict(account)
245     
246     @backend_method
247     def update_account_groups(self, user, account, groups, replace=False):
248         """Update the groups associated with the account."""
249         
250         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
251         if user != account:
252             raise NotAllowedError
253         self._lookup_account(account, True)
254         self._check_groups(groups)
255         if replace:
256             self.permissions.group_destroy(account)
257         for k, v in groups.iteritems():
258             if not replace: # If not already deleted.
259                 self.permissions.group_delete(account, k)
260             if v:
261                 self.permissions.group_addmany(account, k, v)
262     
263     @backend_method
264     def get_account_policy(self, user, account):
265         """Return a dictionary with the account policy."""
266         
267         logger.debug("get_account_policy: %s", account)
268         if user != account:
269             if account not in self._allowed_accounts(user):
270                 raise NotAllowedError
271             return {}
272         path, node = self._lookup_account(account, True)
273         return self._get_policy(node)
274     
275     @backend_method
276     def update_account_policy(self, user, account, policy, replace=False):
277         """Update the policy associated with the account."""
278         
279         logger.debug("update_account_policy: %s %s %s", account, policy, replace)
280         if user != account:
281             raise NotAllowedError
282         path, node = self._lookup_account(account, True)
283         self._check_policy(policy)
284         self._put_policy(node, policy, replace)
285     
286     @backend_method
287     def put_account(self, user, account, policy={}):
288         """Create a new account with the given name."""
289         
290         logger.debug("put_account: %s %s", account, policy)
291         if user != account:
292             raise NotAllowedError
293         node = self.node.node_lookup(account)
294         if node is not None:
295             raise NameError('Account already exists')
296         if policy:
297             self._check_policy(policy)
298         node = self._put_path(user, self.ROOTNODE, account)
299         self._put_policy(node, policy, True)
300     
301     @backend_method
302     def delete_account(self, user, account):
303         """Delete the account with the given name."""
304         
305         logger.debug("delete_account: %s", account)
306         if user != account:
307             raise NotAllowedError
308         node = self.node.node_lookup(account)
309         if node is None:
310             return
311         if not self.node.node_remove(node):
312             raise IndexError('Account is not empty')
313         self.permissions.group_destroy(account)
314     
315     @backend_method
316     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
317         """Return a list of containers existing under an account."""
318         
319         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
320         if user != account:
321             if until or account not in self._allowed_accounts(user):
322                 raise NotAllowedError
323             allowed = self._allowed_containers(user, account)
324             start, limit = self._list_limits(allowed, marker, limit)
325             return allowed[start:start + limit]
326         if shared or public:
327             allowed = []
328             if shared:
329                 allowed.extend([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
330             if public:
331                 allowed.extend([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
332             allowed = list(set(allowed))
333             start, limit = self._list_limits(allowed, marker, limit)
334             return allowed[start:start + limit]
335         node = self.node.node_lookup(account)
336         return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
337     
338     @backend_method
339     def list_container_meta(self, user, account, container, domain, until=None):
340         """Return a list with all the container's object meta keys for the domain."""
341         
342         logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
343         allowed = []
344         if user != account:
345             if until:
346                 raise NotAllowedError
347             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
348             if not allowed:
349                 raise NotAllowedError
350         path, node = self._lookup_container(account, container)
351         before = until if until is not None else inf
352         allowed = self._get_formatted_paths(allowed)
353         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
354     
355     @backend_method
356     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
357         """Return a dictionary with the container metadata for the domain."""
358         
359         logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
360         if user != account:
361             if until or container not in self._allowed_containers(user, account):
362                 raise NotAllowedError
363         path, node = self._lookup_container(account, container)
364         props = self._get_properties(node, until)
365         mtime = props[self.MTIME]
366         count, bytes, tstamp = self._get_statistics(node, until)
367         tstamp = max(tstamp, mtime)
368         if until is None:
369             modified = tstamp
370         else:
371             modified = self._get_statistics(node)[2] # Overall last modification.
372             modified = max(modified, mtime)
373         
374         if user != account:
375             meta = {'name': container}
376         else:
377             meta = {}
378             if include_user_defined:
379                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
380             if until is not None:
381                 meta.update({'until_timestamp': tstamp})
382             meta.update({'name': container, 'count': count, 'bytes': bytes})
383         meta.update({'modified': modified})
384         return meta
385     
386     @backend_method
387     def update_container_meta(self, user, account, container, domain, meta, replace=False):
388         """Update the metadata associated with the container for the domain."""
389         
390         logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
391         if user != account:
392             raise NotAllowedError
393         path, node = self._lookup_container(account, container)
394         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
395         if src_version_id is not None:
396             versioning = self._get_policy(node)['versioning']
397             if versioning != 'auto':
398                 self.node.version_remove(src_version_id)
399     
400     @backend_method
401     def get_container_policy(self, user, account, container):
402         """Return a dictionary with the container policy."""
403         
404         logger.debug("get_container_policy: %s %s", account, container)
405         if user != account:
406             if container not in self._allowed_containers(user, account):
407                 raise NotAllowedError
408             return {}
409         path, node = self._lookup_container(account, container)
410         return self._get_policy(node)
411     
412     @backend_method
413     def update_container_policy(self, user, account, container, policy, replace=False):
414         """Update the policy associated with the container."""
415         
416         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
417         if user != account:
418             raise NotAllowedError
419         path, node = self._lookup_container(account, container)
420         self._check_policy(policy)
421         self._put_policy(node, policy, replace)
422     
423     @backend_method
424     def put_container(self, user, account, container, policy={}):
425         """Create a new container with the given name."""
426         
427         logger.debug("put_container: %s %s %s", account, container, policy)
428         if user != account:
429             raise NotAllowedError
430         try:
431             path, node = self._lookup_container(account, container)
432         except NameError:
433             pass
434         else:
435             raise NameError('Container already exists')
436         if policy:
437             self._check_policy(policy)
438         path = '/'.join((account, container))
439         node = self._put_path(user, self._lookup_account(account, True)[1], path)
440         self._put_policy(node, policy, True)
441     
442     @backend_method
443     def delete_container(self, user, account, container, until=None):
444         """Delete/purge the container with the given name."""
445         
446         logger.debug("delete_container: %s %s %s", account, container, until)
447         if user != account:
448             raise NotAllowedError
449         path, node = self._lookup_container(account, container)
450         
451         if until is not None:
452             hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
453             for h in hashes:
454                 self.store.map_delete(h)
455             self.node.node_purge_children(node, until, CLUSTER_DELETED)
456             self._report_size_change(user, account, -size, {'action': 'container purge'})
457             return
458         
459         if self._get_statistics(node)[0] > 0:
460             raise IndexError('Container is not empty')
461         hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
462         for h in hashes:
463             self.store.map_delete(h)
464         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
465         self.node.node_remove(node)
466         self._report_size_change(user, account, -size, {'action': 'container delete'})
467     
468     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
469         if user != account and until:
470             raise NotAllowedError
471         allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
472         if (shared or public) and not allowed:
473             return []
474         path, node = self._lookup_container(account, container)
475         allowed = self._get_formatted_paths(allowed)
476         return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
477     
478     def _list_object_permissions(self, user, account, container, prefix, shared, public):
479         allowed = []
480         path = '/'.join((account, container, prefix)).rstrip('/')
481         if user != account:
482             allowed = self.permissions.access_list_paths(user, path)
483             if not allowed:
484                 raise NotAllowedError
485         else:
486             if shared:
487                 allowed = self.permissions.access_list_shared(path)
488                 allowed.extend([x[0] for x in self.permissions.public_list(path)])
489                 allowed = list(set(allowed))
490                 if not allowed:
491                     return []
492         return allowed
493     
494     @backend_method
495     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
496         """Return a list of object (name, version_id) tuples existing under a container."""
497         
498         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
499         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
500     
501     @backend_method
502     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
503         """Return a list of object metadata dicts existing under a container."""
504         
505         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
506         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
507         objects = []
508         for p in props:
509             if len(p) == 2:
510                 objects.append({'subdir': p[0]})
511             else:
512                 objects.append({'name': p[0],
513                                 'bytes': p[self.SIZE + 1],
514                                 'type': p[self.TYPE + 1],
515                                 'hash': p[self.HASH + 1],
516                                 'version': p[self.SERIAL + 1],
517                                 'version_timestamp': p[self.MTIME + 1],
518                                 'modified': p[self.MTIME + 1] if until is None else None,
519                                 'modified_by': p[self.MUSER + 1],
520                                 'uuid': p[self.UUID + 1],
521                                 'checksum': p[self.CHECKSUM + 1]})
522         return objects
523     
524     @backend_method
525     def list_object_permissions(self, user, account, container, prefix=''):
526         """Return a list of paths that enforce permissions under a container."""
527         
528         logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
529         return self._list_object_permissions(user, account, container, prefix, True, False)
530     
531     @backend_method
532     def list_object_public(self, user, account, container, prefix=''):
533         """Return a dict mapping paths to public ids for objects that are public under a container."""
534         
535         logger.debug("list_object_public: %s %s %s", account, container, prefix)
536         public = {}
537         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
538             public[path] = p + ULTIMATE_ANSWER
539         return public
540     
541     @backend_method
542     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
543         """Return a dictionary with the object metadata for the domain."""
544         
545         logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
546         self._can_read(user, account, container, name)
547         path, node = self._lookup_object(account, container, name)
548         props = self._get_version(node, version)
549         if version is None:
550             modified = props[self.MTIME]
551         else:
552             try:
553                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
554             except NameError: # Object may be deleted.
555                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
556                 if del_props is None:
557                     raise NameError('Object does not exist')
558                 modified = del_props[self.MTIME]
559         
560         meta = {}
561         if include_user_defined:
562             meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
563         meta.update({'name': name,
564                      'bytes': props[self.SIZE],
565                      'type': props[self.TYPE],
566                      'hash': props[self.HASH],
567                      'version': props[self.SERIAL],
568                      'version_timestamp': props[self.MTIME],
569                      'modified': modified,
570                      'modified_by': props[self.MUSER],
571                      'uuid': props[self.UUID],
572                      'checksum': props[self.CHECKSUM]})
573         return meta
574     
575     @backend_method
576     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
577         """Update the metadata associated with the object for the domain and return the new version."""
578         
579         logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
580         self._can_write(user, account, container, name)
581         path, node = self._lookup_object(account, container, name)
582         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
583         self._apply_versioning(account, container, src_version_id)
584         return dest_version_id
585     
586     @backend_method
587     def get_object_permissions(self, user, account, container, name):
588         """Return the action allowed on the object, the path
589         from which the object gets its permissions from,
590         along with a dictionary containing the permissions."""
591         
592         logger.debug("get_object_permissions: %s %s %s", account, container, name)
593         allowed = 'write'
594         permissions_path = self._get_permissions_path(account, container, name)
595         if user != account:
596             if self.permissions.access_check(permissions_path, self.WRITE, user):
597                 allowed = 'write'
598             elif self.permissions.access_check(permissions_path, self.READ, user):
599                 allowed = 'read'
600             else:
601                 raise NotAllowedError
602         self._lookup_object(account, container, name)
603         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
604     
605     @backend_method
606     def update_object_permissions(self, user, account, container, name, permissions):
607         """Update the permissions associated with the object."""
608         
609         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
610         if user != account:
611             raise NotAllowedError
612         path = self._lookup_object(account, container, name)[0]
613         self._check_permissions(path, permissions)
614         self.permissions.access_set(path, permissions)
615         self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
616     
617     @backend_method
618     def get_object_public(self, user, account, container, name):
619         """Return the public id of the object if applicable."""
620         
621         logger.debug("get_object_public: %s %s %s", account, container, name)
622         self._can_read(user, account, container, name)
623         path = self._lookup_object(account, container, name)[0]
624         p = self.permissions.public_get(path)
625         if p is not None:
626             p += ULTIMATE_ANSWER
627         return p
628     
629     @backend_method
630     def update_object_public(self, user, account, container, name, public):
631         """Update the public status of the object."""
632         
633         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
634         self._can_write(user, account, container, name)
635         path = self._lookup_object(account, container, name)[0]
636         if not public:
637             self.permissions.public_unset(path)
638         else:
639             self.permissions.public_set(path)
640     
641     @backend_method
642     def get_object_hashmap(self, user, account, container, name, version=None):
643         """Return the object's size and a list with partial hashes."""
644         
645         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
646         self._can_read(user, account, container, name)
647         path, node = self._lookup_object(account, container, name)
648         props = self._get_version(node, version)
649         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
650         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
651     
652     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
653         if permissions is not None and user != account:
654             raise NotAllowedError
655         self._can_write(user, account, container, name)
656         if permissions is not None:
657             path = '/'.join((account, container, name))
658             self._check_permissions(path, permissions)
659         
660         account_path, account_node = self._lookup_account(account, True)
661         container_path, container_node = self._lookup_container(account, container)
662         path, node = self._put_object_node(container_path, container_node, name)
663         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
664         
665         # Handle meta.
666         if src_version_id is None:
667             src_version_id = pre_version_id
668         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
669         
670         # Check quota.
671         del_size = self._apply_versioning(account, container, pre_version_id)
672         size_delta = size - del_size
673         if size_delta > 0:
674             account_quota = long(self._get_policy(account_node)['quota'])
675             container_quota = long(self._get_policy(container_node)['quota'])
676             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
677                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
678                 # This must be executed in a transaction, so the version is never created if it fails.
679                 raise QuotaError
680         self._report_size_change(user, account, size_delta, {'action': 'object update'})
681         
682         if permissions is not None:
683             self.permissions.access_set(path, permissions)
684             self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
685         
686         self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
687         return dest_version_id
688     
689     @backend_method
690     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
691         """Create/update an object with the specified size and partial hashes."""
692         
693         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
694         if size == 0: # No such thing as an empty hashmap.
695             hashmap = [self.put_block('')]
696         map = HashMap(self.block_size, self.hash_algorithm)
697         map.extend([binascii.unhexlify(x) for x in hashmap])
698         missing = self.store.block_search(map)
699         if missing:
700             ie = IndexError()
701             ie.data = [binascii.hexlify(x) for x in missing]
702             raise ie
703         
704         hash = map.hash()
705         dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
706         self.store.map_put(hash, map)
707         return dest_version_id
708     
709     @backend_method
710     def update_object_checksum(self, user, account, container, name, version, checksum):
711         """Update an object's checksum."""
712         
713         logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
714         # Update objects with greater version and same hashmap and size (fix metadata updates).
715         self._can_write(user, account, container, name)
716         path, node = self._lookup_object(account, container, name)
717         props = self._get_version(node, version)
718         versions = self.node.node_get_versions(node)
719         for x in versions:
720             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
721                 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
722     
723     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
724         self._can_read(user, src_account, src_container, src_name)
725         path, node = self._lookup_object(src_account, src_container, src_name)
726         # TODO: Will do another fetch of the properties in duplicate version...
727         props = self._get_version(node, src_version) # Check to see if source exists.
728         src_version_id = props[self.SERIAL]
729         hash = props[self.HASH]
730         size = props[self.SIZE]
731         
732         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
733         dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
734         return dest_version_id
735     
736     @backend_method
737     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
738         """Copy an object's data and metadata."""
739         
740         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
741         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
742         return dest_version_id
743     
744     @backend_method
745     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
746         """Move an object's data and metadata."""
747         
748         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
749         if user != src_account:
750             raise NotAllowedError
751         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
752         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
753             self._delete_object(user, src_account, src_container, src_name)
754         return dest_version_id
755     
756     def _delete_object(self, user, account, container, name, until=None):
757         if user != account:
758             raise NotAllowedError
759         
760         if until is not None:
761             path = '/'.join((account, container, name))
762             node = self.node.node_lookup(path)
763             if node is None:
764                 return
765             hashes = []
766             size = 0
767             h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
768             hashes += h
769             size += s
770             h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
771             hashes += h
772             size += s
773             for h in hashes:
774                 self.store.map_delete(h)
775             self.node.node_purge(node, until, CLUSTER_DELETED)
776             try:
777                 props = self._get_version(node)
778             except NameError:
779                 self.permissions.access_clear(path)
780             self._report_size_change(user, account, -size, {'action': 'object purge'})
781             return
782         
783         path, node = self._lookup_object(account, container, name)
784         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
785         del_size = self._apply_versioning(account, container, src_version_id)
786         if del_size:
787             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
788         self._report_object_change(user, account, path, details={'action': 'object delete'})
789         self.permissions.access_clear(path)
790     
791     @backend_method
792     def delete_object(self, user, account, container, name, until=None):
793         """Delete/purge an object."""
794         
795         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
796         self._delete_object(user, account, container, name, until)
797     
798     @backend_method
799     def list_versions(self, user, account, container, name):
800         """Return a list of all (version, version_timestamp) tuples for an object."""
801         
802         logger.debug("list_versions: %s %s %s", account, container, name)
803         self._can_read(user, account, container, name)
804         path, node = self._lookup_object(account, container, name)
805         versions = self.node.node_get_versions(node)
806         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
807     
808     @backend_method
809     def get_uuid(self, user, uuid):
810         """Return the (account, container, name) for the UUID given."""
811         
812         logger.debug("get_uuid: %s", uuid)
813         info = self.node.latest_uuid(uuid)
814         if info is None:
815             raise NameError
816         path, serial = info
817         account, container, name = path.split('/', 2)
818         self._can_read(user, account, container, name)
819         return (account, container, name)
820     
821     @backend_method
822     def get_public(self, user, public):
823         """Return the (account, container, name) for the public id given."""
824         
825         logger.debug("get_public: %s", public)
826         if public is None or public < ULTIMATE_ANSWER:
827             raise NameError
828         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
829         if path is None:
830             raise NameError
831         account, container, name = path.split('/', 2)
832         self._can_read(user, account, container, name)
833         return (account, container, name)
834     
835     @backend_method(autocommit=0)
836     def get_block(self, hash):
837         """Return a block's data."""
838         
839         logger.debug("get_block: %s", hash)
840         block = self.store.block_get(binascii.unhexlify(hash))
841         if not block:
842             raise NameError('Block does not exist')
843         return block
844     
845     @backend_method(autocommit=0)
846     def put_block(self, data):
847         """Store a block and return the hash."""
848         
849         logger.debug("put_block: %s", len(data))
850         return binascii.hexlify(self.store.block_put(data))
851     
852     @backend_method(autocommit=0)
853     def update_block(self, hash, data, offset=0):
854         """Update a known block and return the hash."""
855         
856         logger.debug("update_block: %s %s %s", hash, len(data), offset)
857         if offset == 0 and len(data) == self.block_size:
858             return self.put_block(data)
859         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
860         return binascii.hexlify(h)
861     
862     # Path functions.
863     
864     def _generate_uuid(self):
865         return str(uuidlib.uuid4())
866     
867     def _put_object_node(self, path, parent, name):
868         path = '/'.join((path, name))
869         node = self.node.node_lookup(path)
870         if node is None:
871             node = self.node.node_create(parent, path)
872         return path, node
873     
874     def _put_path(self, user, parent, path):
875         node = self.node.node_create(parent, path)
876         self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
877         return node
878     
879     def _lookup_account(self, account, create=True):
880         node = self.node.node_lookup(account)
881         if node is None and create:
882             node = self._put_path(account, self.ROOTNODE, account) # User is account.
883         return account, node
884     
885     def _lookup_container(self, account, container):
886         path = '/'.join((account, container))
887         node = self.node.node_lookup(path)
888         if node is None:
889             raise NameError('Container does not exist')
890         return path, node
891     
892     def _lookup_object(self, account, container, name):
893         path = '/'.join((account, container, name))
894         node = self.node.node_lookup(path)
895         if node is None:
896             raise NameError('Object does not exist')
897         return path, node
898     
899     def _get_properties(self, node, until=None):
900         """Return properties until the timestamp given."""
901         
902         before = until if until is not None else inf
903         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
904         if props is None and until is not None:
905             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
906         if props is None:
907             raise NameError('Path does not exist')
908         return props
909     
910     def _get_statistics(self, node, until=None):
911         """Return count, sum of size and latest timestamp of everything under node."""
912         
913         if until is None:
914             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
915         else:
916             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
917         if stats is None:
918             stats = (0, 0, 0)
919         return stats
920     
921     def _get_version(self, node, version=None):
922         if version is None:
923             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
924             if props is None:
925                 raise NameError('Object does not exist')
926         else:
927             try:
928                 version = int(version)
929             except ValueError:
930                 raise IndexError('Version does not exist')
931             props = self.node.version_get_properties(version)
932             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
933                 raise IndexError('Version does not exist')
934         return props
935     
936     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
937         """Create a new version of the node."""
938         
939         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
940         if props is not None:
941             src_version_id = props[self.SERIAL]
942             src_hash = props[self.HASH]
943             src_size = props[self.SIZE]
944             src_type = props[self.TYPE]
945             src_checksum = props[self.CHECKSUM]
946         else:
947             src_version_id = None
948             src_hash = None
949             src_size = 0
950             src_type = ''
951             src_checksum = ''
952         if size is None: # Set metadata.
953             hash = src_hash # This way hash can be set to None (account or container).
954             size = src_size
955         if type is None:
956             type = src_type
957         if checksum is None:
958             checksum = src_checksum
959         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
960         
961         if src_node is None:
962             pre_version_id = src_version_id
963         else:
964             pre_version_id = None
965             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
966             if props is not None:
967                 pre_version_id = props[self.SERIAL]
968         if pre_version_id is not None:
969             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
970         
971         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
972         return pre_version_id, dest_version_id
973     
974     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
975         if src_version_id is not None:
976             self.node.attribute_copy(src_version_id, dest_version_id)
977         if not replace:
978             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
979             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
980         else:
981             self.node.attribute_del(dest_version_id, domain)
982             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
983     
984     def _put_metadata(self, user, node, domain, meta, replace=False):
985         """Create a new version and store metadata."""
986         
987         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
988         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
989         return src_version_id, dest_version_id
990     
991     def _list_limits(self, listing, marker, limit):
992         start = 0
993         if marker:
994             try:
995                 start = listing.index(marker) + 1
996             except ValueError:
997                 pass
998         if not limit or limit > 10000:
999             limit = 10000
1000         return start, limit
1001     
1002     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1003         cont_prefix = path + '/'
1004         prefix = cont_prefix + prefix
1005         start = cont_prefix + marker if marker else None
1006         before = until if until is not None else inf
1007         filterq = keys if domain else []
1008         sizeq = size_range
1009         
1010         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1011         objects.extend([(p, None) for p in prefixes] if virtual else [])
1012         objects.sort(key=lambda x: x[0])
1013         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1014         
1015         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
1016         return objects[start:start + limit]
1017     
1018     # Reporting functions.
1019     
1020     def _report_size_change(self, user, account, size, details={}):
1021         logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1022         account_node = self._lookup_account(account, True)[1]
1023         total = self._get_statistics(account_node)[1]
1024         details.update({'user': user, 'total': total})
1025         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1026     
1027     def _report_object_change(self, user, account, path, details={}):
1028         logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1029         details.update({'user': user})
1030         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1031     
1032     def _report_sharing_change(self, user, account, path, details={}):
1033         logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1034         details.update({'user': user})
1035         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1036     
1037     # Policy functions.
1038     
1039     def _check_policy(self, policy):
1040         for k in policy.keys():
1041             if policy[k] == '':
1042                 policy[k] = self.default_policy.get(k)
1043         for k, v in policy.iteritems():
1044             if k == 'quota':
1045                 q = int(v) # May raise ValueError.
1046                 if q < 0:
1047                     raise ValueError
1048             elif k == 'versioning':
1049                 if v not in ['auto', 'none']:
1050                     raise ValueError
1051             else:
1052                 raise ValueError
1053     
1054     def _put_policy(self, node, policy, replace):
1055         if replace:
1056             for k, v in self.default_policy.iteritems():
1057                 if k not in policy:
1058                     policy[k] = v
1059         self.node.policy_set(node, policy)
1060     
1061     def _get_policy(self, node):
1062         policy = self.default_policy.copy()
1063         policy.update(self.node.policy_get(node))
1064         return policy
1065     
1066     def _apply_versioning(self, account, container, version_id):
1067         """Delete the provided version if such is the policy.
1068            Return size of object removed.
1069         """
1070         
1071         if version_id is None:
1072             return 0
1073         path, node = self._lookup_container(account, container)
1074         versioning = self._get_policy(node)['versioning']
1075         if versioning != 'auto':
1076             hash, size = self.node.version_remove(version_id)
1077             self.store.map_delete(hash)
1078             return size
1079         return 0
1080     
1081     # Access control functions.
1082     
1083     def _check_groups(self, groups):
1084         # raise ValueError('Bad characters in groups')
1085         pass
1086     
1087     def _check_permissions(self, path, permissions):
1088         # raise ValueError('Bad characters in permissions')
1089         pass
1090     
1091     def _get_formatted_paths(self, paths):
1092         formatted = []
1093         for p in paths:
1094             node = self.node.node_lookup(p)
1095             if node is not None:
1096                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1097             if props is not None:
1098                 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1099                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1100                 formatted.append((p, self.MATCH_EXACT))
1101         return formatted
1102     
1103     def _get_permissions_path(self, account, container, name):
1104         path = '/'.join((account, container, name))
1105         permission_paths = self.permissions.access_inherit(path)
1106         permission_paths.sort()
1107         permission_paths.reverse()
1108         for p in permission_paths:
1109             if p == path:
1110                 return p
1111             else:
1112                 if p.count('/') < 2:
1113                     continue
1114                 node = self.node.node_lookup(p)
1115                 if node is not None:
1116                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1117                 if props is not None:
1118                     if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1119                         return p
1120         return None
1121     
1122     def _can_read(self, user, account, container, name):
1123         if user == account:
1124             return True
1125         path = '/'.join((account, container, name))
1126         if self.permissions.public_get(path) is not None:
1127             return True
1128         path = self._get_permissions_path(account, container, name)
1129         if not path:
1130             raise NotAllowedError
1131         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1132             raise NotAllowedError
1133     
1134     def _can_write(self, user, account, container, name):
1135         if user == account:
1136             return True
1137         path = '/'.join((account, container, name))
1138         path = self._get_permissions_path(account, container, name)
1139         if not path:
1140             raise NotAllowedError
1141         if not self.permissions.access_check(path, self.WRITE, user):
1142             raise NotAllowedError
1143     
1144     def _allowed_accounts(self, user):
1145         allow = set()
1146         for path in self.permissions.access_list_paths(user):
1147             allow.add(path.split('/', 1)[0])
1148         return sorted(allow)
1149     
1150     def _allowed_containers(self, user, account):
1151         allow = set()
1152         for path in self.permissions.access_list_paths(user, account):
1153             allow.add(path.split('/', 2)[1])
1154         return sorted(allow)