Remove lib package.
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import hashlib
40 import binascii
41
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43
44 # Stripped-down version of the HashMap class found in tools.
45 class HashMap(list):
46
47     def __init__(self, blocksize, blockhash):
48         super(HashMap, self).__init__()
49         self.blocksize = blocksize
50         self.blockhash = blockhash
51
52     def _hash_raw(self, v):
53         h = hashlib.new(self.blockhash)
54         h.update(v)
55         return h.digest()
56
57     def hash(self):
58         if len(self) == 0:
59             return self._hash_raw('')
60         if len(self) == 1:
61             return self.__getitem__(0)
62
63         h = list(self)
64         s = 2
65         while s < len(h):
66             s = s * 2
67         h += [('\x00' * len(h[0]))] * (s - len(h))
68         while len(h) > 1:
69             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70         return h[0]
71
72 # Default modules and settings.
73 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76 DEFAULT_BLOCK_PATH = 'data/'
77 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
78 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
79
80 QUEUE_MESSAGE_KEY = '#'
81 QUEUE_CLIENT_ID = 2 # Pithos.
82
83 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
84
85 inf = float('inf')
86
87 ULTIMATE_ANSWER = 42
88
89
90 logger = logging.getLogger(__name__)
91
92
93 def backend_method(func=None, autocommit=1):
94     if func is None:
95         def fn(func):
96             return backend_method(func, autocommit)
97         return fn
98
99     if not autocommit:
100         return func
101     def fn(self, *args, **kw):
102         self.wrapper.execute()
103         try:
104             ret = func(self, *args, **kw)
105             self.wrapper.commit()
106             return ret
107         except:
108             self.wrapper.rollback()
109             raise
110     return fn
111
112
113 class ModularBackend(BaseBackend):
114     """A modular backend.
115     
116     Uses modules for SQL functions and storage.
117     """
118     
119     def __init__(self, db_module=None, db_connection=None,
120                  block_module=None, block_path=None,
121                  queue_module=None, queue_connection=None):
122         db_module = db_module or DEFAULT_DB_MODULE
123         db_connection = db_connection or DEFAULT_DB_CONNECTION
124         block_module = block_module or DEFAULT_BLOCK_MODULE
125         block_path = block_path or DEFAULT_BLOCK_PATH
126         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
127         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
128         
129         self.hash_algorithm = 'sha256'
130         self.block_size = 4 * 1024 * 1024 # 4MB
131         
132         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
133         
134         def load_module(m):
135             __import__(m)
136             return sys.modules[m]
137         
138         self.db_module = load_module(db_module)
139         self.wrapper = self.db_module.DBWrapper(db_connection)
140         params = {'wrapper': self.wrapper}
141         self.permissions = self.db_module.Permissions(**params)
142         for x in ['READ', 'WRITE']:
143             setattr(self, x, getattr(self.db_module, x))
144         self.node = self.db_module.Node(**params)
145         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
146             setattr(self, x, getattr(self.db_module, x))
147         
148         self.block_module = load_module(block_module)
149         params = {'path': block_path,
150                   'block_size': self.block_size,
151                   'hash_algorithm': self.hash_algorithm}
152         self.store = self.block_module.Store(**params)
153
154         if queue_module and queue_connection:
155             self.queue_module = load_module(queue_module)
156             params = {'exchange': queue_connection,
157                       'message_key': QUEUE_MESSAGE_KEY,
158                       'client_id': QUEUE_CLIENT_ID}
159             self.queue = self.queue_module.Queue(**params)
160         else:
161             class NoQueue:
162                 def send(self, *args):
163                     pass
164                 
165                 def close(self):
166                     pass
167             
168             self.queue = NoQueue()
169     
170     def close(self):
171         self.wrapper.close()
172         self.queue.close()
173     
174     @backend_method
175     def list_accounts(self, user, marker=None, limit=10000):
176         """Return a list of accounts the user can access."""
177         
178         logger.debug("list_accounts: %s %s %s", user, marker, limit)
179         allowed = self._allowed_accounts(user)
180         start, limit = self._list_limits(allowed, marker, limit)
181         return allowed[start:start + limit]
182     
183     @backend_method
184     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
185         """Return a dictionary with the account metadata for the domain."""
186         
187         logger.debug("get_account_meta: %s %s %s", account, domain, until)
188         path, node = self._lookup_account(account, user == account)
189         if user != account:
190             if until or node is None or account not in self._allowed_accounts(user):
191                 raise NotAllowedError
192         try:
193             props = self._get_properties(node, until)
194             mtime = props[self.MTIME]
195         except NameError:
196             props = None
197             mtime = until
198         count, bytes, tstamp = self._get_statistics(node, until)
199         tstamp = max(tstamp, mtime)
200         if until is None:
201             modified = tstamp
202         else:
203             modified = self._get_statistics(node)[2] # Overall last modification.
204             modified = max(modified, mtime)
205         
206         if user != account:
207             meta = {'name': account}
208         else:
209             meta = {}
210             if props is not None and include_user_defined:
211                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
212             if until is not None:
213                 meta.update({'until_timestamp': tstamp})
214             meta.update({'name': account, 'count': count, 'bytes': bytes})
215         meta.update({'modified': modified})
216         return meta
217     
218     @backend_method
219     def update_account_meta(self, user, account, domain, meta, replace=False):
220         """Update the metadata associated with the account for the domain."""
221         
222         logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
223         if user != account:
224             raise NotAllowedError
225         path, node = self._lookup_account(account, True)
226         self._put_metadata(user, node, domain, meta, replace)
227     
228     @backend_method
229     def get_account_groups(self, user, account):
230         """Return a dictionary with the user groups defined for this account."""
231         
232         logger.debug("get_account_groups: %s", account)
233         if user != account:
234             if account not in self._allowed_accounts(user):
235                 raise NotAllowedError
236             return {}
237         self._lookup_account(account, True)
238         return self.permissions.group_dict(account)
239     
240     @backend_method
241     def update_account_groups(self, user, account, groups, replace=False):
242         """Update the groups associated with the account."""
243         
244         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
245         if user != account:
246             raise NotAllowedError
247         self._lookup_account(account, True)
248         self._check_groups(groups)
249         if replace:
250             self.permissions.group_destroy(account)
251         for k, v in groups.iteritems():
252             if not replace: # If not already deleted.
253                 self.permissions.group_delete(account, k)
254             if v:
255                 self.permissions.group_addmany(account, k, v)
256     
257     @backend_method
258     def get_account_policy(self, user, account):
259         """Return a dictionary with the account policy."""
260         
261         logger.debug("get_account_policy: %s", account)
262         if user != account:
263             if account not in self._allowed_accounts(user):
264                 raise NotAllowedError
265             return {}
266         path, node = self._lookup_account(account, True)
267         return self._get_policy(node)
268     
269     @backend_method
270     def update_account_policy(self, user, account, policy, replace=False):
271         """Update the policy associated with the account."""
272         
273         logger.debug("update_account_policy: %s %s %s", account, policy, replace)
274         if user != account:
275             raise NotAllowedError
276         path, node = self._lookup_account(account, True)
277         self._check_policy(policy)
278         self._put_policy(node, policy, replace)
279     
280     @backend_method
281     def put_account(self, user, account, policy={}):
282         """Create a new account with the given name."""
283         
284         logger.debug("put_account: %s %s", account, policy)
285         if user != account:
286             raise NotAllowedError
287         node = self.node.node_lookup(account)
288         if node is not None:
289             raise NameError('Account already exists')
290         if policy:
291             self._check_policy(policy)
292         node = self._put_path(user, self.ROOTNODE, account)
293         self._put_policy(node, policy, True)
294     
295     @backend_method
296     def delete_account(self, user, account):
297         """Delete the account with the given name."""
298         
299         logger.debug("delete_account: %s", account)
300         if user != account:
301             raise NotAllowedError
302         node = self.node.node_lookup(account)
303         if node is None:
304             return
305         if not self.node.node_remove(node):
306             raise IndexError('Account is not empty')
307         self.permissions.group_destroy(account)
308     
309     @backend_method
310     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
311         """Return a list of containers existing under an account."""
312         
313         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
314         if user != account:
315             if until or account not in self._allowed_accounts(user):
316                 raise NotAllowedError
317             allowed = self._allowed_containers(user, account)
318             start, limit = self._list_limits(allowed, marker, limit)
319             return allowed[start:start + limit]
320         if shared:
321             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
322             allowed = list(set(allowed))
323             start, limit = self._list_limits(allowed, marker, limit)
324             return allowed[start:start + limit]
325         node = self.node.node_lookup(account)
326         return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
327     
328     @backend_method
329     def list_container_meta(self, user, account, container, domain, until=None):
330         """Return a list with all the container's object meta keys for the domain."""
331         
332         logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
333         allowed = []
334         if user != account:
335             if until:
336                 raise NotAllowedError
337             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
338             if not allowed:
339                 raise NotAllowedError
340         path, node = self._lookup_container(account, container)
341         before = until if until is not None else inf
342         allowed = self._get_formatted_paths(allowed)
343         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
344     
345     @backend_method
346     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
347         """Return a dictionary with the container metadata for the domain."""
348         
349         logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
350         if user != account:
351             if until or container not in self._allowed_containers(user, account):
352                 raise NotAllowedError
353         path, node = self._lookup_container(account, container)
354         props = self._get_properties(node, until)
355         mtime = props[self.MTIME]
356         count, bytes, tstamp = self._get_statistics(node, until)
357         tstamp = max(tstamp, mtime)
358         if until is None:
359             modified = tstamp
360         else:
361             modified = self._get_statistics(node)[2] # Overall last modification.
362             modified = max(modified, mtime)
363         
364         if user != account:
365             meta = {'name': container}
366         else:
367             meta = {}
368             if include_user_defined:
369                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
370             if until is not None:
371                 meta.update({'until_timestamp': tstamp})
372             meta.update({'name': container, 'count': count, 'bytes': bytes})
373         meta.update({'modified': modified})
374         return meta
375     
376     @backend_method
377     def update_container_meta(self, user, account, container, domain, meta, replace=False):
378         """Update the metadata associated with the container for the domain."""
379         
380         logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
381         if user != account:
382             raise NotAllowedError
383         path, node = self._lookup_container(account, container)
384         self._put_metadata(user, node, domain, meta, replace)
385     
386     @backend_method
387     def get_container_policy(self, user, account, container):
388         """Return a dictionary with the container policy."""
389         
390         logger.debug("get_container_policy: %s %s", account, container)
391         if user != account:
392             if container not in self._allowed_containers(user, account):
393                 raise NotAllowedError
394             return {}
395         path, node = self._lookup_container(account, container)
396         return self._get_policy(node)
397     
398     @backend_method
399     def update_container_policy(self, user, account, container, policy, replace=False):
400         """Update the policy associated with the container."""
401         
402         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
403         if user != account:
404             raise NotAllowedError
405         path, node = self._lookup_container(account, container)
406         self._check_policy(policy)
407         self._put_policy(node, policy, replace)
408     
409     @backend_method
410     def put_container(self, user, account, container, policy={}):
411         """Create a new container with the given name."""
412         
413         logger.debug("put_container: %s %s %s", account, container, policy)
414         if user != account:
415             raise NotAllowedError
416         try:
417             path, node = self._lookup_container(account, container)
418         except NameError:
419             pass
420         else:
421             raise NameError('Container already exists')
422         if policy:
423             self._check_policy(policy)
424         path = '/'.join((account, container))
425         node = self._put_path(user, self._lookup_account(account, True)[1], path)
426         self._put_policy(node, policy, True)
427     
428     @backend_method
429     def delete_container(self, user, account, container, until=None):
430         """Delete/purge the container with the given name."""
431         
432         logger.debug("delete_container: %s %s %s", account, container, until)
433         if user != account:
434             raise NotAllowedError
435         path, node = self._lookup_container(account, container)
436         
437         if until is not None:
438             hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
439             for h in hashes:
440                 self.store.map_delete(h)
441             self.node.node_purge_children(node, until, CLUSTER_DELETED)
442             self._report_size_change(user, account, -size, {'action': 'container purge'})
443             return
444         
445         if self._get_statistics(node)[0] > 0:
446             raise IndexError('Container is not empty')
447         hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
448         for h in hashes:
449             self.store.map_delete(h)
450         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
451         self.node.node_remove(node)
452         self._report_size_change(user, account, -size, {'action': 'container delete'})
453     
454     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props):
455         if user != account and until:
456             raise NotAllowedError
457         allowed = self._list_object_permissions(user, account, container, prefix, shared)
458         if shared and not allowed:
459             return []
460         path, node = self._lookup_container(account, container)
461         allowed = self._get_formatted_paths(allowed)
462         return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
463     
464     def _list_object_permissions(self, user, account, container, prefix, shared):
465         allowed = []
466         path = '/'.join((account, container, prefix)).rstrip('/')
467         if user != account:
468             allowed = self.permissions.access_list_paths(user, path)
469             if not allowed:
470                 raise NotAllowedError
471         else:
472             if shared:
473                 allowed = self.permissions.access_list_shared(path)
474                 if not allowed:
475                     return []
476         return allowed
477     
478     @backend_method
479     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
480         """Return a list of object (name, version_id) tuples existing under a container."""
481         
482         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
483         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False)
484     
485     @backend_method
486     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
487         """Return a list of object metadata dicts existing under a container."""
488         
489         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
490         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True)
491         objects = []
492         for p in props:
493             if len(p) == 2:
494                 objects.append({'subdir': p[0]})
495             else:
496                 objects.append({'name': p[0],
497                                 'bytes': p[self.SIZE + 1],
498                                 'type': p[self.TYPE + 1],
499                                 'hash': p[self.HASH + 1],
500                                 'version': p[self.SERIAL + 1],
501                                 'version_timestamp': p[self.MTIME + 1],
502                                 'modified': p[self.MTIME + 1] if until is None else None,
503                                 'modified_by': p[self.MUSER + 1],
504                                 'uuid': p[self.UUID + 1],
505                                 'checksum': p[self.CHECKSUM + 1]})
506         return objects
507     
508     @backend_method
509     def list_object_permissions(self, user, account, container, prefix=''):
510         """Return a list of paths that enforce permissions under a container."""
511         
512         logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
513         return self._list_object_permissions(user, account, container, prefix, True)
514     
515     @backend_method
516     def list_object_public(self, user, account, container, prefix=''):
517         """Return a dict mapping paths to public ids for objects that are public under a container."""
518         
519         logger.debug("list_object_public: %s %s %s", account, container, prefix)
520         public = {}
521         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
522             public[path] = p + ULTIMATE_ANSWER
523         return public
524     
525     @backend_method
526     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
527         """Return a dictionary with the object metadata for the domain."""
528         
529         logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
530         self._can_read(user, account, container, name)
531         path, node = self._lookup_object(account, container, name)
532         props = self._get_version(node, version)
533         if version is None:
534             modified = props[self.MTIME]
535         else:
536             try:
537                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
538             except NameError: # Object may be deleted.
539                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
540                 if del_props is None:
541                     raise NameError('Object does not exist')
542                 modified = del_props[self.MTIME]
543         
544         meta = {}
545         if include_user_defined:
546             meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
547         meta.update({'name': name,
548                      'bytes': props[self.SIZE],
549                      'type': props[self.TYPE],
550                      'hash': props[self.HASH],
551                      'version': props[self.SERIAL],
552                      'version_timestamp': props[self.MTIME],
553                      'modified': modified,
554                      'modified_by': props[self.MUSER],
555                      'uuid': props[self.UUID],
556                      'checksum': props[self.CHECKSUM]})
557         return meta
558     
559     @backend_method
560     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
561         """Update the metadata associated with the object for the domain and return the new version."""
562         
563         logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
564         self._can_write(user, account, container, name)
565         path, node = self._lookup_object(account, container, name)
566         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
567         self._apply_versioning(account, container, src_version_id)
568         return dest_version_id
569     
570     @backend_method
571     def get_object_permissions(self, user, account, container, name):
572         """Return the action allowed on the object, the path
573         from which the object gets its permissions from,
574         along with a dictionary containing the permissions."""
575         
576         logger.debug("get_object_permissions: %s %s %s", account, container, name)
577         allowed = 'write'
578         permissions_path = self._get_permissions_path(account, container, name)
579         if user != account:
580             if self.permissions.access_check(permissions_path, self.WRITE, user):
581                 allowed = 'write'
582             elif self.permissions.access_check(permissions_path, self.READ, user):
583                 allowed = 'read'
584             else:
585                 raise NotAllowedError
586         self._lookup_object(account, container, name)
587         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
588     
589     @backend_method
590     def update_object_permissions(self, user, account, container, name, permissions):
591         """Update the permissions associated with the object."""
592         
593         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
594         if user != account:
595             raise NotAllowedError
596         path = self._lookup_object(account, container, name)[0]
597         self._check_permissions(path, permissions)
598         self.permissions.access_set(path, permissions)
599     
600     @backend_method
601     def get_object_public(self, user, account, container, name):
602         """Return the public id of the object if applicable."""
603         
604         logger.debug("get_object_public: %s %s %s", account, container, name)
605         self._can_read(user, account, container, name)
606         path = self._lookup_object(account, container, name)[0]
607         p = self.permissions.public_get(path)
608         if p is not None:
609             p += ULTIMATE_ANSWER
610         return p
611     
612     @backend_method
613     def update_object_public(self, user, account, container, name, public):
614         """Update the public status of the object."""
615         
616         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
617         self._can_write(user, account, container, name)
618         path = self._lookup_object(account, container, name)[0]
619         if not public:
620             self.permissions.public_unset(path)
621         else:
622             self.permissions.public_set(path)
623     
624     @backend_method
625     def get_object_hashmap(self, user, account, container, name, version=None):
626         """Return the object's size and a list with partial hashes."""
627         
628         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
629         self._can_read(user, account, container, name)
630         path, node = self._lookup_object(account, container, name)
631         props = self._get_version(node, version)
632         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
633         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
634     
635     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, permissions, src_node=None, is_copy=False):
636         if permissions is not None and user != account:
637             raise NotAllowedError
638         self._can_write(user, account, container, name)
639         if permissions is not None:
640             path = '/'.join((account, container, name))
641             self._check_permissions(path, permissions)
642         
643         account_path, account_node = self._lookup_account(account, True)
644         container_path, container_node = self._lookup_container(account, container)
645         path, node = self._put_object_node(container_path, container_node, name)
646         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
647         
648         # Check quota.
649         del_size = self._apply_versioning(account, container, pre_version_id)
650         size_delta = size - del_size
651         if size_delta > 0:
652             account_quota = long(self._get_policy(account_node)['quota'])
653             container_quota = long(self._get_policy(container_node)['quota'])
654             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
655                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
656                 # This must be executed in a transaction, so the version is never created if it fails.
657                 raise QuotaError
658         self._report_size_change(user, account, size_delta, {'action': 'object update'})
659         
660         if permissions is not None:
661             self.permissions.access_set(path, permissions)
662         return pre_version_id, dest_version_id
663     
664     @backend_method
665     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
666         """Create/update an object with the specified size and partial hashes."""
667         
668         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
669         if size == 0: # No such thing as an empty hashmap.
670             hashmap = [self.put_block('')]
671         map = HashMap(self.block_size, self.hash_algorithm)
672         map.extend([binascii.unhexlify(x) for x in hashmap])
673         missing = self.store.block_search(map)
674         if missing:
675             ie = IndexError()
676             ie.data = [binascii.hexlify(x) for x in missing]
677             raise ie
678         
679         hash = map.hash()
680         pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, permissions)
681         self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
682         self.store.map_put(hash, map)
683         return dest_version_id
684     
685     @backend_method
686     def update_object_checksum(self, user, account, container, name, version, checksum):
687         """Update an object's checksum."""
688         
689         logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
690         # Update objects with greater version and same hashmap and size (fix metadata updates).
691         self._can_write(user, account, container, name)
692         path, node = self._lookup_object(account, container, name)
693         props = self._get_version(node, version)
694         versions = self.node.node_get_versions(node)
695         for x in versions:
696             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
697                 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
698     
699     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
700         self._can_read(user, src_account, src_container, src_name)
701         path, node = self._lookup_object(src_account, src_container, src_name)
702         # TODO: Will do another fetch of the properties in duplicate version...
703         props = self._get_version(node, src_version) # Check to see if source exists.
704         src_version_id = props[self.SERIAL]
705         hash = props[self.HASH]
706         size = props[self.SIZE]
707         
708         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
709         pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, permissions, src_node=node, is_copy=is_copy)
710         self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
711         return dest_version_id
712     
713     @backend_method
714     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
715         """Copy an object's data and metadata."""
716         
717         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
718         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
719         return dest_version_id
720     
721     @backend_method
722     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
723         """Move an object's data and metadata."""
724         
725         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
726         if user != src_account:
727             raise NotAllowedError
728         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
729         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
730             self._delete_object(user, src_account, src_container, src_name)
731         return dest_version_id
732     
733     def _delete_object(self, user, account, container, name, until=None):
734         if user != account:
735             raise NotAllowedError
736         
737         if until is not None:
738             path = '/'.join((account, container, name))
739             node = self.node.node_lookup(path)
740             if node is None:
741                 return
742             hashes = []
743             size = 0
744             h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
745             hashes += h
746             size += s
747             h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
748             hashes += h
749             size += s
750             for h in hashes:
751                 self.store.map_delete(h)
752             self.node.node_purge(node, until, CLUSTER_DELETED)
753             try:
754                 props = self._get_version(node)
755             except NameError:
756                 self.permissions.access_clear(path)
757             self._report_size_change(user, account, -size, {'action': 'object purge'})
758             return
759         
760         path, node = self._lookup_object(account, container, name)
761         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
762         del_size = self._apply_versioning(account, container, src_version_id)
763         if del_size:
764             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
765         self.permissions.access_clear(path)
766     
767     @backend_method
768     def delete_object(self, user, account, container, name, until=None):
769         """Delete/purge an object."""
770         
771         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
772         self._delete_object(user, account, container, name, until)
773     
774     @backend_method
775     def list_versions(self, user, account, container, name):
776         """Return a list of all (version, version_timestamp) tuples for an object."""
777         
778         logger.debug("list_versions: %s %s %s", account, container, name)
779         self._can_read(user, account, container, name)
780         path, node = self._lookup_object(account, container, name)
781         versions = self.node.node_get_versions(node)
782         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
783     
784     @backend_method
785     def get_uuid(self, user, uuid):
786         """Return the (account, container, name) for the UUID given."""
787         
788         logger.debug("get_uuid: %s", uuid)
789         info = self.node.latest_uuid(uuid)
790         if info is None:
791             raise NameError
792         path, serial = info
793         account, container, name = path.split('/', 2)
794         self._can_read(user, account, container, name)
795         return (account, container, name)
796     
797     @backend_method
798     def get_public(self, user, public):
799         """Return the (account, container, name) for the public id given."""
800         
801         logger.debug("get_public: %s", public)
802         if public is None or public < ULTIMATE_ANSWER:
803             raise NameError
804         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
805         if path is None:
806             raise NameError
807         account, container, name = path.split('/', 2)
808         self._can_read(user, account, container, name)
809         return (account, container, name)
810     
811     @backend_method(autocommit=0)
812     def get_block(self, hash):
813         """Return a block's data."""
814         
815         logger.debug("get_block: %s", hash)
816         block = self.store.block_get(binascii.unhexlify(hash))
817         if not block:
818             raise NameError('Block does not exist')
819         return block
820     
821     @backend_method(autocommit=0)
822     def put_block(self, data):
823         """Store a block and return the hash."""
824         
825         logger.debug("put_block: %s", len(data))
826         return binascii.hexlify(self.store.block_put(data))
827     
828     @backend_method(autocommit=0)
829     def update_block(self, hash, data, offset=0):
830         """Update a known block and return the hash."""
831         
832         logger.debug("update_block: %s %s %s", hash, len(data), offset)
833         if offset == 0 and len(data) == self.block_size:
834             return self.put_block(data)
835         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
836         return binascii.hexlify(h)
837     
838     # Path functions.
839     
840     def _generate_uuid(self):
841         return str(uuidlib.uuid4())
842     
843     def _put_object_node(self, path, parent, name):
844         path = '/'.join((path, name))
845         node = self.node.node_lookup(path)
846         if node is None:
847             node = self.node.node_create(parent, path)
848         return path, node
849     
850     def _put_path(self, user, parent, path):
851         node = self.node.node_create(parent, path)
852         self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
853         return node
854     
855     def _lookup_account(self, account, create=True):
856         node = self.node.node_lookup(account)
857         if node is None and create:
858             node = self._put_path(account, self.ROOTNODE, account) # User is account.
859         return account, node
860     
861     def _lookup_container(self, account, container):
862         path = '/'.join((account, container))
863         node = self.node.node_lookup(path)
864         if node is None:
865             raise NameError('Container does not exist')
866         return path, node
867     
868     def _lookup_object(self, account, container, name):
869         path = '/'.join((account, container, name))
870         node = self.node.node_lookup(path)
871         if node is None:
872             raise NameError('Object does not exist')
873         return path, node
874     
875     def _get_properties(self, node, until=None):
876         """Return properties until the timestamp given."""
877         
878         before = until if until is not None else inf
879         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
880         if props is None and until is not None:
881             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
882         if props is None:
883             raise NameError('Path does not exist')
884         return props
885     
886     def _get_statistics(self, node, until=None):
887         """Return count, sum of size and latest timestamp of everything under node."""
888         
889         if until is None:
890             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
891         else:
892             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
893         if stats is None:
894             stats = (0, 0, 0)
895         return stats
896     
897     def _get_version(self, node, version=None):
898         if version is None:
899             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
900             if props is None:
901                 raise NameError('Object does not exist')
902         else:
903             try:
904                 version = int(version)
905             except ValueError:
906                 raise IndexError('Version does not exist')
907             props = self.node.version_get_properties(version)
908             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
909                 raise IndexError('Version does not exist')
910         return props
911     
912     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
913         """Create a new version of the node."""
914         
915         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
916         if props is not None:
917             src_version_id = props[self.SERIAL]
918             src_hash = props[self.HASH]
919             src_size = props[self.SIZE]
920             src_type = props[self.TYPE]
921             src_checksum = props[self.CHECKSUM]
922         else:
923             src_version_id = None
924             src_hash = None
925             src_size = 0
926             src_type = ''
927             src_checksum = ''
928         if size is None: # Set metadata.
929             hash = src_hash # This way hash can be set to None (account or container).
930             size = src_size
931         if type is None:
932             type = src_type
933         if checksum is None:
934             checksum = src_checksum
935         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
936         
937         if src_node is None:
938             pre_version_id = src_version_id
939         else:
940             pre_version_id = None
941             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
942             if props is not None:
943                 pre_version_id = props[self.SERIAL]
944         if pre_version_id is not None:
945             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
946         
947         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
948         return pre_version_id, dest_version_id
949     
950     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
951         if src_version_id is not None:
952             self.node.attribute_copy(src_version_id, dest_version_id)
953         if not replace:
954             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
955             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
956         else:
957             self.node.attribute_del(dest_version_id, domain)
958             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
959     
960     def _put_metadata(self, user, node, domain, meta, replace=False):
961         """Create a new version and store metadata."""
962         
963         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
964         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
965         return src_version_id, dest_version_id
966     
967     def _list_limits(self, listing, marker, limit):
968         start = 0
969         if marker:
970             try:
971                 start = listing.index(marker) + 1
972             except ValueError:
973                 pass
974         if not limit or limit > 10000:
975             limit = 10000
976         return start, limit
977     
978     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
979         cont_prefix = path + '/'
980         prefix = cont_prefix + prefix
981         start = cont_prefix + marker if marker else None
982         before = until if until is not None else inf
983         filterq = keys if domain else []
984         sizeq = size_range
985         
986         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
987         objects.extend([(p, None) for p in prefixes] if virtual else [])
988         objects.sort(key=lambda x: x[0])
989         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
990         
991         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
992         return objects[start:start + limit]
993     
994     # Reporting functions.
995     
996     def _report_size_change(self, user, account, size, details={}):
997         logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
998         account_node = self._lookup_account(account, True)[1]
999         total = self._get_statistics(account_node)[1]
1000         details.update({'user': user, 'total': total})
1001         self.queue.send(account, 'diskspace', size, details)
1002     
1003     # Policy functions.
1004     
1005     def _check_policy(self, policy):
1006         for k in policy.keys():
1007             if policy[k] == '':
1008                 policy[k] = self.default_policy.get(k)
1009         for k, v in policy.iteritems():
1010             if k == 'quota':
1011                 q = int(v) # May raise ValueError.
1012                 if q < 0:
1013                     raise ValueError
1014             elif k == 'versioning':
1015                 if v not in ['auto', 'none']:
1016                     raise ValueError
1017             else:
1018                 raise ValueError
1019     
1020     def _put_policy(self, node, policy, replace):
1021         if replace:
1022             for k, v in self.default_policy.iteritems():
1023                 if k not in policy:
1024                     policy[k] = v
1025         self.node.policy_set(node, policy)
1026     
1027     def _get_policy(self, node):
1028         policy = self.default_policy.copy()
1029         policy.update(self.node.policy_get(node))
1030         return policy
1031     
1032     def _apply_versioning(self, account, container, version_id):
1033         """Delete the provided version if such is the policy.
1034            Return size of object removed.
1035         """
1036         
1037         if version_id is None:
1038             return 0
1039         path, node = self._lookup_container(account, container)
1040         versioning = self._get_policy(node)['versioning']
1041         if versioning != 'auto':
1042             hash, size = self.node.version_remove(version_id)
1043             self.store.map_delete(hash)
1044             return size
1045         return 0
1046     
1047     # Access control functions.
1048     
1049     def _check_groups(self, groups):
1050         # raise ValueError('Bad characters in groups')
1051         pass
1052     
1053     def _check_permissions(self, path, permissions):
1054         # raise ValueError('Bad characters in permissions')
1055         pass
1056     
1057     def _get_formatted_paths(self, paths):
1058         formatted = []
1059         for p in paths:
1060             node = self.node.node_lookup(p)
1061             if node is not None:
1062                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1063             if props is not None:
1064                 if props[self.TYPE] in ('application/directory', 'application/folder'):
1065                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1066                 formatted.append((p, self.MATCH_EXACT))
1067         return formatted
1068     
1069     def _get_permissions_path(self, account, container, name):
1070         path = '/'.join((account, container, name))
1071         permission_paths = self.permissions.access_inherit(path)
1072         permission_paths.sort()
1073         permission_paths.reverse()
1074         for p in permission_paths:
1075             if p == path:
1076                 return p
1077             else:
1078                 if p.count('/') < 2:
1079                     continue
1080                 node = self.node.node_lookup(p)
1081                 if node is not None:
1082                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1083                 if props is not None:
1084                     if props[self.TYPE] in ('application/directory', 'application/folder'):
1085                         return p
1086         return None
1087     
1088     def _can_read(self, user, account, container, name):
1089         if user == account:
1090             return True
1091         path = '/'.join((account, container, name))
1092         if self.permissions.public_get(path) is not None:
1093             return True
1094         path = self._get_permissions_path(account, container, name)
1095         if not path:
1096             raise NotAllowedError
1097         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1098             raise NotAllowedError
1099     
1100     def _can_write(self, user, account, container, name):
1101         if user == account:
1102             return True
1103         path = '/'.join((account, container, name))
1104         path = self._get_permissions_path(account, container, name)
1105         if not path:
1106             raise NotAllowedError
1107         if not self.permissions.access_check(path, self.WRITE, user):
1108             raise NotAllowedError
1109     
1110     def _allowed_accounts(self, user):
1111         allow = set()
1112         for path in self.permissions.access_list_paths(user):
1113             allow.add(path.split('/', 1)[0])
1114         return sorted(allow)
1115     
1116     def _allowed_containers(self, user, account):
1117         allow = set()
1118         for path in self.permissions.access_list_paths(user, account):
1119             allow.add(path.split('/', 2)[1])
1120         return sorted(allow)