Add umask option.
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import hashlib
40 import binascii
41
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43
44 # Stripped-down version of the HashMap class found in tools.
45 class HashMap(list):
46
47     def __init__(self, blocksize, blockhash):
48         super(HashMap, self).__init__()
49         self.blocksize = blocksize
50         self.blockhash = blockhash
51
52     def _hash_raw(self, v):
53         h = hashlib.new(self.blockhash)
54         h.update(v)
55         return h.digest()
56
57     def hash(self):
58         if len(self) == 0:
59             return self._hash_raw('')
60         if len(self) == 1:
61             return self.__getitem__(0)
62
63         h = list(self)
64         s = 2
65         while s < len(h):
66             s = s * 2
67         h += [('\x00' * len(h[0]))] * (s - len(h))
68         while len(h) > 1:
69             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70         return h[0]
71
72 # Default modules and settings.
73 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76 DEFAULT_BLOCK_PATH = 'data/'
77 DEFAULT_BLOCK_UMASK = 0o022
78 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
79 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
80
81 QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
82 QUEUE_CLIENT_ID = 'pithos'
83 QUEUE_INSTANCE_ID = '1'
84
85 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
86
87 inf = float('inf')
88
89 ULTIMATE_ANSWER = 42
90
91
92 logger = logging.getLogger(__name__)
93
94
95 def backend_method(func=None, autocommit=1):
96     if func is None:
97         def fn(func):
98             return backend_method(func, autocommit)
99         return fn
100
101     if not autocommit:
102         return func
103     def fn(self, *args, **kw):
104         self.wrapper.execute()
105         try:
106             self.messages = []
107             ret = func(self, *args, **kw)
108             for m in self.messages:
109                 self.queue.send(*m)
110             self.wrapper.commit()
111             return ret
112         except:
113             self.wrapper.rollback()
114             raise
115     return fn
116
117
118 class ModularBackend(BaseBackend):
119     """A modular backend.
120     
121     Uses modules for SQL functions and storage.
122     """
123     
124     def __init__(self, db_module=None, db_connection=None,
125                  block_module=None, block_path=None, block_umask=None,
126                  queue_module=None, queue_connection=None):
127         db_module = db_module or DEFAULT_DB_MODULE
128         db_connection = db_connection or DEFAULT_DB_CONNECTION
129         block_module = block_module or DEFAULT_BLOCK_MODULE
130         block_path = block_path or DEFAULT_BLOCK_PATH
131         block_umask = block_umask or DEFAULT_BLOCK_UMASK
132         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
133         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
134         
135         self.hash_algorithm = 'sha256'
136         self.block_size = 4 * 1024 * 1024 # 4MB
137         
138         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
139         
140         def load_module(m):
141             __import__(m)
142             return sys.modules[m]
143         
144         self.db_module = load_module(db_module)
145         self.wrapper = self.db_module.DBWrapper(db_connection)
146         params = {'wrapper': self.wrapper}
147         self.permissions = self.db_module.Permissions(**params)
148         for x in ['READ', 'WRITE']:
149             setattr(self, x, getattr(self.db_module, x))
150         self.node = self.db_module.Node(**params)
151         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
152             setattr(self, x, getattr(self.db_module, x))
153         
154         self.block_module = load_module(block_module)
155         params = {'path': block_path,
156                   'block_size': self.block_size,
157                   'hash_algorithm': self.hash_algorithm,
158                   'umask': block_umask}
159         self.store = self.block_module.Store(**params)
160
161         if queue_module and queue_connection:
162             self.queue_module = load_module(queue_module)
163             params = {'exchange': queue_connection,
164                       'client_id': QUEUE_CLIENT_ID}
165             self.queue = self.queue_module.Queue(**params)
166         else:
167             class NoQueue:
168                 def send(self, *args):
169                     pass
170                 
171                 def close(self):
172                     pass
173             
174             self.queue = NoQueue()
175     
176     def close(self):
177         self.wrapper.close()
178         self.queue.close()
179     
180     @backend_method
181     def list_accounts(self, user, marker=None, limit=10000):
182         """Return a list of accounts the user can access."""
183         
184         logger.debug("list_accounts: %s %s %s", user, marker, limit)
185         allowed = self._allowed_accounts(user)
186         start, limit = self._list_limits(allowed, marker, limit)
187         return allowed[start:start + limit]
188     
189     @backend_method
190     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
191         """Return a dictionary with the account metadata for the domain."""
192         
193         logger.debug("get_account_meta: %s %s %s", account, domain, until)
194         path, node = self._lookup_account(account, user == account)
195         if user != account:
196             if until or node is None or account not in self._allowed_accounts(user):
197                 raise NotAllowedError
198         try:
199             props = self._get_properties(node, until)
200             mtime = props[self.MTIME]
201         except NameError:
202             props = None
203             mtime = until
204         count, bytes, tstamp = self._get_statistics(node, until)
205         tstamp = max(tstamp, mtime)
206         if until is None:
207             modified = tstamp
208         else:
209             modified = self._get_statistics(node)[2] # Overall last modification.
210             modified = max(modified, mtime)
211         
212         if user != account:
213             meta = {'name': account}
214         else:
215             meta = {}
216             if props is not None and include_user_defined:
217                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
218             if until is not None:
219                 meta.update({'until_timestamp': tstamp})
220             meta.update({'name': account, 'count': count, 'bytes': bytes})
221         meta.update({'modified': modified})
222         return meta
223     
224     @backend_method
225     def update_account_meta(self, user, account, domain, meta, replace=False):
226         """Update the metadata associated with the account for the domain."""
227         
228         logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
229         if user != account:
230             raise NotAllowedError
231         path, node = self._lookup_account(account, True)
232         self._put_metadata(user, node, domain, meta, replace)
233     
234     @backend_method
235     def get_account_groups(self, user, account):
236         """Return a dictionary with the user groups defined for this account."""
237         
238         logger.debug("get_account_groups: %s", account)
239         if user != account:
240             if account not in self._allowed_accounts(user):
241                 raise NotAllowedError
242             return {}
243         self._lookup_account(account, True)
244         return self.permissions.group_dict(account)
245     
246     @backend_method
247     def update_account_groups(self, user, account, groups, replace=False):
248         """Update the groups associated with the account."""
249         
250         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
251         if user != account:
252             raise NotAllowedError
253         self._lookup_account(account, True)
254         self._check_groups(groups)
255         if replace:
256             self.permissions.group_destroy(account)
257         for k, v in groups.iteritems():
258             if not replace: # If not already deleted.
259                 self.permissions.group_delete(account, k)
260             if v:
261                 self.permissions.group_addmany(account, k, v)
262     
263     @backend_method
264     def get_account_policy(self, user, account):
265         """Return a dictionary with the account policy."""
266         
267         logger.debug("get_account_policy: %s", account)
268         if user != account:
269             if account not in self._allowed_accounts(user):
270                 raise NotAllowedError
271             return {}
272         path, node = self._lookup_account(account, True)
273         return self._get_policy(node)
274     
275     @backend_method
276     def update_account_policy(self, user, account, policy, replace=False):
277         """Update the policy associated with the account."""
278         
279         logger.debug("update_account_policy: %s %s %s", account, policy, replace)
280         if user != account:
281             raise NotAllowedError
282         path, node = self._lookup_account(account, True)
283         self._check_policy(policy)
284         self._put_policy(node, policy, replace)
285     
286     @backend_method
287     def put_account(self, user, account, policy={}):
288         """Create a new account with the given name."""
289         
290         logger.debug("put_account: %s %s", account, policy)
291         if user != account:
292             raise NotAllowedError
293         node = self.node.node_lookup(account)
294         if node is not None:
295             raise NameError('Account already exists')
296         if policy:
297             self._check_policy(policy)
298         node = self._put_path(user, self.ROOTNODE, account)
299         self._put_policy(node, policy, True)
300     
301     @backend_method
302     def delete_account(self, user, account):
303         """Delete the account with the given name."""
304         
305         logger.debug("delete_account: %s", account)
306         if user != account:
307             raise NotAllowedError
308         node = self.node.node_lookup(account)
309         if node is None:
310             return
311         if not self.node.node_remove(node):
312             raise IndexError('Account is not empty')
313         self.permissions.group_destroy(account)
314     
315     @backend_method
316     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
317         """Return a list of containers existing under an account."""
318         
319         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
320         if user != account:
321             if until or account not in self._allowed_accounts(user):
322                 raise NotAllowedError
323             allowed = self._allowed_containers(user, account)
324             start, limit = self._list_limits(allowed, marker, limit)
325             return allowed[start:start + limit]
326         if shared:
327             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
328             allowed = list(set(allowed))
329             start, limit = self._list_limits(allowed, marker, limit)
330             return allowed[start:start + limit]
331         node = self.node.node_lookup(account)
332         return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
333     
334     @backend_method
335     def list_container_meta(self, user, account, container, domain, until=None):
336         """Return a list with all the container's object meta keys for the domain."""
337         
338         logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
339         allowed = []
340         if user != account:
341             if until:
342                 raise NotAllowedError
343             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
344             if not allowed:
345                 raise NotAllowedError
346         path, node = self._lookup_container(account, container)
347         before = until if until is not None else inf
348         allowed = self._get_formatted_paths(allowed)
349         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
350     
351     @backend_method
352     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
353         """Return a dictionary with the container metadata for the domain."""
354         
355         logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
356         if user != account:
357             if until or container not in self._allowed_containers(user, account):
358                 raise NotAllowedError
359         path, node = self._lookup_container(account, container)
360         props = self._get_properties(node, until)
361         mtime = props[self.MTIME]
362         count, bytes, tstamp = self._get_statistics(node, until)
363         tstamp = max(tstamp, mtime)
364         if until is None:
365             modified = tstamp
366         else:
367             modified = self._get_statistics(node)[2] # Overall last modification.
368             modified = max(modified, mtime)
369         
370         if user != account:
371             meta = {'name': container}
372         else:
373             meta = {}
374             if include_user_defined:
375                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
376             if until is not None:
377                 meta.update({'until_timestamp': tstamp})
378             meta.update({'name': container, 'count': count, 'bytes': bytes})
379         meta.update({'modified': modified})
380         return meta
381     
382     @backend_method
383     def update_container_meta(self, user, account, container, domain, meta, replace=False):
384         """Update the metadata associated with the container for the domain."""
385         
386         logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
387         if user != account:
388             raise NotAllowedError
389         path, node = self._lookup_container(account, container)
390         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
391         if src_version_id is not None:
392             versioning = self._get_policy(node)['versioning']
393             if versioning != 'auto':
394                 self.node.version_remove(src_version_id)
395     
396     @backend_method
397     def get_container_policy(self, user, account, container):
398         """Return a dictionary with the container policy."""
399         
400         logger.debug("get_container_policy: %s %s", account, container)
401         if user != account:
402             if container not in self._allowed_containers(user, account):
403                 raise NotAllowedError
404             return {}
405         path, node = self._lookup_container(account, container)
406         return self._get_policy(node)
407     
408     @backend_method
409     def update_container_policy(self, user, account, container, policy, replace=False):
410         """Update the policy associated with the container."""
411         
412         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
413         if user != account:
414             raise NotAllowedError
415         path, node = self._lookup_container(account, container)
416         self._check_policy(policy)
417         self._put_policy(node, policy, replace)
418     
419     @backend_method
420     def put_container(self, user, account, container, policy={}):
421         """Create a new container with the given name."""
422         
423         logger.debug("put_container: %s %s %s", account, container, policy)
424         if user != account:
425             raise NotAllowedError
426         try:
427             path, node = self._lookup_container(account, container)
428         except NameError:
429             pass
430         else:
431             raise NameError('Container already exists')
432         if policy:
433             self._check_policy(policy)
434         path = '/'.join((account, container))
435         node = self._put_path(user, self._lookup_account(account, True)[1], path)
436         self._put_policy(node, policy, True)
437     
438     @backend_method
439     def delete_container(self, user, account, container, until=None):
440         """Delete/purge the container with the given name."""
441         
442         logger.debug("delete_container: %s %s %s", account, container, until)
443         if user != account:
444             raise NotAllowedError
445         path, node = self._lookup_container(account, container)
446         
447         if until is not None:
448             hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
449             for h in hashes:
450                 self.store.map_delete(h)
451             self.node.node_purge_children(node, until, CLUSTER_DELETED)
452             self._report_size_change(user, account, -size, {'action': 'container purge'})
453             return
454         
455         if self._get_statistics(node)[0] > 0:
456             raise IndexError('Container is not empty')
457         hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
458         for h in hashes:
459             self.store.map_delete(h)
460         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
461         self.node.node_remove(node)
462         self._report_size_change(user, account, -size, {'action': 'container delete'})
463     
464     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props):
465         if user != account and until:
466             raise NotAllowedError
467         allowed = self._list_object_permissions(user, account, container, prefix, shared)
468         if shared and not allowed:
469             return []
470         path, node = self._lookup_container(account, container)
471         allowed = self._get_formatted_paths(allowed)
472         return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
473     
474     def _list_object_permissions(self, user, account, container, prefix, shared):
475         allowed = []
476         path = '/'.join((account, container, prefix)).rstrip('/')
477         if user != account:
478             allowed = self.permissions.access_list_paths(user, path)
479             if not allowed:
480                 raise NotAllowedError
481         else:
482             if shared:
483                 allowed = self.permissions.access_list_shared(path)
484                 if not allowed:
485                     return []
486         return allowed
487     
488     @backend_method
489     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
490         """Return a list of object (name, version_id) tuples existing under a container."""
491         
492         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
493         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False)
494     
495     @backend_method
496     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
497         """Return a list of object metadata dicts existing under a container."""
498         
499         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
500         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True)
501         objects = []
502         for p in props:
503             if len(p) == 2:
504                 objects.append({'subdir': p[0]})
505             else:
506                 objects.append({'name': p[0],
507                                 'bytes': p[self.SIZE + 1],
508                                 'type': p[self.TYPE + 1],
509                                 'hash': p[self.HASH + 1],
510                                 'version': p[self.SERIAL + 1],
511                                 'version_timestamp': p[self.MTIME + 1],
512                                 'modified': p[self.MTIME + 1] if until is None else None,
513                                 'modified_by': p[self.MUSER + 1],
514                                 'uuid': p[self.UUID + 1],
515                                 'checksum': p[self.CHECKSUM + 1]})
516         return objects
517     
518     @backend_method
519     def list_object_permissions(self, user, account, container, prefix=''):
520         """Return a list of paths that enforce permissions under a container."""
521         
522         logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
523         return self._list_object_permissions(user, account, container, prefix, True)
524     
525     @backend_method
526     def list_object_public(self, user, account, container, prefix=''):
527         """Return a dict mapping paths to public ids for objects that are public under a container."""
528         
529         logger.debug("list_object_public: %s %s %s", account, container, prefix)
530         public = {}
531         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
532             public[path] = p + ULTIMATE_ANSWER
533         return public
534     
535     @backend_method
536     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
537         """Return a dictionary with the object metadata for the domain."""
538         
539         logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
540         self._can_read(user, account, container, name)
541         path, node = self._lookup_object(account, container, name)
542         props = self._get_version(node, version)
543         if version is None:
544             modified = props[self.MTIME]
545         else:
546             try:
547                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
548             except NameError: # Object may be deleted.
549                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
550                 if del_props is None:
551                     raise NameError('Object does not exist')
552                 modified = del_props[self.MTIME]
553         
554         meta = {}
555         if include_user_defined:
556             meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
557         meta.update({'name': name,
558                      'bytes': props[self.SIZE],
559                      'type': props[self.TYPE],
560                      'hash': props[self.HASH],
561                      'version': props[self.SERIAL],
562                      'version_timestamp': props[self.MTIME],
563                      'modified': modified,
564                      'modified_by': props[self.MUSER],
565                      'uuid': props[self.UUID],
566                      'checksum': props[self.CHECKSUM]})
567         return meta
568     
569     @backend_method
570     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
571         """Update the metadata associated with the object for the domain and return the new version."""
572         
573         logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
574         self._can_write(user, account, container, name)
575         path, node = self._lookup_object(account, container, name)
576         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
577         self._apply_versioning(account, container, src_version_id)
578         return dest_version_id
579     
580     @backend_method
581     def get_object_permissions(self, user, account, container, name):
582         """Return the action allowed on the object, the path
583         from which the object gets its permissions from,
584         along with a dictionary containing the permissions."""
585         
586         logger.debug("get_object_permissions: %s %s %s", account, container, name)
587         allowed = 'write'
588         permissions_path = self._get_permissions_path(account, container, name)
589         if user != account:
590             if self.permissions.access_check(permissions_path, self.WRITE, user):
591                 allowed = 'write'
592             elif self.permissions.access_check(permissions_path, self.READ, user):
593                 allowed = 'read'
594             else:
595                 raise NotAllowedError
596         self._lookup_object(account, container, name)
597         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
598     
599     @backend_method
600     def update_object_permissions(self, user, account, container, name, permissions):
601         """Update the permissions associated with the object."""
602         
603         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
604         if user != account:
605             raise NotAllowedError
606         path = self._lookup_object(account, container, name)[0]
607         self._check_permissions(path, permissions)
608         self.permissions.access_set(path, permissions)
609         self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
610     
611     @backend_method
612     def get_object_public(self, user, account, container, name):
613         """Return the public id of the object if applicable."""
614         
615         logger.debug("get_object_public: %s %s %s", account, container, name)
616         self._can_read(user, account, container, name)
617         path = self._lookup_object(account, container, name)[0]
618         p = self.permissions.public_get(path)
619         if p is not None:
620             p += ULTIMATE_ANSWER
621         return p
622     
623     @backend_method
624     def update_object_public(self, user, account, container, name, public):
625         """Update the public status of the object."""
626         
627         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
628         self._can_write(user, account, container, name)
629         path = self._lookup_object(account, container, name)[0]
630         if not public:
631             self.permissions.public_unset(path)
632         else:
633             self.permissions.public_set(path)
634     
635     @backend_method
636     def get_object_hashmap(self, user, account, container, name, version=None):
637         """Return the object's size and a list with partial hashes."""
638         
639         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
640         self._can_read(user, account, container, name)
641         path, node = self._lookup_object(account, container, name)
642         props = self._get_version(node, version)
643         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
644         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
645     
646     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
647         if permissions is not None and user != account:
648             raise NotAllowedError
649         self._can_write(user, account, container, name)
650         if permissions is not None:
651             path = '/'.join((account, container, name))
652             self._check_permissions(path, permissions)
653         
654         account_path, account_node = self._lookup_account(account, True)
655         container_path, container_node = self._lookup_container(account, container)
656         path, node = self._put_object_node(container_path, container_node, name)
657         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
658         
659         # Handle meta.
660         if src_version_id is None:
661             src_version_id = pre_version_id
662         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
663         
664         # Check quota.
665         del_size = self._apply_versioning(account, container, pre_version_id)
666         size_delta = size - del_size
667         if size_delta > 0:
668             account_quota = long(self._get_policy(account_node)['quota'])
669             container_quota = long(self._get_policy(container_node)['quota'])
670             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
671                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
672                 # This must be executed in a transaction, so the version is never created if it fails.
673                 raise QuotaError
674         self._report_size_change(user, account, size_delta, {'action': 'object update'})
675         
676         if permissions is not None:
677             self.permissions.access_set(path, permissions)
678             self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
679         
680         self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
681         return dest_version_id
682     
683     @backend_method
684     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
685         """Create/update an object with the specified size and partial hashes."""
686         
687         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
688         if size == 0: # No such thing as an empty hashmap.
689             hashmap = [self.put_block('')]
690         map = HashMap(self.block_size, self.hash_algorithm)
691         map.extend([binascii.unhexlify(x) for x in hashmap])
692         missing = self.store.block_search(map)
693         if missing:
694             ie = IndexError()
695             ie.data = [binascii.hexlify(x) for x in missing]
696             raise ie
697         
698         hash = map.hash()
699         dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
700         self.store.map_put(hash, map)
701         return dest_version_id
702     
703     @backend_method
704     def update_object_checksum(self, user, account, container, name, version, checksum):
705         """Update an object's checksum."""
706         
707         logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
708         # Update objects with greater version and same hashmap and size (fix metadata updates).
709         self._can_write(user, account, container, name)
710         path, node = self._lookup_object(account, container, name)
711         props = self._get_version(node, version)
712         versions = self.node.node_get_versions(node)
713         for x in versions:
714             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
715                 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
716     
717     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
718         self._can_read(user, src_account, src_container, src_name)
719         path, node = self._lookup_object(src_account, src_container, src_name)
720         # TODO: Will do another fetch of the properties in duplicate version...
721         props = self._get_version(node, src_version) # Check to see if source exists.
722         src_version_id = props[self.SERIAL]
723         hash = props[self.HASH]
724         size = props[self.SIZE]
725         
726         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
727         dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
728         return dest_version_id
729     
730     @backend_method
731     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
732         """Copy an object's data and metadata."""
733         
734         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
735         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
736         return dest_version_id
737     
738     @backend_method
739     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
740         """Move an object's data and metadata."""
741         
742         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
743         if user != src_account:
744             raise NotAllowedError
745         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
746         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
747             self._delete_object(user, src_account, src_container, src_name)
748         return dest_version_id
749     
750     def _delete_object(self, user, account, container, name, until=None):
751         if user != account:
752             raise NotAllowedError
753         
754         if until is not None:
755             path = '/'.join((account, container, name))
756             node = self.node.node_lookup(path)
757             if node is None:
758                 return
759             hashes = []
760             size = 0
761             h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
762             hashes += h
763             size += s
764             h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
765             hashes += h
766             size += s
767             for h in hashes:
768                 self.store.map_delete(h)
769             self.node.node_purge(node, until, CLUSTER_DELETED)
770             try:
771                 props = self._get_version(node)
772             except NameError:
773                 self.permissions.access_clear(path)
774             self._report_size_change(user, account, -size, {'action': 'object purge'})
775             return
776         
777         path, node = self._lookup_object(account, container, name)
778         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
779         del_size = self._apply_versioning(account, container, src_version_id)
780         if del_size:
781             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
782         self._report_object_change(user, account, path, details={'action': 'object delete'})
783         self.permissions.access_clear(path)
784     
785     @backend_method
786     def delete_object(self, user, account, container, name, until=None):
787         """Delete/purge an object."""
788         
789         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
790         self._delete_object(user, account, container, name, until)
791     
792     @backend_method
793     def list_versions(self, user, account, container, name):
794         """Return a list of all (version, version_timestamp) tuples for an object."""
795         
796         logger.debug("list_versions: %s %s %s", account, container, name)
797         self._can_read(user, account, container, name)
798         path, node = self._lookup_object(account, container, name)
799         versions = self.node.node_get_versions(node)
800         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
801     
802     @backend_method
803     def get_uuid(self, user, uuid):
804         """Return the (account, container, name) for the UUID given."""
805         
806         logger.debug("get_uuid: %s", uuid)
807         info = self.node.latest_uuid(uuid)
808         if info is None:
809             raise NameError
810         path, serial = info
811         account, container, name = path.split('/', 2)
812         self._can_read(user, account, container, name)
813         return (account, container, name)
814     
815     @backend_method
816     def get_public(self, user, public):
817         """Return the (account, container, name) for the public id given."""
818         
819         logger.debug("get_public: %s", public)
820         if public is None or public < ULTIMATE_ANSWER:
821             raise NameError
822         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
823         if path is None:
824             raise NameError
825         account, container, name = path.split('/', 2)
826         self._can_read(user, account, container, name)
827         return (account, container, name)
828     
829     @backend_method(autocommit=0)
830     def get_block(self, hash):
831         """Return a block's data."""
832         
833         logger.debug("get_block: %s", hash)
834         block = self.store.block_get(binascii.unhexlify(hash))
835         if not block:
836             raise NameError('Block does not exist')
837         return block
838     
839     @backend_method(autocommit=0)
840     def put_block(self, data):
841         """Store a block and return the hash."""
842         
843         logger.debug("put_block: %s", len(data))
844         return binascii.hexlify(self.store.block_put(data))
845     
846     @backend_method(autocommit=0)
847     def update_block(self, hash, data, offset=0):
848         """Update a known block and return the hash."""
849         
850         logger.debug("update_block: %s %s %s", hash, len(data), offset)
851         if offset == 0 and len(data) == self.block_size:
852             return self.put_block(data)
853         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
854         return binascii.hexlify(h)
855     
856     # Path functions.
857     
858     def _generate_uuid(self):
859         return str(uuidlib.uuid4())
860     
861     def _put_object_node(self, path, parent, name):
862         path = '/'.join((path, name))
863         node = self.node.node_lookup(path)
864         if node is None:
865             node = self.node.node_create(parent, path)
866         return path, node
867     
868     def _put_path(self, user, parent, path):
869         node = self.node.node_create(parent, path)
870         self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
871         return node
872     
873     def _lookup_account(self, account, create=True):
874         node = self.node.node_lookup(account)
875         if node is None and create:
876             node = self._put_path(account, self.ROOTNODE, account) # User is account.
877         return account, node
878     
879     def _lookup_container(self, account, container):
880         path = '/'.join((account, container))
881         node = self.node.node_lookup(path)
882         if node is None:
883             raise NameError('Container does not exist')
884         return path, node
885     
886     def _lookup_object(self, account, container, name):
887         path = '/'.join((account, container, name))
888         node = self.node.node_lookup(path)
889         if node is None:
890             raise NameError('Object does not exist')
891         return path, node
892     
893     def _get_properties(self, node, until=None):
894         """Return properties until the timestamp given."""
895         
896         before = until if until is not None else inf
897         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
898         if props is None and until is not None:
899             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
900         if props is None:
901             raise NameError('Path does not exist')
902         return props
903     
904     def _get_statistics(self, node, until=None):
905         """Return count, sum of size and latest timestamp of everything under node."""
906         
907         if until is None:
908             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
909         else:
910             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
911         if stats is None:
912             stats = (0, 0, 0)
913         return stats
914     
915     def _get_version(self, node, version=None):
916         if version is None:
917             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
918             if props is None:
919                 raise NameError('Object does not exist')
920         else:
921             try:
922                 version = int(version)
923             except ValueError:
924                 raise IndexError('Version does not exist')
925             props = self.node.version_get_properties(version)
926             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
927                 raise IndexError('Version does not exist')
928         return props
929     
930     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
931         """Create a new version of the node."""
932         
933         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
934         if props is not None:
935             src_version_id = props[self.SERIAL]
936             src_hash = props[self.HASH]
937             src_size = props[self.SIZE]
938             src_type = props[self.TYPE]
939             src_checksum = props[self.CHECKSUM]
940         else:
941             src_version_id = None
942             src_hash = None
943             src_size = 0
944             src_type = ''
945             src_checksum = ''
946         if size is None: # Set metadata.
947             hash = src_hash # This way hash can be set to None (account or container).
948             size = src_size
949         if type is None:
950             type = src_type
951         if checksum is None:
952             checksum = src_checksum
953         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
954         
955         if src_node is None:
956             pre_version_id = src_version_id
957         else:
958             pre_version_id = None
959             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
960             if props is not None:
961                 pre_version_id = props[self.SERIAL]
962         if pre_version_id is not None:
963             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
964         
965         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
966         return pre_version_id, dest_version_id
967     
968     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
969         if src_version_id is not None:
970             self.node.attribute_copy(src_version_id, dest_version_id)
971         if not replace:
972             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
973             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
974         else:
975             self.node.attribute_del(dest_version_id, domain)
976             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
977     
978     def _put_metadata(self, user, node, domain, meta, replace=False):
979         """Create a new version and store metadata."""
980         
981         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
982         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
983         return src_version_id, dest_version_id
984     
985     def _list_limits(self, listing, marker, limit):
986         start = 0
987         if marker:
988             try:
989                 start = listing.index(marker) + 1
990             except ValueError:
991                 pass
992         if not limit or limit > 10000:
993             limit = 10000
994         return start, limit
995     
996     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
997         cont_prefix = path + '/'
998         prefix = cont_prefix + prefix
999         start = cont_prefix + marker if marker else None
1000         before = until if until is not None else inf
1001         filterq = keys if domain else []
1002         sizeq = size_range
1003         
1004         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1005         objects.extend([(p, None) for p in prefixes] if virtual else [])
1006         objects.sort(key=lambda x: x[0])
1007         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1008         
1009         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
1010         return objects[start:start + limit]
1011     
1012     # Reporting functions.
1013     
1014     def _report_size_change(self, user, account, size, details={}):
1015         logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1016         account_node = self._lookup_account(account, True)[1]
1017         total = self._get_statistics(account_node)[1]
1018         details.update({'user': user, 'total': total})
1019         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1020     
1021     def _report_object_change(self, user, account, path, details={}):
1022         logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1023         details.update({'user': user})
1024         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1025     
1026     def _report_sharing_change(self, user, account, path, details={}):
1027         logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1028         details.update({'user': user})
1029         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1030     
1031     # Policy functions.
1032     
1033     def _check_policy(self, policy):
1034         for k in policy.keys():
1035             if policy[k] == '':
1036                 policy[k] = self.default_policy.get(k)
1037         for k, v in policy.iteritems():
1038             if k == 'quota':
1039                 q = int(v) # May raise ValueError.
1040                 if q < 0:
1041                     raise ValueError
1042             elif k == 'versioning':
1043                 if v not in ['auto', 'none']:
1044                     raise ValueError
1045             else:
1046                 raise ValueError
1047     
1048     def _put_policy(self, node, policy, replace):
1049         if replace:
1050             for k, v in self.default_policy.iteritems():
1051                 if k not in policy:
1052                     policy[k] = v
1053         self.node.policy_set(node, policy)
1054     
1055     def _get_policy(self, node):
1056         policy = self.default_policy.copy()
1057         policy.update(self.node.policy_get(node))
1058         return policy
1059     
1060     def _apply_versioning(self, account, container, version_id):
1061         """Delete the provided version if such is the policy.
1062            Return size of object removed.
1063         """
1064         
1065         if version_id is None:
1066             return 0
1067         path, node = self._lookup_container(account, container)
1068         versioning = self._get_policy(node)['versioning']
1069         if versioning != 'auto':
1070             hash, size = self.node.version_remove(version_id)
1071             self.store.map_delete(hash)
1072             return size
1073         return 0
1074     
1075     # Access control functions.
1076     
1077     def _check_groups(self, groups):
1078         # raise ValueError('Bad characters in groups')
1079         pass
1080     
1081     def _check_permissions(self, path, permissions):
1082         # raise ValueError('Bad characters in permissions')
1083         pass
1084     
1085     def _get_formatted_paths(self, paths):
1086         formatted = []
1087         for p in paths:
1088             node = self.node.node_lookup(p)
1089             if node is not None:
1090                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1091             if props is not None:
1092                 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1093                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1094                 formatted.append((p, self.MATCH_EXACT))
1095         return formatted
1096     
1097     def _get_permissions_path(self, account, container, name):
1098         path = '/'.join((account, container, name))
1099         permission_paths = self.permissions.access_inherit(path)
1100         permission_paths.sort()
1101         permission_paths.reverse()
1102         for p in permission_paths:
1103             if p == path:
1104                 return p
1105             else:
1106                 if p.count('/') < 2:
1107                     continue
1108                 node = self.node.node_lookup(p)
1109                 if node is not None:
1110                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1111                 if props is not None:
1112                     if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1113                         return p
1114         return None
1115     
1116     def _can_read(self, user, account, container, name):
1117         if user == account:
1118             return True
1119         path = '/'.join((account, container, name))
1120         if self.permissions.public_get(path) is not None:
1121             return True
1122         path = self._get_permissions_path(account, container, name)
1123         if not path:
1124             raise NotAllowedError
1125         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1126             raise NotAllowedError
1127     
1128     def _can_write(self, user, account, container, name):
1129         if user == account:
1130             return True
1131         path = '/'.join((account, container, name))
1132         path = self._get_permissions_path(account, container, name)
1133         if not path:
1134             raise NotAllowedError
1135         if not self.permissions.access_check(path, self.WRITE, user):
1136             raise NotAllowedError
1137     
1138     def _allowed_accounts(self, user):
1139         allow = set()
1140         for path in self.permissions.access_list_paths(user):
1141             allow.add(path.split('/', 1)[0])
1142         return sorted(allow)
1143     
1144     def _allowed_containers(self, user, account):
1145         allow = set()
1146         for path in self.permissions.access_list_paths(user, account):
1147             allow.add(path.split('/', 2)[1])
1148         return sorted(allow)