include user in logs
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import hashlib
40 import binascii
41
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43
44 # Stripped-down version of the HashMap class found in tools.
45 class HashMap(list):
46
47     def __init__(self, blocksize, blockhash):
48         super(HashMap, self).__init__()
49         self.blocksize = blocksize
50         self.blockhash = blockhash
51
52     def _hash_raw(self, v):
53         h = hashlib.new(self.blockhash)
54         h.update(v)
55         return h.digest()
56
57     def hash(self):
58         if len(self) == 0:
59             return self._hash_raw('')
60         if len(self) == 1:
61             return self.__getitem__(0)
62
63         h = list(self)
64         s = 2
65         while s < len(h):
66             s = s * 2
67         h += [('\x00' * len(h[0]))] * (s - len(h))
68         while len(h) > 1:
69             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70         return h[0]
71
72 # Default modules and settings.
73 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76 DEFAULT_BLOCK_PATH = 'data/'
77 DEFAULT_BLOCK_UMASK = 0o022
78 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
79 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
80
81 QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
82 QUEUE_CLIENT_ID = 'pithos'
83 QUEUE_INSTANCE_ID = '1'
84
85 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
86
87 inf = float('inf')
88
89 ULTIMATE_ANSWER = 42
90
91
92 logger = logging.getLogger(__name__)
93
94
95 def backend_method(func=None, autocommit=1):
96     if func is None:
97         def fn(func):
98             return backend_method(func, autocommit)
99         return fn
100
101     if not autocommit:
102         return func
103     def fn(self, *args, **kw):
104         self.wrapper.execute()
105         try:
106             self.messages = []
107             ret = func(self, *args, **kw)
108             for m in self.messages:
109                 self.queue.send(*m)
110             self.wrapper.commit()
111             return ret
112         except:
113             self.wrapper.rollback()
114             raise
115     return fn
116
117
118 class ModularBackend(BaseBackend):
119     """A modular backend.
120     
121     Uses modules for SQL functions and storage.
122     """
123     
124     def __init__(self, db_module=None, db_connection=None,
125                  block_module=None, block_path=None, block_umask=None,
126                  queue_module=None, queue_connection=None):
127         db_module = db_module or DEFAULT_DB_MODULE
128         db_connection = db_connection or DEFAULT_DB_CONNECTION
129         block_module = block_module or DEFAULT_BLOCK_MODULE
130         block_path = block_path or DEFAULT_BLOCK_PATH
131         block_umask = block_umask or DEFAULT_BLOCK_UMASK
132         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
133         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
134         
135         self.hash_algorithm = 'sha256'
136         self.block_size = 4 * 1024 * 1024 # 4MB
137         
138         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
139         
140         def load_module(m):
141             __import__(m)
142             return sys.modules[m]
143         
144         self.db_module = load_module(db_module)
145         self.wrapper = self.db_module.DBWrapper(db_connection)
146         params = {'wrapper': self.wrapper}
147         self.permissions = self.db_module.Permissions(**params)
148         for x in ['READ', 'WRITE']:
149             setattr(self, x, getattr(self.db_module, x))
150         self.node = self.db_module.Node(**params)
151         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
152             setattr(self, x, getattr(self.db_module, x))
153         
154         self.block_module = load_module(block_module)
155         params = {'path': block_path,
156                   'block_size': self.block_size,
157                   'hash_algorithm': self.hash_algorithm,
158                   'umask': block_umask}
159         self.store = self.block_module.Store(**params)
160
161         if queue_module and queue_connection:
162             self.queue_module = load_module(queue_module)
163             params = {'exchange': queue_connection,
164                       'client_id': QUEUE_CLIENT_ID}
165             self.queue = self.queue_module.Queue(**params)
166         else:
167             class NoQueue:
168                 def send(self, *args):
169                     pass
170                 
171                 def close(self):
172                     pass
173             
174             self.queue = NoQueue()
175     
176     def close(self):
177         self.wrapper.close()
178         self.queue.close()
179     
180     @backend_method
181     def list_accounts(self, user, marker=None, limit=10000):
182         """Return a list of accounts the user can access."""
183         
184         logger.debug("list_accounts: %s %s %s", user, marker, limit)
185         allowed = self._allowed_accounts(user)
186         start, limit = self._list_limits(allowed, marker, limit)
187         return allowed[start:start + limit]
188     
189     @backend_method
190     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
191         """Return a dictionary with the account metadata for the domain."""
192         
193         logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
194         path, node = self._lookup_account(account, user == account)
195         if user != account:
196             if until or node is None or account not in self._allowed_accounts(user):
197                 raise NotAllowedError
198         try:
199             props = self._get_properties(node, until)
200             mtime = props[self.MTIME]
201         except NameError:
202             props = None
203             mtime = until
204         count, bytes, tstamp = self._get_statistics(node, until)
205         tstamp = max(tstamp, mtime)
206         if until is None:
207             modified = tstamp
208         else:
209             modified = self._get_statistics(node)[2] # Overall last modification.
210             modified = max(modified, mtime)
211         
212         if user != account:
213             meta = {'name': account}
214         else:
215             meta = {}
216             if props is not None and include_user_defined:
217                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
218             if until is not None:
219                 meta.update({'until_timestamp': tstamp})
220             meta.update({'name': account, 'count': count, 'bytes': bytes})
221         meta.update({'modified': modified})
222         return meta
223     
224     @backend_method
225     def update_account_meta(self, user, account, domain, meta, replace=False):
226         """Update the metadata associated with the account for the domain."""
227         
228         logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
229         if user != account:
230             raise NotAllowedError
231         path, node = self._lookup_account(account, True)
232         self._put_metadata(user, node, domain, meta, replace)
233     
234     @backend_method
235     def get_account_groups(self, user, account):
236         """Return a dictionary with the user groups defined for this account."""
237         
238         logger.debug("get_account_groups: %s %s", user, account)
239         if user != account:
240             if account not in self._allowed_accounts(user):
241                 raise NotAllowedError
242             return {}
243         self._lookup_account(account, True)
244         return self.permissions.group_dict(account)
245     
246     @backend_method
247     def update_account_groups(self, user, account, groups, replace=False):
248         """Update the groups associated with the account."""
249         
250         logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
251         if user != account:
252             raise NotAllowedError
253         self._lookup_account(account, True)
254         self._check_groups(groups)
255         if replace:
256             self.permissions.group_destroy(account)
257         for k, v in groups.iteritems():
258             if not replace: # If not already deleted.
259                 self.permissions.group_delete(account, k)
260             if v:
261                 self.permissions.group_addmany(account, k, v)
262     
263     @backend_method
264     def get_account_policy(self, user, account):
265         """Return a dictionary with the account policy."""
266         
267         logger.debug("get_account_policy: %s %s", user, account)
268         if user != account:
269             if account not in self._allowed_accounts(user):
270                 raise NotAllowedError
271             return {}
272         path, node = self._lookup_account(account, True)
273         return self._get_policy(node)
274     
275     @backend_method
276     def update_account_policy(self, user, account, policy, replace=False):
277         """Update the policy associated with the account."""
278         
279         logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
280         if user != account:
281             raise NotAllowedError
282         path, node = self._lookup_account(account, True)
283         self._check_policy(policy)
284         self._put_policy(node, policy, replace)
285     
286     @backend_method
287     def put_account(self, user, account, policy={}):
288         """Create a new account with the given name."""
289         
290         logger.debug("put_account: %s %s %s", user, account, policy)
291         if user != account:
292             raise NotAllowedError
293         node = self.node.node_lookup(account)
294         if node is not None:
295             raise NameError('Account already exists')
296         if policy:
297             self._check_policy(policy)
298         node = self._put_path(user, self.ROOTNODE, account)
299         self._put_policy(node, policy, True)
300     
301     @backend_method
302     def delete_account(self, user, account):
303         """Delete the account with the given name."""
304         
305         logger.debug("delete_account: %s %s", user, account)
306         if user != account:
307             raise NotAllowedError
308         node = self.node.node_lookup(account)
309         if node is None:
310             return
311         if not self.node.node_remove(node):
312             raise IndexError('Account is not empty')
313         self.permissions.group_destroy(account)
314     
315     @backend_method
316     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
317         """Return a list of containers existing under an account."""
318         
319         logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
320         if user != account:
321             if until or account not in self._allowed_accounts(user):
322                 raise NotAllowedError
323             allowed = self._allowed_containers(user, account)
324             start, limit = self._list_limits(allowed, marker, limit)
325             return allowed[start:start + limit]
326         if shared or public:
327             allowed = []
328             if shared:
329                 allowed.extend([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
330             if public:
331                 allowed.extend([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
332             allowed = list(set(allowed))
333             allowed.sort()
334             start, limit = self._list_limits(allowed, marker, limit)
335             return allowed[start:start + limit]
336         node = self.node.node_lookup(account)
337         return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
338     
339     @backend_method
340     def list_container_meta(self, user, account, container, domain, until=None):
341         """Return a list with all the container's object meta keys for the domain."""
342         
343         logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
344         allowed = []
345         if user != account:
346             if until:
347                 raise NotAllowedError
348             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
349             if not allowed:
350                 raise NotAllowedError
351         path, node = self._lookup_container(account, container)
352         before = until if until is not None else inf
353         allowed = self._get_formatted_paths(allowed)
354         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
355     
356     @backend_method
357     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
358         """Return a dictionary with the container metadata for the domain."""
359         
360         logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
361         if user != account:
362             if until or container not in self._allowed_containers(user, account):
363                 raise NotAllowedError
364         path, node = self._lookup_container(account, container)
365         props = self._get_properties(node, until)
366         mtime = props[self.MTIME]
367         count, bytes, tstamp = self._get_statistics(node, until)
368         tstamp = max(tstamp, mtime)
369         if until is None:
370             modified = tstamp
371         else:
372             modified = self._get_statistics(node)[2] # Overall last modification.
373             modified = max(modified, mtime)
374         
375         if user != account:
376             meta = {'name': container}
377         else:
378             meta = {}
379             if include_user_defined:
380                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
381             if until is not None:
382                 meta.update({'until_timestamp': tstamp})
383             meta.update({'name': container, 'count': count, 'bytes': bytes})
384         meta.update({'modified': modified})
385         return meta
386     
387     @backend_method
388     def update_container_meta(self, user, account, container, domain, meta, replace=False):
389         """Update the metadata associated with the container for the domain."""
390         
391         logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
392         if user != account:
393             raise NotAllowedError
394         path, node = self._lookup_container(account, container)
395         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
396         if src_version_id is not None:
397             versioning = self._get_policy(node)['versioning']
398             if versioning != 'auto':
399                 self.node.version_remove(src_version_id)
400     
401     @backend_method
402     def get_container_policy(self, user, account, container):
403         """Return a dictionary with the container policy."""
404         
405         logger.debug("get_container_policy: %s %s %s", user, account, container)
406         if user != account:
407             if container not in self._allowed_containers(user, account):
408                 raise NotAllowedError
409             return {}
410         path, node = self._lookup_container(account, container)
411         return self._get_policy(node)
412     
413     @backend_method
414     def update_container_policy(self, user, account, container, policy, replace=False):
415         """Update the policy associated with the container."""
416         
417         logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
418         if user != account:
419             raise NotAllowedError
420         path, node = self._lookup_container(account, container)
421         self._check_policy(policy)
422         self._put_policy(node, policy, replace)
423     
424     @backend_method
425     def put_container(self, user, account, container, policy={}):
426         """Create a new container with the given name."""
427         
428         logger.debug("put_container: %s %s %s %s", user, account, container, policy)
429         if user != account:
430             raise NotAllowedError
431         try:
432             path, node = self._lookup_container(account, container)
433         except NameError:
434             pass
435         else:
436             raise NameError('Container already exists')
437         if policy:
438             self._check_policy(policy)
439         path = '/'.join((account, container))
440         node = self._put_path(user, self._lookup_account(account, True)[1], path)
441         self._put_policy(node, policy, True)
442     
443     @backend_method
444     def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
445         """Delete/purge the container with the given name."""
446         
447         logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
448         if user != account:
449             raise NotAllowedError
450         path, node = self._lookup_container(account, container)
451         
452         if until is not None:
453             hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
454             for h in hashes:
455                 self.store.map_delete(h)
456             self.node.node_purge_children(node, until, CLUSTER_DELETED)
457             self._report_size_change(user, account, -size, {'action': 'container purge'})
458             return
459         
460         if self._get_statistics(node)[0] > 0:
461             raise IndexError('Container is not empty')
462         hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
463         for h in hashes:
464             self.store.map_delete(h)
465         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
466         self.node.node_remove(node)
467         self._report_size_change(user, account, -size, {'action': 'container delete'})
468     
469     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
470         if user != account and until:
471             raise NotAllowedError
472         allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
473         if (shared or public) and not allowed:
474             return []
475         path, node = self._lookup_container(account, container)
476         allowed = self._get_formatted_paths(allowed)
477         return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
478     
479     def _list_object_permissions(self, user, account, container, prefix, shared, public):
480         allowed = []
481         path = '/'.join((account, container, prefix)).rstrip('/')
482         if user != account:
483             allowed = self.permissions.access_list_paths(user, path)
484             if not allowed:
485                 raise NotAllowedError
486         else:
487             allowed = []
488             if shared:
489                 allowed.extend(self.permissions.access_list_shared(path))
490             if public:
491                 allowed.extend([x[0] for x in self.permissions.public_list(path)])
492             allowed = list(set(allowed))
493             allowed.sort()
494             if not allowed:
495                 return []
496         return allowed
497     
498     @backend_method
499     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
500         """Return a list of object (name, version_id) tuples existing under a container."""
501         
502         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
503         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
504     
505     @backend_method
506     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
507         """Return a list of object metadata dicts existing under a container."""
508         
509         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
510         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
511         objects = []
512         for p in props:
513             if len(p) == 2:
514                 objects.append({'subdir': p[0]})
515             else:
516                 objects.append({'name': p[0],
517                                 'bytes': p[self.SIZE + 1],
518                                 'type': p[self.TYPE + 1],
519                                 'hash': p[self.HASH + 1],
520                                 'version': p[self.SERIAL + 1],
521                                 'version_timestamp': p[self.MTIME + 1],
522                                 'modified': p[self.MTIME + 1] if until is None else None,
523                                 'modified_by': p[self.MUSER + 1],
524                                 'uuid': p[self.UUID + 1],
525                                 'checksum': p[self.CHECKSUM + 1]})
526         return objects
527     
528     @backend_method
529     def list_object_permissions(self, user, account, container, prefix=''):
530         """Return a list of paths that enforce permissions under a container."""
531         
532         logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
533         return self._list_object_permissions(user, account, container, prefix, True, False)
534     
535     @backend_method
536     def list_object_public(self, user, account, container, prefix=''):
537         """Return a dict mapping paths to public ids for objects that are public under a container."""
538         
539         logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
540         public = {}
541         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
542             public[path] = p + ULTIMATE_ANSWER
543         return public
544     
545     @backend_method
546     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
547         """Return a dictionary with the object metadata for the domain."""
548         
549         logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
550         self._can_read(user, account, container, name)
551         path, node = self._lookup_object(account, container, name)
552         props = self._get_version(node, version)
553         if version is None:
554             modified = props[self.MTIME]
555         else:
556             try:
557                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
558             except NameError: # Object may be deleted.
559                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
560                 if del_props is None:
561                     raise NameError('Object does not exist')
562                 modified = del_props[self.MTIME]
563         
564         meta = {}
565         if include_user_defined:
566             meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
567         meta.update({'name': name,
568                      'bytes': props[self.SIZE],
569                      'type': props[self.TYPE],
570                      'hash': props[self.HASH],
571                      'version': props[self.SERIAL],
572                      'version_timestamp': props[self.MTIME],
573                      'modified': modified,
574                      'modified_by': props[self.MUSER],
575                      'uuid': props[self.UUID],
576                      'checksum': props[self.CHECKSUM]})
577         return meta
578     
579     @backend_method
580     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
581         """Update the metadata associated with the object for the domain and return the new version."""
582         
583         logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
584         self._can_write(user, account, container, name)
585         path, node = self._lookup_object(account, container, name)
586         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
587         self._apply_versioning(account, container, src_version_id)
588         return dest_version_id
589     
590     @backend_method
591     def get_object_permissions(self, user, account, container, name):
592         """Return the action allowed on the object, the path
593         from which the object gets its permissions from,
594         along with a dictionary containing the permissions."""
595         
596         logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
597         allowed = 'write'
598         permissions_path = self._get_permissions_path(account, container, name)
599         if user != account:
600             if self.permissions.access_check(permissions_path, self.WRITE, user):
601                 allowed = 'write'
602             elif self.permissions.access_check(permissions_path, self.READ, user):
603                 allowed = 'read'
604             else:
605                 raise NotAllowedError
606         self._lookup_object(account, container, name)
607         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
608     
609     @backend_method
610     def update_object_permissions(self, user, account, container, name, permissions):
611         """Update the permissions associated with the object."""
612         
613         logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
614         if user != account:
615             raise NotAllowedError
616         path = self._lookup_object(account, container, name)[0]
617         self._check_permissions(path, permissions)
618         self.permissions.access_set(path, permissions)
619         self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
620     
621     @backend_method
622     def get_object_public(self, user, account, container, name):
623         """Return the public id of the object if applicable."""
624         
625         logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
626         self._can_read(user, account, container, name)
627         path = self._lookup_object(account, container, name)[0]
628         p = self.permissions.public_get(path)
629         if p is not None:
630             p += ULTIMATE_ANSWER
631         return p
632     
633     @backend_method
634     def update_object_public(self, user, account, container, name, public):
635         """Update the public status of the object."""
636         
637         logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
638         self._can_write(user, account, container, name)
639         path = self._lookup_object(account, container, name)[0]
640         if not public:
641             self.permissions.public_unset(path)
642         else:
643             self.permissions.public_set(path)
644     
645     @backend_method
646     def get_object_hashmap(self, user, account, container, name, version=None):
647         """Return the object's size and a list with partial hashes."""
648         
649         logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
650         self._can_read(user, account, container, name)
651         path, node = self._lookup_object(account, container, name)
652         props = self._get_version(node, version)
653         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
654         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
655     
656     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
657         if permissions is not None and user != account:
658             raise NotAllowedError
659         self._can_write(user, account, container, name)
660         if permissions is not None:
661             path = '/'.join((account, container, name))
662             self._check_permissions(path, permissions)
663         
664         account_path, account_node = self._lookup_account(account, True)
665         container_path, container_node = self._lookup_container(account, container)
666         path, node = self._put_object_node(container_path, container_node, name)
667         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
668         
669         # Handle meta.
670         if src_version_id is None:
671             src_version_id = pre_version_id
672         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
673         
674         # Check quota.
675         del_size = self._apply_versioning(account, container, pre_version_id)
676         size_delta = size - del_size
677         if size_delta > 0:
678             account_quota = long(self._get_policy(account_node)['quota'])
679             container_quota = long(self._get_policy(container_node)['quota'])
680             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
681                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
682                 # This must be executed in a transaction, so the version is never created if it fails.
683                 raise QuotaError
684         self._report_size_change(user, account, size_delta, {'action': 'object update'})
685         
686         if permissions is not None:
687             self.permissions.access_set(path, permissions)
688             self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
689         
690         self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
691         return dest_version_id
692     
693     @backend_method
694     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
695         """Create/update an object with the specified size and partial hashes."""
696         
697         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
698         if size == 0: # No such thing as an empty hashmap.
699             hashmap = [self.put_block('')]
700         map = HashMap(self.block_size, self.hash_algorithm)
701         map.extend([binascii.unhexlify(x) for x in hashmap])
702         missing = self.store.block_search(map)
703         if missing:
704             ie = IndexError()
705             ie.data = [binascii.hexlify(x) for x in missing]
706             raise ie
707         
708         hash = map.hash()
709         dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
710         self.store.map_put(hash, map)
711         return dest_version_id
712     
713     @backend_method
714     def update_object_checksum(self, user, account, container, name, version, checksum):
715         """Update an object's checksum."""
716         
717         logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
718         # Update objects with greater version and same hashmap and size (fix metadata updates).
719         self._can_write(user, account, container, name)
720         path, node = self._lookup_object(account, container, name)
721         props = self._get_version(node, version)
722         versions = self.node.node_get_versions(node)
723         for x in versions:
724             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
725                 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
726     
727     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
728         self._can_read(user, src_account, src_container, src_name)
729         path, node = self._lookup_object(src_account, src_container, src_name)
730         # TODO: Will do another fetch of the properties in duplicate version...
731         props = self._get_version(node, src_version) # Check to see if source exists.
732         src_version_id = props[self.SERIAL]
733         hash = props[self.HASH]
734         size = props[self.SIZE]
735         
736         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
737         dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
738         return dest_version_id
739     
740     @backend_method
741     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
742         """Copy an object's data and metadata."""
743         
744         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
745         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
746         return dest_version_id
747     
748     @backend_method
749     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
750         """Move an object's data and metadata."""
751         
752         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
753         if user != src_account:
754             raise NotAllowedError
755         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
756         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
757             self._delete_object(user, src_account, src_container, src_name)
758         return dest_version_id
759     
760     def _delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
761         if user != account:
762             raise NotAllowedError
763         
764         if until is not None:
765             path = '/'.join((account, container, name))
766             node = self.node.node_lookup(path)
767             if node is None:
768                 return
769             hashes = []
770             size = 0
771             h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
772             hashes += h
773             size += s
774             h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
775             hashes += h
776             size += s
777             for h in hashes:
778                 self.store.map_delete(h)
779             self.node.node_purge(node, until, CLUSTER_DELETED)
780             try:
781                 props = self._get_version(node)
782             except NameError:
783                 self.permissions.access_clear(path)
784             self._report_size_change(user, account, -size, {'action': 'object purge'})
785             return
786         
787         path, node = self._lookup_object(account, container, name)
788         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
789         del_size = self._apply_versioning(account, container, src_version_id)
790         if del_size:
791             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
792         self._report_object_change(user, account, path, details={'action': 'object delete'})
793         self.permissions.access_clear(path)
794     
795     @backend_method
796     def (self, user, account, container, name, until=None, prefix='', delimiter=None):
797         """Delete/purge an object."""
798         
799         logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
800         self._delete_object(user, account, container, name, until)
801     
802     @backend_method
803     def list_versions(self, user, account, container, name):
804         """Return a list of all (version, version_timestamp) tuples for an object."""
805         
806         logger.debug("list_versions: %s %s %s %s", user, account, container, name)
807         self._can_read(user, account, container, name)
808         path, node = self._lookup_object(account, container, name)
809         versions = self.node.node_get_versions(node)
810         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
811     
812     @backend_method
813     def get_uuid(self, user, uuid):
814         """Return the (account, container, name) for the UUID given."""
815         
816         logger.debug("get_uuid: %s %s", user, uuid)
817         info = self.node.latest_uuid(uuid)
818         if info is None:
819             raise NameError
820         path, serial = info
821         account, container, name = path.split('/', 2)
822         self._can_read(user, account, container, name)
823         return (account, container, name)
824     
825     @backend_method
826     def get_public(self, user, public):
827         """Return the (account, container, name) for the public id given."""
828         
829         logger.debug("get_public: %s %s", user, public)
830         if public is None or public < ULTIMATE_ANSWER:
831             raise NameError
832         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
833         if path is None:
834             raise NameError
835         account, container, name = path.split('/', 2)
836         self._can_read(user, account, container, name)
837         return (account, container, name)
838     
839     @backend_method(autocommit=0)
840     def get_block(self, hash):
841         """Return a block's data."""
842         
843         logger.debug("get_block: %s", hash)
844         block = self.store.block_get(binascii.unhexlify(hash))
845         if not block:
846             raise NameError('Block does not exist')
847         return block
848     
849     @backend_method(autocommit=0)
850     def put_block(self, data):
851         """Store a block and return the hash."""
852         
853         logger.debug("put_block: %s", len(data))
854         return binascii.hexlify(self.store.block_put(data))
855     
856     @backend_method(autocommit=0)
857     def update_block(self, hash, data, offset=0):
858         """Update a known block and return the hash."""
859         
860         logger.debug("update_block: %s %s %s", hash, len(data), offset)
861         if offset == 0 and len(data) == self.block_size:
862             return self.put_block(data)
863         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
864         return binascii.hexlify(h)
865     
866     # Path functions.
867     
868     def _generate_uuid(self):
869         return str(uuidlib.uuid4())
870     
871     def _put_object_node(self, path, parent, name):
872         path = '/'.join((path, name))
873         node = self.node.node_lookup(path)
874         if node is None:
875             node = self.node.node_create(parent, path)
876         return path, node
877     
878     def _put_path(self, user, parent, path):
879         node = self.node.node_create(parent, path)
880         self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
881         return node
882     
883     def _lookup_account(self, account, create=True):
884         node = self.node.node_lookup(account)
885         if node is None and create:
886             node = self._put_path(account, self.ROOTNODE, account) # User is account.
887         return account, node
888     
889     def _lookup_container(self, account, container):
890         path = '/'.join((account, container))
891         node = self.node.node_lookup(path)
892         if node is None:
893             raise NameError('Container does not exist')
894         return path, node
895     
896     def _lookup_object(self, account, container, name):
897         path = '/'.join((account, container, name))
898         node = self.node.node_lookup(path)
899         if node is None:
900             raise NameError('Object does not exist')
901         return path, node
902     
903     def _get_properties(self, node, until=None):
904         """Return properties until the timestamp given."""
905         
906         before = until if until is not None else inf
907         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
908         if props is None and until is not None:
909             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
910         if props is None:
911             raise NameError('Path does not exist')
912         return props
913     
914     def _get_statistics(self, node, until=None):
915         """Return count, sum of size and latest timestamp of everything under node."""
916         
917         if until is None:
918             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
919         else:
920             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
921         if stats is None:
922             stats = (0, 0, 0)
923         return stats
924     
925     def _get_version(self, node, version=None):
926         if version is None:
927             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
928             if props is None:
929                 raise NameError('Object does not exist')
930         else:
931             try:
932                 version = int(version)
933             except ValueError:
934                 raise IndexError('Version does not exist')
935             props = self.node.version_get_properties(version)
936             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
937                 raise IndexError('Version does not exist')
938         return props
939     
940     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
941         """Create a new version of the node."""
942         
943         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
944         if props is not None:
945             src_version_id = props[self.SERIAL]
946             src_hash = props[self.HASH]
947             src_size = props[self.SIZE]
948             src_type = props[self.TYPE]
949             src_checksum = props[self.CHECKSUM]
950         else:
951             src_version_id = None
952             src_hash = None
953             src_size = 0
954             src_type = ''
955             src_checksum = ''
956         if size is None: # Set metadata.
957             hash = src_hash # This way hash can be set to None (account or container).
958             size = src_size
959         if type is None:
960             type = src_type
961         if checksum is None:
962             checksum = src_checksum
963         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
964         
965         if src_node is None:
966             pre_version_id = src_version_id
967         else:
968             pre_version_id = None
969             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
970             if props is not None:
971                 pre_version_id = props[self.SERIAL]
972         if pre_version_id is not None:
973             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
974         
975         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
976         return pre_version_id, dest_version_id
977     
978     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
979         if src_version_id is not None:
980             self.node.attribute_copy(src_version_id, dest_version_id)
981         if not replace:
982             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
983             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
984         else:
985             self.node.attribute_del(dest_version_id, domain)
986             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
987     
988     def _put_metadata(self, user, node, domain, meta, replace=False):
989         """Create a new version and store metadata."""
990         
991         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
992         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
993         return src_version_id, dest_version_id
994     
995     def _list_limits(self, listing, marker, limit):
996         start = 0
997         if marker:
998             try:
999                 start = listing.index(marker) + 1
1000             except ValueError:
1001                 pass
1002         if not limit or limit > 10000:
1003             limit = 10000
1004         return start, limit
1005     
1006     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1007         cont_prefix = path + '/'
1008         prefix = cont_prefix + prefix
1009         start = cont_prefix + marker if marker else None
1010         before = until if until is not None else inf
1011         filterq = keys if domain else []
1012         sizeq = size_range
1013         
1014         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1015         objects.extend([(p, None) for p in prefixes] if virtual else [])
1016         objects.sort(key=lambda x: x[0])
1017         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1018         
1019         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
1020         return objects[start:start + limit]
1021     
1022     # Reporting functions.
1023     
1024     def _report_size_change(self, user, account, size, details={}):
1025         logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1026         account_node = self._lookup_account(account, True)[1]
1027         total = self._get_statistics(account_node)[1]
1028         details.update({'user': user, 'total': total})
1029         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1030     
1031     def _report_object_change(self, user, account, path, details={}):
1032         logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1033         details.update({'user': user})
1034         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1035     
1036     def _report_sharing_change(self, user, account, path, details={}):
1037         logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1038         details.update({'user': user})
1039         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1040     
1041     # Policy functions.
1042     
1043     def _check_policy(self, policy):
1044         for k in policy.keys():
1045             if policy[k] == '':
1046                 policy[k] = self.default_policy.get(k)
1047         for k, v in policy.iteritems():
1048             if k == 'quota':
1049                 q = int(v) # May raise ValueError.
1050                 if q < 0:
1051                     raise ValueError
1052             elif k == 'versioning':
1053                 if v not in ['auto', 'none']:
1054                     raise ValueError
1055             else:
1056                 raise ValueError
1057     
1058     def _put_policy(self, node, policy, replace):
1059         if replace:
1060             for k, v in self.default_policy.iteritems():
1061                 if k not in policy:
1062                     policy[k] = v
1063         self.node.policy_set(node, policy)
1064     
1065     def _get_policy(self, node):
1066         policy = self.default_policy.copy()
1067         policy.update(self.node.policy_get(node))
1068         return policy
1069     
1070     def _apply_versioning(self, account, container, version_id):
1071         """Delete the provided version if such is the policy.
1072            Return size of object removed.
1073         """
1074         
1075         if version_id is None:
1076             return 0
1077         path, node = self._lookup_container(account, container)
1078         versioning = self._get_policy(node)['versioning']
1079         if versioning != 'auto':
1080             hash, size = self.node.version_remove(version_id)
1081             self.store.map_delete(hash)
1082             return size
1083         return 0
1084     
1085     # Access control functions.
1086     
1087     def _check_groups(self, groups):
1088         # raise ValueError('Bad characters in groups')
1089         pass
1090     
1091     def _check_permissions(self, path, permissions):
1092         # raise ValueError('Bad characters in permissions')
1093         pass
1094     
1095     def _get_formatted_paths(self, paths):
1096         formatted = []
1097         for p in paths:
1098             node = self.node.node_lookup(p)
1099             if node is not None:
1100                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1101             if props is not None:
1102                 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1103                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1104                 formatted.append((p, self.MATCH_EXACT))
1105         return formatted
1106     
1107     def _get_permissions_path(self, account, container, name):
1108         path = '/'.join((account, container, name))
1109         permission_paths = self.permissions.access_inherit(path)
1110         permission_paths.sort()
1111         permission_paths.reverse()
1112         for p in permission_paths:
1113             if p == path:
1114                 return p
1115             else:
1116                 if p.count('/') < 2:
1117                     continue
1118                 node = self.node.node_lookup(p)
1119                 if node is not None:
1120                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1121                 if props is not None:
1122                     if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1123                         return p
1124         return None
1125     
1126     def _can_read(self, user, account, container, name):
1127         if user == account:
1128             return True
1129         path = '/'.join((account, container, name))
1130         if self.permissions.public_get(path) is not None:
1131             return True
1132         path = self._get_permissions_path(account, container, name)
1133         if not path:
1134             raise NotAllowedError
1135         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1136             raise NotAllowedError
1137     
1138     def _can_write(self, user, account, container, name):
1139         if user == account:
1140             return True
1141         path = '/'.join((account, container, name))
1142         path = self._get_permissions_path(account, container, name)
1143         if not path:
1144             raise NotAllowedError
1145         if not self.permissions.access_check(path, self.WRITE, user):
1146             raise NotAllowedError
1147     
1148     def _allowed_accounts(self, user):
1149         allow = set()
1150         for path in self.permissions.access_list_paths(user):
1151             allow.add(path.split('/', 1)[0])
1152         return sorted(allow)
1153     
1154     def _allowed_containers(self, user, account):
1155         allow = set()
1156         for path in self.permissions.access_list_paths(user, account):
1157             allow.add(path.split('/', 2)[1])
1158         return sorted(allow)