Merge commit 'v0.9.0' into packaging
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import binascii
40
41 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
42
43 from pithos.lib.hashmap import HashMap
44
45 # Default modules and settings.
46 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
47 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
48 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
49 DEFAULT_BLOCK_PATH = 'data/'
50 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
51 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
52
53 QUEUE_MESSAGE_KEY = '#'
54 QUEUE_CLIENT_ID = 2 # Pithos.
55
56 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
57
58 inf = float('inf')
59
60 ULTIMATE_ANSWER = 42
61
62
63 logger = logging.getLogger(__name__)
64
65
66 def backend_method(func=None, autocommit=1):
67     if func is None:
68         def fn(func):
69             return backend_method(func, autocommit)
70         return fn
71
72     if not autocommit:
73         return func
74     def fn(self, *args, **kw):
75         self.wrapper.execute()
76         try:
77             ret = func(self, *args, **kw)
78             self.wrapper.commit()
79             return ret
80         except:
81             self.wrapper.rollback()
82             raise
83     return fn
84
85
86 class ModularBackend(BaseBackend):
87     """A modular backend.
88     
89     Uses modules for SQL functions and storage.
90     """
91     
92     def __init__(self, db_module=None, db_connection=None,
93                  block_module=None, block_path=None,
94                  queue_module=None, queue_connection=None):
95         db_module = db_module or DEFAULT_DB_MODULE
96         db_connection = db_connection or DEFAULT_DB_CONNECTION
97         block_module = block_module or DEFAULT_BLOCK_MODULE
98         block_path = block_path or DEFAULT_BLOCK_PATH
99         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
100         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
101         
102         self.hash_algorithm = 'sha256'
103         self.block_size = 4 * 1024 * 1024 # 4MB
104         
105         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
106         
107         def load_module(m):
108             __import__(m)
109             return sys.modules[m]
110         
111         self.db_module = load_module(db_module)
112         self.wrapper = self.db_module.DBWrapper(db_connection)
113         params = {'wrapper': self.wrapper}
114         self.permissions = self.db_module.Permissions(**params)
115         for x in ['READ', 'WRITE']:
116             setattr(self, x, getattr(self.db_module, x))
117         self.node = self.db_module.Node(**params)
118         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
119             setattr(self, x, getattr(self.db_module, x))
120         
121         self.block_module = load_module(block_module)
122         params = {'path': block_path,
123                   'block_size': self.block_size,
124                   'hash_algorithm': self.hash_algorithm}
125         self.store = self.block_module.Store(**params)
126
127         if queue_module and queue_connection:
128             self.queue_module = load_module(queue_module)
129             params = {'exchange': queue_connection,
130                       'message_key': QUEUE_MESSAGE_KEY,
131                       'client_id': QUEUE_CLIENT_ID}
132             self.queue = self.queue_module.Queue(**params)
133         else:
134             class NoQueue:
135                 def send(self, *args):
136                     pass
137                 
138                 def close(self):
139                     pass
140             
141             self.queue = NoQueue()
142     
143     def close(self):
144         self.wrapper.close()
145         self.queue.close()
146     
147     @backend_method
148     def list_accounts(self, user, marker=None, limit=10000):
149         """Return a list of accounts the user can access."""
150         
151         logger.debug("list_accounts: %s %s %s", user, marker, limit)
152         allowed = self._allowed_accounts(user)
153         start, limit = self._list_limits(allowed, marker, limit)
154         return allowed[start:start + limit]
155     
156     @backend_method
157     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
158         """Return a dictionary with the account metadata for the domain."""
159         
160         logger.debug("get_account_meta: %s %s %s", account, domain, until)
161         path, node = self._lookup_account(account, user == account)
162         if user != account:
163             if until or node is None or account not in self._allowed_accounts(user):
164                 raise NotAllowedError
165         try:
166             props = self._get_properties(node, until)
167             mtime = props[self.MTIME]
168         except NameError:
169             props = None
170             mtime = until
171         count, bytes, tstamp = self._get_statistics(node, until)
172         tstamp = max(tstamp, mtime)
173         if until is None:
174             modified = tstamp
175         else:
176             modified = self._get_statistics(node)[2] # Overall last modification.
177             modified = max(modified, mtime)
178         
179         if user != account:
180             meta = {'name': account}
181         else:
182             meta = {}
183             if props is not None and include_user_defined:
184                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
185             if until is not None:
186                 meta.update({'until_timestamp': tstamp})
187             meta.update({'name': account, 'count': count, 'bytes': bytes})
188         meta.update({'modified': modified})
189         return meta
190     
191     @backend_method
192     def update_account_meta(self, user, account, domain, meta, replace=False):
193         """Update the metadata associated with the account for the domain."""
194         
195         logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
196         if user != account:
197             raise NotAllowedError
198         path, node = self._lookup_account(account, True)
199         self._put_metadata(user, node, domain, meta, replace)
200     
201     @backend_method
202     def get_account_groups(self, user, account):
203         """Return a dictionary with the user groups defined for this account."""
204         
205         logger.debug("get_account_groups: %s", account)
206         if user != account:
207             if account not in self._allowed_accounts(user):
208                 raise NotAllowedError
209             return {}
210         self._lookup_account(account, True)
211         return self.permissions.group_dict(account)
212     
213     @backend_method
214     def update_account_groups(self, user, account, groups, replace=False):
215         """Update the groups associated with the account."""
216         
217         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
218         if user != account:
219             raise NotAllowedError
220         self._lookup_account(account, True)
221         self._check_groups(groups)
222         if replace:
223             self.permissions.group_destroy(account)
224         for k, v in groups.iteritems():
225             if not replace: # If not already deleted.
226                 self.permissions.group_delete(account, k)
227             if v:
228                 self.permissions.group_addmany(account, k, v)
229     
230     @backend_method
231     def get_account_policy(self, user, account):
232         """Return a dictionary with the account policy."""
233         
234         logger.debug("get_account_policy: %s", account)
235         if user != account:
236             if account not in self._allowed_accounts(user):
237                 raise NotAllowedError
238             return {}
239         path, node = self._lookup_account(account, True)
240         return self._get_policy(node)
241     
242     @backend_method
243     def update_account_policy(self, user, account, policy, replace=False):
244         """Update the policy associated with the account."""
245         
246         logger.debug("update_account_policy: %s %s %s", account, policy, replace)
247         if user != account:
248             raise NotAllowedError
249         path, node = self._lookup_account(account, True)
250         self._check_policy(policy)
251         self._put_policy(node, policy, replace)
252     
253     @backend_method
254     def put_account(self, user, account, policy={}):
255         """Create a new account with the given name."""
256         
257         logger.debug("put_account: %s %s", account, policy)
258         if user != account:
259             raise NotAllowedError
260         node = self.node.node_lookup(account)
261         if node is not None:
262             raise NameError('Account already exists')
263         if policy:
264             self._check_policy(policy)
265         node = self._put_path(user, self.ROOTNODE, account)
266         self._put_policy(node, policy, True)
267     
268     @backend_method
269     def delete_account(self, user, account):
270         """Delete the account with the given name."""
271         
272         logger.debug("delete_account: %s", account)
273         if user != account:
274             raise NotAllowedError
275         node = self.node.node_lookup(account)
276         if node is None:
277             return
278         if not self.node.node_remove(node):
279             raise IndexError('Account is not empty')
280         self.permissions.group_destroy(account)
281     
282     @backend_method
283     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
284         """Return a list of containers existing under an account."""
285         
286         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
287         if user != account:
288             if until or account not in self._allowed_accounts(user):
289                 raise NotAllowedError
290             allowed = self._allowed_containers(user, account)
291             start, limit = self._list_limits(allowed, marker, limit)
292             return allowed[start:start + limit]
293         if shared:
294             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
295             allowed = list(set(allowed))
296             start, limit = self._list_limits(allowed, marker, limit)
297             return allowed[start:start + limit]
298         node = self.node.node_lookup(account)
299         return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
300     
301     @backend_method
302     def list_container_meta(self, user, account, container, domain, until=None):
303         """Return a list with all the container's object meta keys for the domain."""
304         
305         logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
306         allowed = []
307         if user != account:
308             if until:
309                 raise NotAllowedError
310             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
311             if not allowed:
312                 raise NotAllowedError
313         path, node = self._lookup_container(account, container)
314         before = until if until is not None else inf
315         allowed = self._get_formatted_paths(allowed)
316         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
317     
318     @backend_method
319     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
320         """Return a dictionary with the container metadata for the domain."""
321         
322         logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
323         if user != account:
324             if until or container not in self._allowed_containers(user, account):
325                 raise NotAllowedError
326         path, node = self._lookup_container(account, container)
327         props = self._get_properties(node, until)
328         mtime = props[self.MTIME]
329         count, bytes, tstamp = self._get_statistics(node, until)
330         tstamp = max(tstamp, mtime)
331         if until is None:
332             modified = tstamp
333         else:
334             modified = self._get_statistics(node)[2] # Overall last modification.
335             modified = max(modified, mtime)
336         
337         if user != account:
338             meta = {'name': container}
339         else:
340             meta = {}
341             if include_user_defined:
342                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
343             if until is not None:
344                 meta.update({'until_timestamp': tstamp})
345             meta.update({'name': container, 'count': count, 'bytes': bytes})
346         meta.update({'modified': modified})
347         return meta
348     
349     @backend_method
350     def update_container_meta(self, user, account, container, domain, meta, replace=False):
351         """Update the metadata associated with the container for the domain."""
352         
353         logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
354         if user != account:
355             raise NotAllowedError
356         path, node = self._lookup_container(account, container)
357         self._put_metadata(user, node, domain, meta, replace)
358     
359     @backend_method
360     def get_container_policy(self, user, account, container):
361         """Return a dictionary with the container policy."""
362         
363         logger.debug("get_container_policy: %s %s", account, container)
364         if user != account:
365             if container not in self._allowed_containers(user, account):
366                 raise NotAllowedError
367             return {}
368         path, node = self._lookup_container(account, container)
369         return self._get_policy(node)
370     
371     @backend_method
372     def update_container_policy(self, user, account, container, policy, replace=False):
373         """Update the policy associated with the container."""
374         
375         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
376         if user != account:
377             raise NotAllowedError
378         path, node = self._lookup_container(account, container)
379         self._check_policy(policy)
380         self._put_policy(node, policy, replace)
381     
382     @backend_method
383     def put_container(self, user, account, container, policy={}):
384         """Create a new container with the given name."""
385         
386         logger.debug("put_container: %s %s %s", account, container, policy)
387         if user != account:
388             raise NotAllowedError
389         try:
390             path, node = self._lookup_container(account, container)
391         except NameError:
392             pass
393         else:
394             raise NameError('Container already exists')
395         if policy:
396             self._check_policy(policy)
397         path = '/'.join((account, container))
398         node = self._put_path(user, self._lookup_account(account, True)[1], path)
399         self._put_policy(node, policy, True)
400     
401     @backend_method
402     def delete_container(self, user, account, container, until=None):
403         """Delete/purge the container with the given name."""
404         
405         logger.debug("delete_container: %s %s %s", account, container, until)
406         if user != account:
407             raise NotAllowedError
408         path, node = self._lookup_container(account, container)
409         
410         if until is not None:
411             hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
412             for h in hashes:
413                 self.store.map_delete(h)
414             self.node.node_purge_children(node, until, CLUSTER_DELETED)
415             self._report_size_change(user, account, -size, {'action': 'container purge'})
416             return
417         
418         if self._get_statistics(node)[0] > 0:
419             raise IndexError('Container is not empty')
420         hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
421         for h in hashes:
422             self.store.map_delete(h)
423         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
424         self.node.node_remove(node)
425         self._report_size_change(user, account, -size, {'action': 'container delete'})
426     
427     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props):
428         if user != account and until:
429             raise NotAllowedError
430         allowed = self._list_object_permissions(user, account, container, prefix, shared)
431         if shared and not allowed:
432             return []
433         path, node = self._lookup_container(account, container)
434         allowed = self._get_formatted_paths(allowed)
435         return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
436     
437     def _list_object_permissions(self, user, account, container, prefix, shared):
438         allowed = []
439         path = '/'.join((account, container, prefix)).rstrip('/')
440         if user != account:
441             allowed = self.permissions.access_list_paths(user, path)
442             if not allowed:
443                 raise NotAllowedError
444         else:
445             if shared:
446                 allowed = self.permissions.access_list_shared(path)
447                 if not allowed:
448                     return []
449         return allowed
450     
451     @backend_method
452     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
453         """Return a list of object (name, version_id) tuples existing under a container."""
454         
455         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
456         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False)
457     
458     @backend_method
459     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
460         """Return a list of object metadata dicts existing under a container."""
461         
462         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
463         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True)
464         objects = []
465         for p in props:
466             if len(p) == 2:
467                 objects.append({'subdir': p[0]})
468             else:
469                 objects.append({'name': p[0],
470                                 'bytes': p[self.SIZE + 1],
471                                 'type': p[self.TYPE + 1],
472                                 'hash': p[self.HASH + 1],
473                                 'version': p[self.SERIAL + 1],
474                                 'version_timestamp': p[self.MTIME + 1],
475                                 'modified': p[self.MTIME + 1] if until is None else None,
476                                 'modified_by': p[self.MUSER + 1],
477                                 'uuid': p[self.UUID + 1],
478                                 'checksum': p[self.CHECKSUM + 1]})
479         return objects
480     
481     @backend_method
482     def list_object_permissions(self, user, account, container, prefix=''):
483         """Return a list of paths that enforce permissions under a container."""
484         
485         logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
486         return self._list_object_permissions(user, account, container, prefix, True)
487     
488     @backend_method
489     def list_object_public(self, user, account, container, prefix=''):
490         """Return a dict mapping paths to public ids for objects that are public under a container."""
491         
492         logger.debug("list_object_public: %s %s %s", account, container, prefix)
493         public = {}
494         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
495             public[path] = p + ULTIMATE_ANSWER
496         return public
497     
498     @backend_method
499     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
500         """Return a dictionary with the object metadata for the domain."""
501         
502         logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
503         self._can_read(user, account, container, name)
504         path, node = self._lookup_object(account, container, name)
505         props = self._get_version(node, version)
506         if version is None:
507             modified = props[self.MTIME]
508         else:
509             try:
510                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
511             except NameError: # Object may be deleted.
512                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
513                 if del_props is None:
514                     raise NameError('Object does not exist')
515                 modified = del_props[self.MTIME]
516         
517         meta = {}
518         if include_user_defined:
519             meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
520         meta.update({'name': name,
521                      'bytes': props[self.SIZE],
522                      'type': props[self.TYPE],
523                      'hash': props[self.HASH],
524                      'version': props[self.SERIAL],
525                      'version_timestamp': props[self.MTIME],
526                      'modified': modified,
527                      'modified_by': props[self.MUSER],
528                      'uuid': props[self.UUID],
529                      'checksum': props[self.CHECKSUM]})
530         return meta
531     
532     @backend_method
533     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
534         """Update the metadata associated with the object for the domain and return the new version."""
535         
536         logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
537         self._can_write(user, account, container, name)
538         path, node = self._lookup_object(account, container, name)
539         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
540         self._apply_versioning(account, container, src_version_id)
541         return dest_version_id
542     
543     @backend_method
544     def get_object_permissions(self, user, account, container, name):
545         """Return the action allowed on the object, the path
546         from which the object gets its permissions from,
547         along with a dictionary containing the permissions."""
548         
549         logger.debug("get_object_permissions: %s %s %s", account, container, name)
550         allowed = 'write'
551         permissions_path = self._get_permissions_path(account, container, name)
552         if user != account:
553             if self.permissions.access_check(permissions_path, self.WRITE, user):
554                 allowed = 'write'
555             elif self.permissions.access_check(permissions_path, self.READ, user):
556                 allowed = 'read'
557             else:
558                 raise NotAllowedError
559         self._lookup_object(account, container, name)
560         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
561     
562     @backend_method
563     def update_object_permissions(self, user, account, container, name, permissions):
564         """Update the permissions associated with the object."""
565         
566         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
567         if user != account:
568             raise NotAllowedError
569         path = self._lookup_object(account, container, name)[0]
570         self._check_permissions(path, permissions)
571         self.permissions.access_set(path, permissions)
572     
573     @backend_method
574     def get_object_public(self, user, account, container, name):
575         """Return the public id of the object if applicable."""
576         
577         logger.debug("get_object_public: %s %s %s", account, container, name)
578         self._can_read(user, account, container, name)
579         path = self._lookup_object(account, container, name)[0]
580         p = self.permissions.public_get(path)
581         if p is not None:
582             p += ULTIMATE_ANSWER
583         return p
584     
585     @backend_method
586     def update_object_public(self, user, account, container, name, public):
587         """Update the public status of the object."""
588         
589         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
590         self._can_write(user, account, container, name)
591         path = self._lookup_object(account, container, name)[0]
592         if not public:
593             self.permissions.public_unset(path)
594         else:
595             self.permissions.public_set(path)
596     
597     @backend_method
598     def get_object_hashmap(self, user, account, container, name, version=None):
599         """Return the object's size and a list with partial hashes."""
600         
601         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
602         self._can_read(user, account, container, name)
603         path, node = self._lookup_object(account, container, name)
604         props = self._get_version(node, version)
605         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
606         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
607     
608     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, permissions, src_node=None, is_copy=False):
609         if permissions is not None and user != account:
610             raise NotAllowedError
611         self._can_write(user, account, container, name)
612         if permissions is not None:
613             path = '/'.join((account, container, name))
614             self._check_permissions(path, permissions)
615         
616         account_path, account_node = self._lookup_account(account, True)
617         container_path, container_node = self._lookup_container(account, container)
618         path, node = self._put_object_node(container_path, container_node, name)
619         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
620         
621         # Check quota.
622         del_size = self._apply_versioning(account, container, pre_version_id)
623         size_delta = size - del_size
624         if size_delta > 0:
625             account_quota = long(self._get_policy(account_node)['quota'])
626             container_quota = long(self._get_policy(container_node)['quota'])
627             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
628                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
629                 # This must be executed in a transaction, so the version is never created if it fails.
630                 raise QuotaError
631         self._report_size_change(user, account, size_delta, {'action': 'object update'})
632         
633         if permissions is not None:
634             self.permissions.access_set(path, permissions)
635         return pre_version_id, dest_version_id
636     
637     @backend_method
638     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
639         """Create/update an object with the specified size and partial hashes."""
640         
641         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
642         if size == 0: # No such thing as an empty hashmap.
643             hashmap = [self.put_block('')]
644         map = HashMap(self.block_size, self.hash_algorithm)
645         map.extend([binascii.unhexlify(x) for x in hashmap])
646         missing = self.store.block_search(map)
647         if missing:
648             ie = IndexError()
649             ie.data = [binascii.hexlify(x) for x in missing]
650             raise ie
651         
652         hash = map.hash()
653         pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, permissions)
654         self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
655         self.store.map_put(hash, map)
656         return dest_version_id
657     
658     @backend_method
659     def update_object_checksum(self, user, account, container, name, version, checksum):
660         """Update an object's checksum."""
661         
662         logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
663         # Update objects with greater version and same hashmap and size (fix metadata updates).
664         self._can_write(user, account, container, name)
665         path, node = self._lookup_object(account, container, name)
666         props = self._get_version(node, version)
667         versions = self.node.node_get_versions(node)
668         for x in versions:
669             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
670                 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
671     
672     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
673         self._can_read(user, src_account, src_container, src_name)
674         path, node = self._lookup_object(src_account, src_container, src_name)
675         # TODO: Will do another fetch of the properties in duplicate version...
676         props = self._get_version(node, src_version) # Check to see if source exists.
677         src_version_id = props[self.SERIAL]
678         hash = props[self.HASH]
679         size = props[self.SIZE]
680         
681         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
682         pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, permissions, src_node=node, is_copy=is_copy)
683         self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
684         return dest_version_id
685     
686     @backend_method
687     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
688         """Copy an object's data and metadata."""
689         
690         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
691         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
692         return dest_version_id
693     
694     @backend_method
695     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
696         """Move an object's data and metadata."""
697         
698         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
699         if user != src_account:
700             raise NotAllowedError
701         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
702         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
703             self._delete_object(user, src_account, src_container, src_name)
704         return dest_version_id
705     
706     def _delete_object(self, user, account, container, name, until=None):
707         if user != account:
708             raise NotAllowedError
709         
710         if until is not None:
711             path = '/'.join((account, container, name))
712             node = self.node.node_lookup(path)
713             if node is None:
714                 return
715             hashes = []
716             size = 0
717             h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
718             hashes += h
719             size += s
720             h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
721             hashes += h
722             size += s
723             for h in hashes:
724                 self.store.map_delete(h)
725             self.node.node_purge(node, until, CLUSTER_DELETED)
726             try:
727                 props = self._get_version(node)
728             except NameError:
729                 self.permissions.access_clear(path)
730             self._report_size_change(user, account, -size, {'action': 'object purge'})
731             return
732         
733         path, node = self._lookup_object(account, container, name)
734         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
735         del_size = self._apply_versioning(account, container, src_version_id)
736         if del_size:
737             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
738         self.permissions.access_clear(path)
739     
740     @backend_method
741     def delete_object(self, user, account, container, name, until=None):
742         """Delete/purge an object."""
743         
744         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
745         self._delete_object(user, account, container, name, until)
746     
747     @backend_method
748     def list_versions(self, user, account, container, name):
749         """Return a list of all (version, version_timestamp) tuples for an object."""
750         
751         logger.debug("list_versions: %s %s %s", account, container, name)
752         self._can_read(user, account, container, name)
753         path, node = self._lookup_object(account, container, name)
754         versions = self.node.node_get_versions(node)
755         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
756     
757     @backend_method
758     def get_uuid(self, user, uuid):
759         """Return the (account, container, name) for the UUID given."""
760         
761         logger.debug("get_uuid: %s", uuid)
762         info = self.node.latest_uuid(uuid)
763         if info is None:
764             raise NameError
765         path, serial = info
766         account, container, name = path.split('/', 2)
767         self._can_read(user, account, container, name)
768         return (account, container, name)
769     
770     @backend_method
771     def get_public(self, user, public):
772         """Return the (account, container, name) for the public id given."""
773         
774         logger.debug("get_public: %s", public)
775         if public is None or public < ULTIMATE_ANSWER:
776             raise NameError
777         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
778         if path is None:
779             raise NameError
780         account, container, name = path.split('/', 2)
781         self._can_read(user, account, container, name)
782         return (account, container, name)
783     
784     @backend_method(autocommit=0)
785     def get_block(self, hash):
786         """Return a block's data."""
787         
788         logger.debug("get_block: %s", hash)
789         block = self.store.block_get(binascii.unhexlify(hash))
790         if not block:
791             raise NameError('Block does not exist')
792         return block
793     
794     @backend_method(autocommit=0)
795     def put_block(self, data):
796         """Store a block and return the hash."""
797         
798         logger.debug("put_block: %s", len(data))
799         return binascii.hexlify(self.store.block_put(data))
800     
801     @backend_method(autocommit=0)
802     def update_block(self, hash, data, offset=0):
803         """Update a known block and return the hash."""
804         
805         logger.debug("update_block: %s %s %s", hash, len(data), offset)
806         if offset == 0 and len(data) == self.block_size:
807             return self.put_block(data)
808         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
809         return binascii.hexlify(h)
810     
811     # Path functions.
812     
813     def _generate_uuid(self):
814         return str(uuidlib.uuid4())
815     
816     def _put_object_node(self, path, parent, name):
817         path = '/'.join((path, name))
818         node = self.node.node_lookup(path)
819         if node is None:
820             node = self.node.node_create(parent, path)
821         return path, node
822     
823     def _put_path(self, user, parent, path):
824         node = self.node.node_create(parent, path)
825         self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
826         return node
827     
828     def _lookup_account(self, account, create=True):
829         node = self.node.node_lookup(account)
830         if node is None and create:
831             node = self._put_path(account, self.ROOTNODE, account) # User is account.
832         return account, node
833     
834     def _lookup_container(self, account, container):
835         path = '/'.join((account, container))
836         node = self.node.node_lookup(path)
837         if node is None:
838             raise NameError('Container does not exist')
839         return path, node
840     
841     def _lookup_object(self, account, container, name):
842         path = '/'.join((account, container, name))
843         node = self.node.node_lookup(path)
844         if node is None:
845             raise NameError('Object does not exist')
846         return path, node
847     
848     def _get_properties(self, node, until=None):
849         """Return properties until the timestamp given."""
850         
851         before = until if until is not None else inf
852         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
853         if props is None and until is not None:
854             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
855         if props is None:
856             raise NameError('Path does not exist')
857         return props
858     
859     def _get_statistics(self, node, until=None):
860         """Return count, sum of size and latest timestamp of everything under node."""
861         
862         if until is None:
863             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
864         else:
865             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
866         if stats is None:
867             stats = (0, 0, 0)
868         return stats
869     
870     def _get_version(self, node, version=None):
871         if version is None:
872             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
873             if props is None:
874                 raise NameError('Object does not exist')
875         else:
876             try:
877                 version = int(version)
878             except ValueError:
879                 raise IndexError('Version does not exist')
880             props = self.node.version_get_properties(version)
881             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
882                 raise IndexError('Version does not exist')
883         return props
884     
885     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
886         """Create a new version of the node."""
887         
888         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
889         if props is not None:
890             src_version_id = props[self.SERIAL]
891             src_hash = props[self.HASH]
892             src_size = props[self.SIZE]
893             src_type = props[self.TYPE]
894             src_checksum = props[self.CHECKSUM]
895         else:
896             src_version_id = None
897             src_hash = None
898             src_size = 0
899             src_type = ''
900             src_checksum = ''
901         if size is None: # Set metadata.
902             hash = src_hash # This way hash can be set to None (account or container).
903             size = src_size
904         if type is None:
905             type = src_type
906         if checksum is None:
907             checksum = src_checksum
908         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
909         
910         if src_node is None:
911             pre_version_id = src_version_id
912         else:
913             pre_version_id = None
914             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
915             if props is not None:
916                 pre_version_id = props[self.SERIAL]
917         if pre_version_id is not None:
918             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
919         
920         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
921         return pre_version_id, dest_version_id
922     
923     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
924         if src_version_id is not None:
925             self.node.attribute_copy(src_version_id, dest_version_id)
926         if not replace:
927             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
928             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
929         else:
930             self.node.attribute_del(dest_version_id, domain)
931             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
932     
933     def _put_metadata(self, user, node, domain, meta, replace=False):
934         """Create a new version and store metadata."""
935         
936         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
937         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
938         return src_version_id, dest_version_id
939     
940     def _list_limits(self, listing, marker, limit):
941         start = 0
942         if marker:
943             try:
944                 start = listing.index(marker) + 1
945             except ValueError:
946                 pass
947         if not limit or limit > 10000:
948             limit = 10000
949         return start, limit
950     
951     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
952         cont_prefix = path + '/'
953         prefix = cont_prefix + prefix
954         start = cont_prefix + marker if marker else None
955         before = until if until is not None else inf
956         filterq = keys if domain else []
957         sizeq = size_range
958         
959         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
960         objects.extend([(p, None) for p in prefixes] if virtual else [])
961         objects.sort(key=lambda x: x[0])
962         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
963         
964         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
965         return objects[start:start + limit]
966     
967     # Reporting functions.
968     
969     def _report_size_change(self, user, account, size, details={}):
970         logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
971         account_node = self._lookup_account(account, True)[1]
972         total = self._get_statistics(account_node)[1]
973         details.update({'user': user, 'total': total})
974         self.queue.send(account, 'diskspace', size, details)
975     
976     # Policy functions.
977     
978     def _check_policy(self, policy):
979         for k in policy.keys():
980             if policy[k] == '':
981                 policy[k] = self.default_policy.get(k)
982         for k, v in policy.iteritems():
983             if k == 'quota':
984                 q = int(v) # May raise ValueError.
985                 if q < 0:
986                     raise ValueError
987             elif k == 'versioning':
988                 if v not in ['auto', 'none']:
989                     raise ValueError
990             else:
991                 raise ValueError
992     
993     def _put_policy(self, node, policy, replace):
994         if replace:
995             for k, v in self.default_policy.iteritems():
996                 if k not in policy:
997                     policy[k] = v
998         self.node.policy_set(node, policy)
999     
1000     def _get_policy(self, node):
1001         policy = self.default_policy.copy()
1002         policy.update(self.node.policy_get(node))
1003         return policy
1004     
1005     def _apply_versioning(self, account, container, version_id):
1006         """Delete the provided version if such is the policy.
1007            Return size of object removed.
1008         """
1009         
1010         if version_id is None:
1011             return 0
1012         path, node = self._lookup_container(account, container)
1013         versioning = self._get_policy(node)['versioning']
1014         if versioning != 'auto':
1015             hash, size = self.node.version_remove(version_id)
1016             self.store.map_delete(hash)
1017             return size
1018         return 0
1019     
1020     # Access control functions.
1021     
1022     def _check_groups(self, groups):
1023         # raise ValueError('Bad characters in groups')
1024         pass
1025     
1026     def _check_permissions(self, path, permissions):
1027         # raise ValueError('Bad characters in permissions')
1028         pass
1029     
1030     def _get_formatted_paths(self, paths):
1031         formatted = []
1032         for p in paths:
1033             node = self.node.node_lookup(p)
1034             if node is not None:
1035                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1036             if props is not None:
1037                 if props[self.TYPE] in ('application/directory', 'application/folder'):
1038                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1039                 formatted.append((p, self.MATCH_EXACT))
1040         return formatted
1041     
1042     def _get_permissions_path(self, account, container, name):
1043         path = '/'.join((account, container, name))
1044         permission_paths = self.permissions.access_inherit(path)
1045         permission_paths.sort()
1046         permission_paths.reverse()
1047         for p in permission_paths:
1048             if p == path:
1049                 return p
1050             else:
1051                 if p.count('/') < 2:
1052                     continue
1053                 node = self.node.node_lookup(p)
1054                 if node is not None:
1055                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1056                 if props is not None:
1057                     if props[self.TYPE] in ('application/directory', 'application/folder'):
1058                         return p
1059         return None
1060     
1061     def _can_read(self, user, account, container, name):
1062         if user == account:
1063             return True
1064         path = '/'.join((account, container, name))
1065         if self.permissions.public_get(path) is not None:
1066             return True
1067         path = self._get_permissions_path(account, container, name)
1068         if not path:
1069             raise NotAllowedError
1070         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1071             raise NotAllowedError
1072     
1073     def _can_write(self, user, account, container, name):
1074         if user == account:
1075             return True
1076         path = '/'.join((account, container, name))
1077         path = self._get_permissions_path(account, container, name)
1078         if not path:
1079             raise NotAllowedError
1080         if not self.permissions.access_check(path, self.WRITE, user):
1081             raise NotAllowedError
1082     
1083     def _allowed_accounts(self, user):
1084         allow = set()
1085         for path in self.permissions.access_list_paths(user):
1086             allow.add(path.split('/', 1)[0])
1087         return sorted(allow)
1088     
1089     def _allowed_containers(self, user, account):
1090         allow = set()
1091         for path in self.permissions.access_list_paths(user, account):
1092             allow.add(path.split('/', 2)[1])
1093         return sorted(allow)