Fixes for non-automatic container versioning policy.
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import hashlib
40 import binascii
41
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43
44 # Stripped-down version of the HashMap class found in tools.
45 class HashMap(list):
46
47     def __init__(self, blocksize, blockhash):
48         super(HashMap, self).__init__()
49         self.blocksize = blocksize
50         self.blockhash = blockhash
51
52     def _hash_raw(self, v):
53         h = hashlib.new(self.blockhash)
54         h.update(v)
55         return h.digest()
56
57     def hash(self):
58         if len(self) == 0:
59             return self._hash_raw('')
60         if len(self) == 1:
61             return self.__getitem__(0)
62
63         h = list(self)
64         s = 2
65         while s < len(h):
66             s = s * 2
67         h += [('\x00' * len(h[0]))] * (s - len(h))
68         while len(h) > 1:
69             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70         return h[0]
71
72 # Default modules and settings.
73 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76 DEFAULT_BLOCK_PATH = 'data/'
77 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
78 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
79
80 QUEUE_MESSAGE_KEY = '#'
81 QUEUE_CLIENT_ID = 2 # Pithos.
82
83 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
84
85 inf = float('inf')
86
87 ULTIMATE_ANSWER = 42
88
89
90 logger = logging.getLogger(__name__)
91
92
93 def backend_method(func=None, autocommit=1):
94     if func is None:
95         def fn(func):
96             return backend_method(func, autocommit)
97         return fn
98
99     if not autocommit:
100         return func
101     def fn(self, *args, **kw):
102         self.wrapper.execute()
103         try:
104             ret = func(self, *args, **kw)
105             self.wrapper.commit()
106             return ret
107         except:
108             self.wrapper.rollback()
109             raise
110     return fn
111
112
113 class ModularBackend(BaseBackend):
114     """A modular backend.
115     
116     Uses modules for SQL functions and storage.
117     """
118     
119     def __init__(self, db_module=None, db_connection=None,
120                  block_module=None, block_path=None,
121                  queue_module=None, queue_connection=None):
122         db_module = db_module or DEFAULT_DB_MODULE
123         db_connection = db_connection or DEFAULT_DB_CONNECTION
124         block_module = block_module or DEFAULT_BLOCK_MODULE
125         block_path = block_path or DEFAULT_BLOCK_PATH
126         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
127         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
128         
129         self.hash_algorithm = 'sha256'
130         self.block_size = 4 * 1024 * 1024 # 4MB
131         
132         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
133         
134         def load_module(m):
135             __import__(m)
136             return sys.modules[m]
137         
138         self.db_module = load_module(db_module)
139         self.wrapper = self.db_module.DBWrapper(db_connection)
140         params = {'wrapper': self.wrapper}
141         self.permissions = self.db_module.Permissions(**params)
142         for x in ['READ', 'WRITE']:
143             setattr(self, x, getattr(self.db_module, x))
144         self.node = self.db_module.Node(**params)
145         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
146             setattr(self, x, getattr(self.db_module, x))
147         
148         self.block_module = load_module(block_module)
149         params = {'path': block_path,
150                   'block_size': self.block_size,
151                   'hash_algorithm': self.hash_algorithm}
152         self.store = self.block_module.Store(**params)
153
154         if queue_module and queue_connection:
155             self.queue_module = load_module(queue_module)
156             params = {'exchange': queue_connection,
157                       'message_key': QUEUE_MESSAGE_KEY,
158                       'client_id': QUEUE_CLIENT_ID}
159             self.queue = self.queue_module.Queue(**params)
160         else:
161             class NoQueue:
162                 def send(self, *args):
163                     pass
164                 
165                 def close(self):
166                     pass
167             
168             self.queue = NoQueue()
169     
170     def close(self):
171         self.wrapper.close()
172         self.queue.close()
173     
174     @backend_method
175     def list_accounts(self, user, marker=None, limit=10000):
176         """Return a list of accounts the user can access."""
177         
178         logger.debug("list_accounts: %s %s %s", user, marker, limit)
179         allowed = self._allowed_accounts(user)
180         start, limit = self._list_limits(allowed, marker, limit)
181         return allowed[start:start + limit]
182     
183     @backend_method
184     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
185         """Return a dictionary with the account metadata for the domain."""
186         
187         logger.debug("get_account_meta: %s %s %s", account, domain, until)
188         path, node = self._lookup_account(account, user == account)
189         if user != account:
190             if until or node is None or account not in self._allowed_accounts(user):
191                 raise NotAllowedError
192         try:
193             props = self._get_properties(node, until)
194             mtime = props[self.MTIME]
195         except NameError:
196             props = None
197             mtime = until
198         count, bytes, tstamp = self._get_statistics(node, until)
199         tstamp = max(tstamp, mtime)
200         if until is None:
201             modified = tstamp
202         else:
203             modified = self._get_statistics(node)[2] # Overall last modification.
204             modified = max(modified, mtime)
205         
206         if user != account:
207             meta = {'name': account}
208         else:
209             meta = {}
210             if props is not None and include_user_defined:
211                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
212             if until is not None:
213                 meta.update({'until_timestamp': tstamp})
214             meta.update({'name': account, 'count': count, 'bytes': bytes})
215         meta.update({'modified': modified})
216         return meta
217     
218     @backend_method
219     def update_account_meta(self, user, account, domain, meta, replace=False):
220         """Update the metadata associated with the account for the domain."""
221         
222         logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
223         if user != account:
224             raise NotAllowedError
225         path, node = self._lookup_account(account, True)
226         self._put_metadata(user, node, domain, meta, replace)
227     
228     @backend_method
229     def get_account_groups(self, user, account):
230         """Return a dictionary with the user groups defined for this account."""
231         
232         logger.debug("get_account_groups: %s", account)
233         if user != account:
234             if account not in self._allowed_accounts(user):
235                 raise NotAllowedError
236             return {}
237         self._lookup_account(account, True)
238         return self.permissions.group_dict(account)
239     
240     @backend_method
241     def update_account_groups(self, user, account, groups, replace=False):
242         """Update the groups associated with the account."""
243         
244         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
245         if user != account:
246             raise NotAllowedError
247         self._lookup_account(account, True)
248         self._check_groups(groups)
249         if replace:
250             self.permissions.group_destroy(account)
251         for k, v in groups.iteritems():
252             if not replace: # If not already deleted.
253                 self.permissions.group_delete(account, k)
254             if v:
255                 self.permissions.group_addmany(account, k, v)
256     
257     @backend_method
258     def get_account_policy(self, user, account):
259         """Return a dictionary with the account policy."""
260         
261         logger.debug("get_account_policy: %s", account)
262         if user != account:
263             if account not in self._allowed_accounts(user):
264                 raise NotAllowedError
265             return {}
266         path, node = self._lookup_account(account, True)
267         return self._get_policy(node)
268     
269     @backend_method
270     def update_account_policy(self, user, account, policy, replace=False):
271         """Update the policy associated with the account."""
272         
273         logger.debug("update_account_policy: %s %s %s", account, policy, replace)
274         if user != account:
275             raise NotAllowedError
276         path, node = self._lookup_account(account, True)
277         self._check_policy(policy)
278         self._put_policy(node, policy, replace)
279     
280     @backend_method
281     def put_account(self, user, account, policy={}):
282         """Create a new account with the given name."""
283         
284         logger.debug("put_account: %s %s", account, policy)
285         if user != account:
286             raise NotAllowedError
287         node = self.node.node_lookup(account)
288         if node is not None:
289             raise NameError('Account already exists')
290         if policy:
291             self._check_policy(policy)
292         node = self._put_path(user, self.ROOTNODE, account)
293         self._put_policy(node, policy, True)
294     
295     @backend_method
296     def delete_account(self, user, account):
297         """Delete the account with the given name."""
298         
299         logger.debug("delete_account: %s", account)
300         if user != account:
301             raise NotAllowedError
302         node = self.node.node_lookup(account)
303         if node is None:
304             return
305         if not self.node.node_remove(node):
306             raise IndexError('Account is not empty')
307         self.permissions.group_destroy(account)
308     
309     @backend_method
310     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
311         """Return a list of containers existing under an account."""
312         
313         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
314         if user != account:
315             if until or account not in self._allowed_accounts(user):
316                 raise NotAllowedError
317             allowed = self._allowed_containers(user, account)
318             start, limit = self._list_limits(allowed, marker, limit)
319             return allowed[start:start + limit]
320         if shared:
321             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
322             allowed = list(set(allowed))
323             start, limit = self._list_limits(allowed, marker, limit)
324             return allowed[start:start + limit]
325         node = self.node.node_lookup(account)
326         return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
327     
328     @backend_method
329     def list_container_meta(self, user, account, container, domain, until=None):
330         """Return a list with all the container's object meta keys for the domain."""
331         
332         logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
333         allowed = []
334         if user != account:
335             if until:
336                 raise NotAllowedError
337             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
338             if not allowed:
339                 raise NotAllowedError
340         path, node = self._lookup_container(account, container)
341         before = until if until is not None else inf
342         allowed = self._get_formatted_paths(allowed)
343         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
344     
345     @backend_method
346     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
347         """Return a dictionary with the container metadata for the domain."""
348         
349         logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
350         if user != account:
351             if until or container not in self._allowed_containers(user, account):
352                 raise NotAllowedError
353         path, node = self._lookup_container(account, container)
354         props = self._get_properties(node, until)
355         mtime = props[self.MTIME]
356         count, bytes, tstamp = self._get_statistics(node, until)
357         tstamp = max(tstamp, mtime)
358         if until is None:
359             modified = tstamp
360         else:
361             modified = self._get_statistics(node)[2] # Overall last modification.
362             modified = max(modified, mtime)
363         
364         if user != account:
365             meta = {'name': container}
366         else:
367             meta = {}
368             if include_user_defined:
369                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
370             if until is not None:
371                 meta.update({'until_timestamp': tstamp})
372             meta.update({'name': container, 'count': count, 'bytes': bytes})
373         meta.update({'modified': modified})
374         return meta
375     
376     @backend_method
377     def update_container_meta(self, user, account, container, domain, meta, replace=False):
378         """Update the metadata associated with the container for the domain."""
379         
380         logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
381         if user != account:
382             raise NotAllowedError
383         path, node = self._lookup_container(account, container)
384         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
385         if src_version_id is not None:
386             versioning = self._get_policy(node)['versioning']
387             if versioning != 'auto':
388                 self.node.version_remove(src_version_id)
389     
390     @backend_method
391     def get_container_policy(self, user, account, container):
392         """Return a dictionary with the container policy."""
393         
394         logger.debug("get_container_policy: %s %s", account, container)
395         if user != account:
396             if container not in self._allowed_containers(user, account):
397                 raise NotAllowedError
398             return {}
399         path, node = self._lookup_container(account, container)
400         return self._get_policy(node)
401     
402     @backend_method
403     def update_container_policy(self, user, account, container, policy, replace=False):
404         """Update the policy associated with the container."""
405         
406         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
407         if user != account:
408             raise NotAllowedError
409         path, node = self._lookup_container(account, container)
410         self._check_policy(policy)
411         self._put_policy(node, policy, replace)
412     
413     @backend_method
414     def put_container(self, user, account, container, policy={}):
415         """Create a new container with the given name."""
416         
417         logger.debug("put_container: %s %s %s", account, container, policy)
418         if user != account:
419             raise NotAllowedError
420         try:
421             path, node = self._lookup_container(account, container)
422         except NameError:
423             pass
424         else:
425             raise NameError('Container already exists')
426         if policy:
427             self._check_policy(policy)
428         path = '/'.join((account, container))
429         node = self._put_path(user, self._lookup_account(account, True)[1], path)
430         self._put_policy(node, policy, True)
431     
432     @backend_method
433     def delete_container(self, user, account, container, until=None):
434         """Delete/purge the container with the given name."""
435         
436         logger.debug("delete_container: %s %s %s", account, container, until)
437         if user != account:
438             raise NotAllowedError
439         path, node = self._lookup_container(account, container)
440         
441         if until is not None:
442             hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
443             for h in hashes:
444                 self.store.map_delete(h)
445             self.node.node_purge_children(node, until, CLUSTER_DELETED)
446             self._report_size_change(user, account, -size, {'action': 'container purge'})
447             return
448         
449         if self._get_statistics(node)[0] > 0:
450             raise IndexError('Container is not empty')
451         hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
452         for h in hashes:
453             self.store.map_delete(h)
454         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
455         self.node.node_remove(node)
456         self._report_size_change(user, account, -size, {'action': 'container delete'})
457     
458     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props):
459         if user != account and until:
460             raise NotAllowedError
461         allowed = self._list_object_permissions(user, account, container, prefix, shared)
462         if shared and not allowed:
463             return []
464         path, node = self._lookup_container(account, container)
465         allowed = self._get_formatted_paths(allowed)
466         return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
467     
468     def _list_object_permissions(self, user, account, container, prefix, shared):
469         allowed = []
470         path = '/'.join((account, container, prefix)).rstrip('/')
471         if user != account:
472             allowed = self.permissions.access_list_paths(user, path)
473             if not allowed:
474                 raise NotAllowedError
475         else:
476             if shared:
477                 allowed = self.permissions.access_list_shared(path)
478                 if not allowed:
479                     return []
480         return allowed
481     
482     @backend_method
483     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
484         """Return a list of object (name, version_id) tuples existing under a container."""
485         
486         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
487         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False)
488     
489     @backend_method
490     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
491         """Return a list of object metadata dicts existing under a container."""
492         
493         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
494         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True)
495         objects = []
496         for p in props:
497             if len(p) == 2:
498                 objects.append({'subdir': p[0]})
499             else:
500                 objects.append({'name': p[0],
501                                 'bytes': p[self.SIZE + 1],
502                                 'type': p[self.TYPE + 1],
503                                 'hash': p[self.HASH + 1],
504                                 'version': p[self.SERIAL + 1],
505                                 'version_timestamp': p[self.MTIME + 1],
506                                 'modified': p[self.MTIME + 1] if until is None else None,
507                                 'modified_by': p[self.MUSER + 1],
508                                 'uuid': p[self.UUID + 1],
509                                 'checksum': p[self.CHECKSUM + 1]})
510         return objects
511     
512     @backend_method
513     def list_object_permissions(self, user, account, container, prefix=''):
514         """Return a list of paths that enforce permissions under a container."""
515         
516         logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
517         return self._list_object_permissions(user, account, container, prefix, True)
518     
519     @backend_method
520     def list_object_public(self, user, account, container, prefix=''):
521         """Return a dict mapping paths to public ids for objects that are public under a container."""
522         
523         logger.debug("list_object_public: %s %s %s", account, container, prefix)
524         public = {}
525         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
526             public[path] = p + ULTIMATE_ANSWER
527         return public
528     
529     @backend_method
530     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
531         """Return a dictionary with the object metadata for the domain."""
532         
533         logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
534         self._can_read(user, account, container, name)
535         path, node = self._lookup_object(account, container, name)
536         props = self._get_version(node, version)
537         if version is None:
538             modified = props[self.MTIME]
539         else:
540             try:
541                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
542             except NameError: # Object may be deleted.
543                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
544                 if del_props is None:
545                     raise NameError('Object does not exist')
546                 modified = del_props[self.MTIME]
547         
548         meta = {}
549         if include_user_defined:
550             meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
551         meta.update({'name': name,
552                      'bytes': props[self.SIZE],
553                      'type': props[self.TYPE],
554                      'hash': props[self.HASH],
555                      'version': props[self.SERIAL],
556                      'version_timestamp': props[self.MTIME],
557                      'modified': modified,
558                      'modified_by': props[self.MUSER],
559                      'uuid': props[self.UUID],
560                      'checksum': props[self.CHECKSUM]})
561         return meta
562     
563     @backend_method
564     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
565         """Update the metadata associated with the object for the domain and return the new version."""
566         
567         logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
568         self._can_write(user, account, container, name)
569         path, node = self._lookup_object(account, container, name)
570         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
571         self._apply_versioning(account, container, src_version_id)
572         return dest_version_id
573     
574     @backend_method
575     def get_object_permissions(self, user, account, container, name):
576         """Return the action allowed on the object, the path
577         from which the object gets its permissions from,
578         along with a dictionary containing the permissions."""
579         
580         logger.debug("get_object_permissions: %s %s %s", account, container, name)
581         allowed = 'write'
582         permissions_path = self._get_permissions_path(account, container, name)
583         if user != account:
584             if self.permissions.access_check(permissions_path, self.WRITE, user):
585                 allowed = 'write'
586             elif self.permissions.access_check(permissions_path, self.READ, user):
587                 allowed = 'read'
588             else:
589                 raise NotAllowedError
590         self._lookup_object(account, container, name)
591         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
592     
593     @backend_method
594     def update_object_permissions(self, user, account, container, name, permissions):
595         """Update the permissions associated with the object."""
596         
597         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
598         if user != account:
599             raise NotAllowedError
600         path = self._lookup_object(account, container, name)[0]
601         self._check_permissions(path, permissions)
602         self.permissions.access_set(path, permissions)
603     
604     @backend_method
605     def get_object_public(self, user, account, container, name):
606         """Return the public id of the object if applicable."""
607         
608         logger.debug("get_object_public: %s %s %s", account, container, name)
609         self._can_read(user, account, container, name)
610         path = self._lookup_object(account, container, name)[0]
611         p = self.permissions.public_get(path)
612         if p is not None:
613             p += ULTIMATE_ANSWER
614         return p
615     
616     @backend_method
617     def update_object_public(self, user, account, container, name, public):
618         """Update the public status of the object."""
619         
620         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
621         self._can_write(user, account, container, name)
622         path = self._lookup_object(account, container, name)[0]
623         if not public:
624             self.permissions.public_unset(path)
625         else:
626             self.permissions.public_set(path)
627     
628     @backend_method
629     def get_object_hashmap(self, user, account, container, name, version=None):
630         """Return the object's size and a list with partial hashes."""
631         
632         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
633         self._can_read(user, account, container, name)
634         path, node = self._lookup_object(account, container, name)
635         props = self._get_version(node, version)
636         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
637         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
638     
639     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
640         if permissions is not None and user != account:
641             raise NotAllowedError
642         self._can_write(user, account, container, name)
643         if permissions is not None:
644             path = '/'.join((account, container, name))
645             self._check_permissions(path, permissions)
646         
647         account_path, account_node = self._lookup_account(account, True)
648         container_path, container_node = self._lookup_container(account, container)
649         path, node = self._put_object_node(container_path, container_node, name)
650         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
651         
652         # Handle meta.
653         if src_version_id is None:
654             src_version_id = pre_version_id
655         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
656         
657         # Check quota.
658         del_size = self._apply_versioning(account, container, pre_version_id)
659         size_delta = size - del_size
660         if size_delta > 0:
661             account_quota = long(self._get_policy(account_node)['quota'])
662             container_quota = long(self._get_policy(container_node)['quota'])
663             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
664                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
665                 # This must be executed in a transaction, so the version is never created if it fails.
666                 raise QuotaError
667         self._report_size_change(user, account, size_delta, {'action': 'object update'})
668         
669         if permissions is not None:
670             self.permissions.access_set(path, permissions)
671         return dest_version_id
672     
673     @backend_method
674     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
675         """Create/update an object with the specified size and partial hashes."""
676         
677         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
678         if size == 0: # No such thing as an empty hashmap.
679             hashmap = [self.put_block('')]
680         map = HashMap(self.block_size, self.hash_algorithm)
681         map.extend([binascii.unhexlify(x) for x in hashmap])
682         missing = self.store.block_search(map)
683         if missing:
684             ie = IndexError()
685             ie.data = [binascii.hexlify(x) for x in missing]
686             raise ie
687         
688         hash = map.hash()
689         dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
690         self.store.map_put(hash, map)
691         return dest_version_id
692     
693     @backend_method
694     def update_object_checksum(self, user, account, container, name, version, checksum):
695         """Update an object's checksum."""
696         
697         logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
698         # Update objects with greater version and same hashmap and size (fix metadata updates).
699         self._can_write(user, account, container, name)
700         path, node = self._lookup_object(account, container, name)
701         props = self._get_version(node, version)
702         versions = self.node.node_get_versions(node)
703         for x in versions:
704             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
705                 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
706     
707     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
708         self._can_read(user, src_account, src_container, src_name)
709         path, node = self._lookup_object(src_account, src_container, src_name)
710         # TODO: Will do another fetch of the properties in duplicate version...
711         props = self._get_version(node, src_version) # Check to see if source exists.
712         src_version_id = props[self.SERIAL]
713         hash = props[self.HASH]
714         size = props[self.SIZE]
715         
716         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
717         dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
718         return dest_version_id
719     
720     @backend_method
721     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
722         """Copy an object's data and metadata."""
723         
724         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
725         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
726         return dest_version_id
727     
728     @backend_method
729     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
730         """Move an object's data and metadata."""
731         
732         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
733         if user != src_account:
734             raise NotAllowedError
735         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
736         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
737             self._delete_object(user, src_account, src_container, src_name)
738         return dest_version_id
739     
740     def _delete_object(self, user, account, container, name, until=None):
741         if user != account:
742             raise NotAllowedError
743         
744         if until is not None:
745             path = '/'.join((account, container, name))
746             node = self.node.node_lookup(path)
747             if node is None:
748                 return
749             hashes = []
750             size = 0
751             h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
752             hashes += h
753             size += s
754             h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
755             hashes += h
756             size += s
757             for h in hashes:
758                 self.store.map_delete(h)
759             self.node.node_purge(node, until, CLUSTER_DELETED)
760             try:
761                 props = self._get_version(node)
762             except NameError:
763                 self.permissions.access_clear(path)
764             self._report_size_change(user, account, -size, {'action': 'object purge'})
765             return
766         
767         path, node = self._lookup_object(account, container, name)
768         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
769         del_size = self._apply_versioning(account, container, src_version_id)
770         if del_size:
771             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
772         self.permissions.access_clear(path)
773     
774     @backend_method
775     def delete_object(self, user, account, container, name, until=None):
776         """Delete/purge an object."""
777         
778         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
779         self._delete_object(user, account, container, name, until)
780     
781     @backend_method
782     def list_versions(self, user, account, container, name):
783         """Return a list of all (version, version_timestamp) tuples for an object."""
784         
785         logger.debug("list_versions: %s %s %s", account, container, name)
786         self._can_read(user, account, container, name)
787         path, node = self._lookup_object(account, container, name)
788         versions = self.node.node_get_versions(node)
789         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
790     
791     @backend_method
792     def get_uuid(self, user, uuid):
793         """Return the (account, container, name) for the UUID given."""
794         
795         logger.debug("get_uuid: %s", uuid)
796         info = self.node.latest_uuid(uuid)
797         if info is None:
798             raise NameError
799         path, serial = info
800         account, container, name = path.split('/', 2)
801         self._can_read(user, account, container, name)
802         return (account, container, name)
803     
804     @backend_method
805     def get_public(self, user, public):
806         """Return the (account, container, name) for the public id given."""
807         
808         logger.debug("get_public: %s", public)
809         if public is None or public < ULTIMATE_ANSWER:
810             raise NameError
811         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
812         if path is None:
813             raise NameError
814         account, container, name = path.split('/', 2)
815         self._can_read(user, account, container, name)
816         return (account, container, name)
817     
818     @backend_method(autocommit=0)
819     def get_block(self, hash):
820         """Return a block's data."""
821         
822         logger.debug("get_block: %s", hash)
823         block = self.store.block_get(binascii.unhexlify(hash))
824         if not block:
825             raise NameError('Block does not exist')
826         return block
827     
828     @backend_method(autocommit=0)
829     def put_block(self, data):
830         """Store a block and return the hash."""
831         
832         logger.debug("put_block: %s", len(data))
833         return binascii.hexlify(self.store.block_put(data))
834     
835     @backend_method(autocommit=0)
836     def update_block(self, hash, data, offset=0):
837         """Update a known block and return the hash."""
838         
839         logger.debug("update_block: %s %s %s", hash, len(data), offset)
840         if offset == 0 and len(data) == self.block_size:
841             return self.put_block(data)
842         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
843         return binascii.hexlify(h)
844     
845     # Path functions.
846     
847     def _generate_uuid(self):
848         return str(uuidlib.uuid4())
849     
850     def _put_object_node(self, path, parent, name):
851         path = '/'.join((path, name))
852         node = self.node.node_lookup(path)
853         if node is None:
854             node = self.node.node_create(parent, path)
855         return path, node
856     
857     def _put_path(self, user, parent, path):
858         node = self.node.node_create(parent, path)
859         self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
860         return node
861     
862     def _lookup_account(self, account, create=True):
863         node = self.node.node_lookup(account)
864         if node is None and create:
865             node = self._put_path(account, self.ROOTNODE, account) # User is account.
866         return account, node
867     
868     def _lookup_container(self, account, container):
869         path = '/'.join((account, container))
870         node = self.node.node_lookup(path)
871         if node is None:
872             raise NameError('Container does not exist')
873         return path, node
874     
875     def _lookup_object(self, account, container, name):
876         path = '/'.join((account, container, name))
877         node = self.node.node_lookup(path)
878         if node is None:
879             raise NameError('Object does not exist')
880         return path, node
881     
882     def _get_properties(self, node, until=None):
883         """Return properties until the timestamp given."""
884         
885         before = until if until is not None else inf
886         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
887         if props is None and until is not None:
888             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
889         if props is None:
890             raise NameError('Path does not exist')
891         return props
892     
893     def _get_statistics(self, node, until=None):
894         """Return count, sum of size and latest timestamp of everything under node."""
895         
896         if until is None:
897             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
898         else:
899             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
900         if stats is None:
901             stats = (0, 0, 0)
902         return stats
903     
904     def _get_version(self, node, version=None):
905         if version is None:
906             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
907             if props is None:
908                 raise NameError('Object does not exist')
909         else:
910             try:
911                 version = int(version)
912             except ValueError:
913                 raise IndexError('Version does not exist')
914             props = self.node.version_get_properties(version)
915             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
916                 raise IndexError('Version does not exist')
917         return props
918     
919     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
920         """Create a new version of the node."""
921         
922         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
923         if props is not None:
924             src_version_id = props[self.SERIAL]
925             src_hash = props[self.HASH]
926             src_size = props[self.SIZE]
927             src_type = props[self.TYPE]
928             src_checksum = props[self.CHECKSUM]
929         else:
930             src_version_id = None
931             src_hash = None
932             src_size = 0
933             src_type = ''
934             src_checksum = ''
935         if size is None: # Set metadata.
936             hash = src_hash # This way hash can be set to None (account or container).
937             size = src_size
938         if type is None:
939             type = src_type
940         if checksum is None:
941             checksum = src_checksum
942         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
943         
944         if src_node is None:
945             pre_version_id = src_version_id
946         else:
947             pre_version_id = None
948             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
949             if props is not None:
950                 pre_version_id = props[self.SERIAL]
951         if pre_version_id is not None:
952             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
953         
954         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
955         return pre_version_id, dest_version_id
956     
957     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
958         if src_version_id is not None:
959             self.node.attribute_copy(src_version_id, dest_version_id)
960         if not replace:
961             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
962             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
963         else:
964             self.node.attribute_del(dest_version_id, domain)
965             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
966     
967     def _put_metadata(self, user, node, domain, meta, replace=False):
968         """Create a new version and store metadata."""
969         
970         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
971         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
972         return src_version_id, dest_version_id
973     
974     def _list_limits(self, listing, marker, limit):
975         start = 0
976         if marker:
977             try:
978                 start = listing.index(marker) + 1
979             except ValueError:
980                 pass
981         if not limit or limit > 10000:
982             limit = 10000
983         return start, limit
984     
985     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
986         cont_prefix = path + '/'
987         prefix = cont_prefix + prefix
988         start = cont_prefix + marker if marker else None
989         before = until if until is not None else inf
990         filterq = keys if domain else []
991         sizeq = size_range
992         
993         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
994         objects.extend([(p, None) for p in prefixes] if virtual else [])
995         objects.sort(key=lambda x: x[0])
996         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
997         
998         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
999         return objects[start:start + limit]
1000     
1001     # Reporting functions.
1002     
1003     def _report_size_change(self, user, account, size, details={}):
1004         logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1005         account_node = self._lookup_account(account, True)[1]
1006         total = self._get_statistics(account_node)[1]
1007         details.update({'user': user, 'total': total})
1008         self.queue.send(account, 'diskspace', size, details)
1009     
1010     # Policy functions.
1011     
1012     def _check_policy(self, policy):
1013         for k in policy.keys():
1014             if policy[k] == '':
1015                 policy[k] = self.default_policy.get(k)
1016         for k, v in policy.iteritems():
1017             if k == 'quota':
1018                 q = int(v) # May raise ValueError.
1019                 if q < 0:
1020                     raise ValueError
1021             elif k == 'versioning':
1022                 if v not in ['auto', 'none']:
1023                     raise ValueError
1024             else:
1025                 raise ValueError
1026     
1027     def _put_policy(self, node, policy, replace):
1028         if replace:
1029             for k, v in self.default_policy.iteritems():
1030                 if k not in policy:
1031                     policy[k] = v
1032         self.node.policy_set(node, policy)
1033     
1034     def _get_policy(self, node):
1035         policy = self.default_policy.copy()
1036         policy.update(self.node.policy_get(node))
1037         return policy
1038     
1039     def _apply_versioning(self, account, container, version_id):
1040         """Delete the provided version if such is the policy.
1041            Return size of object removed.
1042         """
1043         
1044         if version_id is None:
1045             return 0
1046         path, node = self._lookup_container(account, container)
1047         versioning = self._get_policy(node)['versioning']
1048         if versioning != 'auto':
1049             hash, size = self.node.version_remove(version_id)
1050             self.store.map_delete(hash)
1051             return size
1052         return 0
1053     
1054     # Access control functions.
1055     
1056     def _check_groups(self, groups):
1057         # raise ValueError('Bad characters in groups')
1058         pass
1059     
1060     def _check_permissions(self, path, permissions):
1061         # raise ValueError('Bad characters in permissions')
1062         pass
1063     
1064     def _get_formatted_paths(self, paths):
1065         formatted = []
1066         for p in paths:
1067             node = self.node.node_lookup(p)
1068             if node is not None:
1069                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1070             if props is not None:
1071                 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1072                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1073                 formatted.append((p, self.MATCH_EXACT))
1074         return formatted
1075     
1076     def _get_permissions_path(self, account, container, name):
1077         path = '/'.join((account, container, name))
1078         permission_paths = self.permissions.access_inherit(path)
1079         permission_paths.sort()
1080         permission_paths.reverse()
1081         for p in permission_paths:
1082             if p == path:
1083                 return p
1084             else:
1085                 if p.count('/') < 2:
1086                     continue
1087                 node = self.node.node_lookup(p)
1088                 if node is not None:
1089                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1090                 if props is not None:
1091                     if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1092                         return p
1093         return None
1094     
1095     def _can_read(self, user, account, container, name):
1096         if user == account:
1097             return True
1098         path = '/'.join((account, container, name))
1099         if self.permissions.public_get(path) is not None:
1100             return True
1101         path = self._get_permissions_path(account, container, name)
1102         if not path:
1103             raise NotAllowedError
1104         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1105             raise NotAllowedError
1106     
1107     def _can_write(self, user, account, container, name):
1108         if user == account:
1109             return True
1110         path = '/'.join((account, container, name))
1111         path = self._get_permissions_path(account, container, name)
1112         if not path:
1113             raise NotAllowedError
1114         if not self.permissions.access_check(path, self.WRITE, user):
1115             raise NotAllowedError
1116     
1117     def _allowed_accounts(self, user):
1118         allow = set()
1119         for path in self.permissions.access_list_paths(user):
1120             allow.add(path.split('/', 1)[0])
1121         return sorted(allow)
1122     
1123     def _allowed_containers(self, user, account):
1124         allow = set()
1125         for path in self.permissions.access_list_paths(user, account):
1126             allow.add(path.split('/', 2)[1])
1127         return sorted(allow)