fix list_containers
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import hashlib
40 import binascii
41
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43
44 # Stripped-down version of the HashMap class found in tools.
45 class HashMap(list):
46
47     def __init__(self, blocksize, blockhash):
48         super(HashMap, self).__init__()
49         self.blocksize = blocksize
50         self.blockhash = blockhash
51
52     def _hash_raw(self, v):
53         h = hashlib.new(self.blockhash)
54         h.update(v)
55         return h.digest()
56
57     def hash(self):
58         if len(self) == 0:
59             return self._hash_raw('')
60         if len(self) == 1:
61             return self.__getitem__(0)
62
63         h = list(self)
64         s = 2
65         while s < len(h):
66             s = s * 2
67         h += [('\x00' * len(h[0]))] * (s - len(h))
68         while len(h) > 1:
69             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70         return h[0]
71
72 # Default modules and settings.
73 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76 DEFAULT_BLOCK_PATH = 'data/'
77 DEFAULT_BLOCK_UMASK = 0o022
78 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
79 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
80
81 QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
82 QUEUE_CLIENT_ID = 'pithos'
83 QUEUE_INSTANCE_ID = '1'
84
85 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
86
87 inf = float('inf')
88
89 ULTIMATE_ANSWER = 42
90
91
92 logger = logging.getLogger(__name__)
93
94
95 def backend_method(func=None, autocommit=1):
96     if func is None:
97         def fn(func):
98             return backend_method(func, autocommit)
99         return fn
100
101     if not autocommit:
102         return func
103     def fn(self, *args, **kw):
104         self.wrapper.execute()
105         try:
106             self.messages = []
107             ret = func(self, *args, **kw)
108             for m in self.messages:
109                 self.queue.send(*m)
110             self.wrapper.commit()
111             return ret
112         except:
113             self.wrapper.rollback()
114             raise
115     return fn
116
117
118 class ModularBackend(BaseBackend):
119     """A modular backend.
120     
121     Uses modules for SQL functions and storage.
122     """
123     
124     def __init__(self, db_module=None, db_connection=None,
125                  block_module=None, block_path=None, block_umask=None,
126                  queue_module=None, queue_connection=None):
127         db_module = db_module or DEFAULT_DB_MODULE
128         db_connection = db_connection or DEFAULT_DB_CONNECTION
129         block_module = block_module or DEFAULT_BLOCK_MODULE
130         block_path = block_path or DEFAULT_BLOCK_PATH
131         block_umask = block_umask or DEFAULT_BLOCK_UMASK
132         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
133         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
134         
135         self.hash_algorithm = 'sha256'
136         self.block_size = 4 * 1024 * 1024 # 4MB
137         
138         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
139         
140         def load_module(m):
141             __import__(m)
142             return sys.modules[m]
143         
144         self.db_module = load_module(db_module)
145         self.wrapper = self.db_module.DBWrapper(db_connection)
146         params = {'wrapper': self.wrapper}
147         self.permissions = self.db_module.Permissions(**params)
148         for x in ['READ', 'WRITE']:
149             setattr(self, x, getattr(self.db_module, x))
150         self.node = self.db_module.Node(**params)
151         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
152             setattr(self, x, getattr(self.db_module, x))
153         
154         self.block_module = load_module(block_module)
155         params = {'path': block_path,
156                   'block_size': self.block_size,
157                   'hash_algorithm': self.hash_algorithm,
158                   'umask': block_umask}
159         self.store = self.block_module.Store(**params)
160
161         if queue_module and queue_connection:
162             self.queue_module = load_module(queue_module)
163             params = {'exchange': queue_connection,
164                       'client_id': QUEUE_CLIENT_ID}
165             self.queue = self.queue_module.Queue(**params)
166         else:
167             class NoQueue:
168                 def send(self, *args):
169                     pass
170                 
171                 def close(self):
172                     pass
173             
174             self.queue = NoQueue()
175     
176     def close(self):
177         self.wrapper.close()
178         self.queue.close()
179     
180     @backend_method
181     def list_accounts(self, user, marker=None, limit=10000):
182         """Return a list of accounts the user can access."""
183         
184         logger.debug("list_accounts: %s %s %s", user, marker, limit)
185         allowed = self._allowed_accounts(user)
186         start, limit = self._list_limits(allowed, marker, limit)
187         return allowed[start:start + limit]
188     
189     @backend_method
190     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
191         """Return a dictionary with the account metadata for the domain."""
192         
193         logger.debug("get_account_meta: %s %s %s", account, domain, until)
194         path, node = self._lookup_account(account, user == account)
195         if user != account:
196             if until or node is None or account not in self._allowed_accounts(user):
197                 raise NotAllowedError
198         try:
199             props = self._get_properties(node, until)
200             mtime = props[self.MTIME]
201         except NameError:
202             props = None
203             mtime = until
204         count, bytes, tstamp = self._get_statistics(node, until)
205         tstamp = max(tstamp, mtime)
206         if until is None:
207             modified = tstamp
208         else:
209             modified = self._get_statistics(node)[2] # Overall last modification.
210             modified = max(modified, mtime)
211         
212         if user != account:
213             meta = {'name': account}
214         else:
215             meta = {}
216             if props is not None and include_user_defined:
217                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
218             if until is not None:
219                 meta.update({'until_timestamp': tstamp})
220             meta.update({'name': account, 'count': count, 'bytes': bytes})
221         meta.update({'modified': modified})
222         return meta
223     
224     @backend_method
225     def update_account_meta(self, user, account, domain, meta, replace=False):
226         """Update the metadata associated with the account for the domain."""
227         
228         logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
229         if user != account:
230             raise NotAllowedError
231         path, node = self._lookup_account(account, True)
232         self._put_metadata(user, node, domain, meta, replace)
233     
234     @backend_method
235     def get_account_groups(self, user, account):
236         """Return a dictionary with the user groups defined for this account."""
237         
238         logger.debug("get_account_groups: %s", account)
239         if user != account:
240             if account not in self._allowed_accounts(user):
241                 raise NotAllowedError
242             return {}
243         self._lookup_account(account, True)
244         return self.permissions.group_dict(account)
245     
246     @backend_method
247     def update_account_groups(self, user, account, groups, replace=False):
248         """Update the groups associated with the account."""
249         
250         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
251         if user != account:
252             raise NotAllowedError
253         self._lookup_account(account, True)
254         self._check_groups(groups)
255         if replace:
256             self.permissions.group_destroy(account)
257         for k, v in groups.iteritems():
258             if not replace: # If not already deleted.
259                 self.permissions.group_delete(account, k)
260             if v:
261                 self.permissions.group_addmany(account, k, v)
262     
263     @backend_method
264     def get_account_policy(self, user, account):
265         """Return a dictionary with the account policy."""
266         
267         logger.debug("get_account_policy: %s", account)
268         if user != account:
269             if account not in self._allowed_accounts(user):
270                 raise NotAllowedError
271             return {}
272         path, node = self._lookup_account(account, True)
273         return self._get_policy(node)
274     
275     @backend_method
276     def update_account_policy(self, user, account, policy, replace=False):
277         """Update the policy associated with the account."""
278         
279         logger.debug("update_account_policy: %s %s %s", account, policy, replace)
280         if user != account:
281             raise NotAllowedError
282         path, node = self._lookup_account(account, True)
283         self._check_policy(policy)
284         self._put_policy(node, policy, replace)
285     
286     @backend_method
287     def put_account(self, user, account, policy={}):
288         """Create a new account with the given name."""
289         
290         logger.debug("put_account: %s %s", account, policy)
291         if user != account:
292             raise NotAllowedError
293         node = self.node.node_lookup(account)
294         if node is not None:
295             raise NameError('Account already exists')
296         if policy:
297             self._check_policy(policy)
298         node = self._put_path(user, self.ROOTNODE, account)
299         self._put_policy(node, policy, True)
300     
301     @backend_method
302     def delete_account(self, user, account):
303         """Delete the account with the given name."""
304         
305         logger.debug("delete_account: %s", account)
306         if user != account:
307             raise NotAllowedError
308         node = self.node.node_lookup(account)
309         if node is None:
310             return
311         if not self.node.node_remove(node):
312             raise IndexError('Account is not empty')
313         self.permissions.group_destroy(account)
314     
315     @backend_method
316     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
317         """Return a list of containers existing under an account."""
318         
319         logger.debug("list_containers: %s %s %s %s %s %s", account, marker, limit, shared, until, public)
320         if user != account:
321             if until or account not in self._allowed_accounts(user):
322                 raise NotAllowedError
323             allowed = self._allowed_containers(user, account)
324             start, limit = self._list_limits(allowed, marker, limit)
325             return allowed[start:start + limit]
326         if shared or public:
327             allowed = []
328             if shared:
329                 allowed.extend([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
330             if public:
331                 allowed.extend([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
332             allowed = list(set(allowed))
333             start, limit = self._list_limits(allowed, marker, limit)
334             return allowed[start:start + limit]
335         node = self.node.node_lookup(account)
336         return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
337     
338     @backend_method
339     def list_container_meta(self, user, account, container, domain, until=None):
340         """Return a list with all the container's object meta keys for the domain."""
341         
342         logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
343         allowed = []
344         if user != account:
345             if until:
346                 raise NotAllowedError
347             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
348             if not allowed:
349                 raise NotAllowedError
350         path, node = self._lookup_container(account, container)
351         before = until if until is not None else inf
352         allowed = self._get_formatted_paths(allowed)
353         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
354     
355     @backend_method
356     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
357         """Return a dictionary with the container metadata for the domain."""
358         
359         logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
360         if user != account:
361             if until or container not in self._allowed_containers(user, account):
362                 raise NotAllowedError
363         path, node = self._lookup_container(account, container)
364         props = self._get_properties(node, until)
365         mtime = props[self.MTIME]
366         count, bytes, tstamp = self._get_statistics(node, until)
367         tstamp = max(tstamp, mtime)
368         if until is None:
369             modified = tstamp
370         else:
371             modified = self._get_statistics(node)[2] # Overall last modification.
372             modified = max(modified, mtime)
373         
374         if user != account:
375             meta = {'name': container}
376         else:
377             meta = {}
378             if include_user_defined:
379                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
380             if until is not None:
381                 meta.update({'until_timestamp': tstamp})
382             meta.update({'name': container, 'count': count, 'bytes': bytes})
383         meta.update({'modified': modified})
384         return meta
385     
386     @backend_method
387     def update_container_meta(self, user, account, container, domain, meta, replace=False):
388         """Update the metadata associated with the container for the domain."""
389         
390         logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
391         if user != account:
392             raise NotAllowedError
393         path, node = self._lookup_container(account, container)
394         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
395         if src_version_id is not None:
396             versioning = self._get_policy(node)['versioning']
397             if versioning != 'auto':
398                 self.node.version_remove(src_version_id)
399     
400     @backend_method
401     def get_container_policy(self, user, account, container):
402         """Return a dictionary with the container policy."""
403         
404         logger.debug("get_container_policy: %s %s", account, container)
405         if user != account:
406             if container not in self._allowed_containers(user, account):
407                 raise NotAllowedError
408             return {}
409         path, node = self._lookup_container(account, container)
410         return self._get_policy(node)
411     
412     @backend_method
413     def update_container_policy(self, user, account, container, policy, replace=False):
414         """Update the policy associated with the container."""
415         
416         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
417         if user != account:
418             raise NotAllowedError
419         path, node = self._lookup_container(account, container)
420         self._check_policy(policy)
421         self._put_policy(node, policy, replace)
422     
423     @backend_method
424     def put_container(self, user, account, container, policy={}):
425         """Create a new container with the given name."""
426         
427         logger.debug("put_container: %s %s %s", account, container, policy)
428         if user != account:
429             raise NotAllowedError
430         try:
431             path, node = self._lookup_container(account, container)
432         except NameError:
433             pass
434         else:
435             raise NameError('Container already exists')
436         if policy:
437             self._check_policy(policy)
438         path = '/'.join((account, container))
439         node = self._put_path(user, self._lookup_account(account, True)[1], path)
440         self._put_policy(node, policy, True)
441     
442     @backend_method
443     def delete_container(self, user, account, container, until=None):
444         """Delete/purge the container with the given name."""
445         
446         logger.debug("delete_container: %s %s %s", account, container, until)
447         if user != account:
448             raise NotAllowedError
449         path, node = self._lookup_container(account, container)
450         
451         if until is not None:
452             hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
453             for h in hashes:
454                 self.store.map_delete(h)
455             self.node.node_purge_children(node, until, CLUSTER_DELETED)
456             self._report_size_change(user, account, -size, {'action': 'container purge'})
457             return
458         
459         if self._get_statistics(node)[0] > 0:
460             raise IndexError('Container is not empty')
461         hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
462         for h in hashes:
463             self.store.map_delete(h)
464         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
465         self.node.node_remove(node)
466         self._report_size_change(user, account, -size, {'action': 'container delete'})
467     
468     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
469         if user != account and until:
470             raise NotAllowedError
471         allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
472         if (shared or public) and not allowed:
473             return []
474         path, node = self._lookup_container(account, container)
475         allowed = self._get_formatted_paths(allowed)
476         return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
477     
478     def _list_object_permissions(self, user, account, container, prefix, shared, public):
479         allowed = []
480         path = '/'.join((account, container, prefix)).rstrip('/')
481         if user != account:
482             allowed = self.permissions.access_list_paths(user, path)
483             if not allowed:
484                 raise NotAllowedError
485         else:
486             allowed = []
487             if shared:
488                 allowed.extend(self.permissions.access_list_shared(path))
489             if public:
490                 allowed.extend([x[0] for x in self.permissions.public_list(path)])
491             allowed = list(set(allowed))
492             if not allowed:
493                 return []
494         return allowed
495     
496     @backend_method
497     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
498         """Return a list of object (name, version_id) tuples existing under a container."""
499         
500         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
501         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
502     
503     @backend_method
504     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
505         """Return a list of object metadata dicts existing under a container."""
506         
507         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
508         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
509         objects = []
510         for p in props:
511             if len(p) == 2:
512                 objects.append({'subdir': p[0]})
513             else:
514                 objects.append({'name': p[0],
515                                 'bytes': p[self.SIZE + 1],
516                                 'type': p[self.TYPE + 1],
517                                 'hash': p[self.HASH + 1],
518                                 'version': p[self.SERIAL + 1],
519                                 'version_timestamp': p[self.MTIME + 1],
520                                 'modified': p[self.MTIME + 1] if until is None else None,
521                                 'modified_by': p[self.MUSER + 1],
522                                 'uuid': p[self.UUID + 1],
523                                 'checksum': p[self.CHECKSUM + 1]})
524         return objects
525     
526     @backend_method
527     def list_object_permissions(self, user, account, container, prefix=''):
528         """Return a list of paths that enforce permissions under a container."""
529         
530         logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
531         return self._list_object_permissions(user, account, container, prefix, True, False)
532     
533     @backend_method
534     def list_object_public(self, user, account, container, prefix=''):
535         """Return a dict mapping paths to public ids for objects that are public under a container."""
536         
537         logger.debug("list_object_public: %s %s %s", account, container, prefix)
538         public = {}
539         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
540             public[path] = p + ULTIMATE_ANSWER
541         return public
542     
543     @backend_method
544     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
545         """Return a dictionary with the object metadata for the domain."""
546         
547         logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
548         self._can_read(user, account, container, name)
549         path, node = self._lookup_object(account, container, name)
550         props = self._get_version(node, version)
551         if version is None:
552             modified = props[self.MTIME]
553         else:
554             try:
555                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
556             except NameError: # Object may be deleted.
557                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
558                 if del_props is None:
559                     raise NameError('Object does not exist')
560                 modified = del_props[self.MTIME]
561         
562         meta = {}
563         if include_user_defined:
564             meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
565         meta.update({'name': name,
566                      'bytes': props[self.SIZE],
567                      'type': props[self.TYPE],
568                      'hash': props[self.HASH],
569                      'version': props[self.SERIAL],
570                      'version_timestamp': props[self.MTIME],
571                      'modified': modified,
572                      'modified_by': props[self.MUSER],
573                      'uuid': props[self.UUID],
574                      'checksum': props[self.CHECKSUM]})
575         return meta
576     
577     @backend_method
578     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
579         """Update the metadata associated with the object for the domain and return the new version."""
580         
581         logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
582         self._can_write(user, account, container, name)
583         path, node = self._lookup_object(account, container, name)
584         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
585         self._apply_versioning(account, container, src_version_id)
586         return dest_version_id
587     
588     @backend_method
589     def get_object_permissions(self, user, account, container, name):
590         """Return the action allowed on the object, the path
591         from which the object gets its permissions from,
592         along with a dictionary containing the permissions."""
593         
594         logger.debug("get_object_permissions: %s %s %s", account, container, name)
595         allowed = 'write'
596         permissions_path = self._get_permissions_path(account, container, name)
597         if user != account:
598             if self.permissions.access_check(permissions_path, self.WRITE, user):
599                 allowed = 'write'
600             elif self.permissions.access_check(permissions_path, self.READ, user):
601                 allowed = 'read'
602             else:
603                 raise NotAllowedError
604         self._lookup_object(account, container, name)
605         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
606     
607     @backend_method
608     def update_object_permissions(self, user, account, container, name, permissions):
609         """Update the permissions associated with the object."""
610         
611         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
612         if user != account:
613             raise NotAllowedError
614         path = self._lookup_object(account, container, name)[0]
615         self._check_permissions(path, permissions)
616         self.permissions.access_set(path, permissions)
617         self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
618     
619     @backend_method
620     def get_object_public(self, user, account, container, name):
621         """Return the public id of the object if applicable."""
622         
623         logger.debug("get_object_public: %s %s %s", account, container, name)
624         self._can_read(user, account, container, name)
625         path = self._lookup_object(account, container, name)[0]
626         p = self.permissions.public_get(path)
627         if p is not None:
628             p += ULTIMATE_ANSWER
629         return p
630     
631     @backend_method
632     def update_object_public(self, user, account, container, name, public):
633         """Update the public status of the object."""
634         
635         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
636         self._can_write(user, account, container, name)
637         path = self._lookup_object(account, container, name)[0]
638         if not public:
639             self.permissions.public_unset(path)
640         else:
641             self.permissions.public_set(path)
642     
643     @backend_method
644     def get_object_hashmap(self, user, account, container, name, version=None):
645         """Return the object's size and a list with partial hashes."""
646         
647         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
648         self._can_read(user, account, container, name)
649         path, node = self._lookup_object(account, container, name)
650         props = self._get_version(node, version)
651         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
652         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
653     
654     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
655         if permissions is not None and user != account:
656             raise NotAllowedError
657         self._can_write(user, account, container, name)
658         if permissions is not None:
659             path = '/'.join((account, container, name))
660             self._check_permissions(path, permissions)
661         
662         account_path, account_node = self._lookup_account(account, True)
663         container_path, container_node = self._lookup_container(account, container)
664         path, node = self._put_object_node(container_path, container_node, name)
665         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
666         
667         # Handle meta.
668         if src_version_id is None:
669             src_version_id = pre_version_id
670         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
671         
672         # Check quota.
673         del_size = self._apply_versioning(account, container, pre_version_id)
674         size_delta = size - del_size
675         if size_delta > 0:
676             account_quota = long(self._get_policy(account_node)['quota'])
677             container_quota = long(self._get_policy(container_node)['quota'])
678             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
679                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
680                 # This must be executed in a transaction, so the version is never created if it fails.
681                 raise QuotaError
682         self._report_size_change(user, account, size_delta, {'action': 'object update'})
683         
684         if permissions is not None:
685             self.permissions.access_set(path, permissions)
686             self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
687         
688         self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
689         return dest_version_id
690     
691     @backend_method
692     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
693         """Create/update an object with the specified size and partial hashes."""
694         
695         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
696         if size == 0: # No such thing as an empty hashmap.
697             hashmap = [self.put_block('')]
698         map = HashMap(self.block_size, self.hash_algorithm)
699         map.extend([binascii.unhexlify(x) for x in hashmap])
700         missing = self.store.block_search(map)
701         if missing:
702             ie = IndexError()
703             ie.data = [binascii.hexlify(x) for x in missing]
704             raise ie
705         
706         hash = map.hash()
707         dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
708         self.store.map_put(hash, map)
709         return dest_version_id
710     
711     @backend_method
712     def update_object_checksum(self, user, account, container, name, version, checksum):
713         """Update an object's checksum."""
714         
715         logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
716         # Update objects with greater version and same hashmap and size (fix metadata updates).
717         self._can_write(user, account, container, name)
718         path, node = self._lookup_object(account, container, name)
719         props = self._get_version(node, version)
720         versions = self.node.node_get_versions(node)
721         for x in versions:
722             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
723                 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
724     
725     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
726         self._can_read(user, src_account, src_container, src_name)
727         path, node = self._lookup_object(src_account, src_container, src_name)
728         # TODO: Will do another fetch of the properties in duplicate version...
729         props = self._get_version(node, src_version) # Check to see if source exists.
730         src_version_id = props[self.SERIAL]
731         hash = props[self.HASH]
732         size = props[self.SIZE]
733         
734         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
735         dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
736         return dest_version_id
737     
738     @backend_method
739     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
740         """Copy an object's data and metadata."""
741         
742         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
743         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
744         return dest_version_id
745     
746     @backend_method
747     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
748         """Move an object's data and metadata."""
749         
750         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
751         if user != src_account:
752             raise NotAllowedError
753         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
754         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
755             self._delete_object(user, src_account, src_container, src_name)
756         return dest_version_id
757     
758     def _delete_object(self, user, account, container, name, until=None):
759         if user != account:
760             raise NotAllowedError
761         
762         if until is not None:
763             path = '/'.join((account, container, name))
764             node = self.node.node_lookup(path)
765             if node is None:
766                 return
767             hashes = []
768             size = 0
769             h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
770             hashes += h
771             size += s
772             h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
773             hashes += h
774             size += s
775             for h in hashes:
776                 self.store.map_delete(h)
777             self.node.node_purge(node, until, CLUSTER_DELETED)
778             try:
779                 props = self._get_version(node)
780             except NameError:
781                 self.permissions.access_clear(path)
782             self._report_size_change(user, account, -size, {'action': 'object purge'})
783             return
784         
785         path, node = self._lookup_object(account, container, name)
786         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
787         del_size = self._apply_versioning(account, container, src_version_id)
788         if del_size:
789             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
790         self._report_object_change(user, account, path, details={'action': 'object delete'})
791         self.permissions.access_clear(path)
792     
793     @backend_method
794     def delete_object(self, user, account, container, name, until=None):
795         """Delete/purge an object."""
796         
797         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
798         self._delete_object(user, account, container, name, until)
799     
800     @backend_method
801     def list_versions(self, user, account, container, name):
802         """Return a list of all (version, version_timestamp) tuples for an object."""
803         
804         logger.debug("list_versions: %s %s %s", account, container, name)
805         self._can_read(user, account, container, name)
806         path, node = self._lookup_object(account, container, name)
807         versions = self.node.node_get_versions(node)
808         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
809     
810     @backend_method
811     def get_uuid(self, user, uuid):
812         """Return the (account, container, name) for the UUID given."""
813         
814         logger.debug("get_uuid: %s", uuid)
815         info = self.node.latest_uuid(uuid)
816         if info is None:
817             raise NameError
818         path, serial = info
819         account, container, name = path.split('/', 2)
820         self._can_read(user, account, container, name)
821         return (account, container, name)
822     
823     @backend_method
824     def get_public(self, user, public):
825         """Return the (account, container, name) for the public id given."""
826         
827         logger.debug("get_public: %s", public)
828         if public is None or public < ULTIMATE_ANSWER:
829             raise NameError
830         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
831         if path is None:
832             raise NameError
833         account, container, name = path.split('/', 2)
834         self._can_read(user, account, container, name)
835         return (account, container, name)
836     
837     @backend_method(autocommit=0)
838     def get_block(self, hash):
839         """Return a block's data."""
840         
841         logger.debug("get_block: %s", hash)
842         block = self.store.block_get(binascii.unhexlify(hash))
843         if not block:
844             raise NameError('Block does not exist')
845         return block
846     
847     @backend_method(autocommit=0)
848     def put_block(self, data):
849         """Store a block and return the hash."""
850         
851         logger.debug("put_block: %s", len(data))
852         return binascii.hexlify(self.store.block_put(data))
853     
854     @backend_method(autocommit=0)
855     def update_block(self, hash, data, offset=0):
856         """Update a known block and return the hash."""
857         
858         logger.debug("update_block: %s %s %s", hash, len(data), offset)
859         if offset == 0 and len(data) == self.block_size:
860             return self.put_block(data)
861         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
862         return binascii.hexlify(h)
863     
864     # Path functions.
865     
866     def _generate_uuid(self):
867         return str(uuidlib.uuid4())
868     
869     def _put_object_node(self, path, parent, name):
870         path = '/'.join((path, name))
871         node = self.node.node_lookup(path)
872         if node is None:
873             node = self.node.node_create(parent, path)
874         return path, node
875     
876     def _put_path(self, user, parent, path):
877         node = self.node.node_create(parent, path)
878         self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
879         return node
880     
881     def _lookup_account(self, account, create=True):
882         node = self.node.node_lookup(account)
883         if node is None and create:
884             node = self._put_path(account, self.ROOTNODE, account) # User is account.
885         return account, node
886     
887     def _lookup_container(self, account, container):
888         path = '/'.join((account, container))
889         node = self.node.node_lookup(path)
890         if node is None:
891             raise NameError('Container does not exist')
892         return path, node
893     
894     def _lookup_object(self, account, container, name):
895         path = '/'.join((account, container, name))
896         node = self.node.node_lookup(path)
897         if node is None:
898             raise NameError('Object does not exist')
899         return path, node
900     
901     def _get_properties(self, node, until=None):
902         """Return properties until the timestamp given."""
903         
904         before = until if until is not None else inf
905         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
906         if props is None and until is not None:
907             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
908         if props is None:
909             raise NameError('Path does not exist')
910         return props
911     
912     def _get_statistics(self, node, until=None):
913         """Return count, sum of size and latest timestamp of everything under node."""
914         
915         if until is None:
916             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
917         else:
918             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
919         if stats is None:
920             stats = (0, 0, 0)
921         return stats
922     
923     def _get_version(self, node, version=None):
924         if version is None:
925             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
926             if props is None:
927                 raise NameError('Object does not exist')
928         else:
929             try:
930                 version = int(version)
931             except ValueError:
932                 raise IndexError('Version does not exist')
933             props = self.node.version_get_properties(version)
934             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
935                 raise IndexError('Version does not exist')
936         return props
937     
938     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
939         """Create a new version of the node."""
940         
941         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
942         if props is not None:
943             src_version_id = props[self.SERIAL]
944             src_hash = props[self.HASH]
945             src_size = props[self.SIZE]
946             src_type = props[self.TYPE]
947             src_checksum = props[self.CHECKSUM]
948         else:
949             src_version_id = None
950             src_hash = None
951             src_size = 0
952             src_type = ''
953             src_checksum = ''
954         if size is None: # Set metadata.
955             hash = src_hash # This way hash can be set to None (account or container).
956             size = src_size
957         if type is None:
958             type = src_type
959         if checksum is None:
960             checksum = src_checksum
961         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
962         
963         if src_node is None:
964             pre_version_id = src_version_id
965         else:
966             pre_version_id = None
967             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
968             if props is not None:
969                 pre_version_id = props[self.SERIAL]
970         if pre_version_id is not None:
971             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
972         
973         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
974         return pre_version_id, dest_version_id
975     
976     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
977         if src_version_id is not None:
978             self.node.attribute_copy(src_version_id, dest_version_id)
979         if not replace:
980             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
981             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
982         else:
983             self.node.attribute_del(dest_version_id, domain)
984             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
985     
986     def _put_metadata(self, user, node, domain, meta, replace=False):
987         """Create a new version and store metadata."""
988         
989         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
990         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
991         return src_version_id, dest_version_id
992     
993     def _list_limits(self, listing, marker, limit):
994         start = 0
995         if marker:
996             try:
997                 start = listing.index(marker) + 1
998             except ValueError:
999                 pass
1000         if not limit or limit > 10000:
1001             limit = 10000
1002         return start, limit
1003     
1004     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1005         cont_prefix = path + '/'
1006         prefix = cont_prefix + prefix
1007         start = cont_prefix + marker if marker else None
1008         before = until if until is not None else inf
1009         filterq = keys if domain else []
1010         sizeq = size_range
1011         
1012         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1013         objects.extend([(p, None) for p in prefixes] if virtual else [])
1014         objects.sort(key=lambda x: x[0])
1015         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1016         
1017         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
1018         return objects[start:start + limit]
1019     
1020     # Reporting functions.
1021     
1022     def _report_size_change(self, user, account, size, details={}):
1023         logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1024         account_node = self._lookup_account(account, True)[1]
1025         total = self._get_statistics(account_node)[1]
1026         details.update({'user': user, 'total': total})
1027         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1028     
1029     def _report_object_change(self, user, account, path, details={}):
1030         logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1031         details.update({'user': user})
1032         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1033     
1034     def _report_sharing_change(self, user, account, path, details={}):
1035         logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1036         details.update({'user': user})
1037         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1038     
1039     # Policy functions.
1040     
1041     def _check_policy(self, policy):
1042         for k in policy.keys():
1043             if policy[k] == '':
1044                 policy[k] = self.default_policy.get(k)
1045         for k, v in policy.iteritems():
1046             if k == 'quota':
1047                 q = int(v) # May raise ValueError.
1048                 if q < 0:
1049                     raise ValueError
1050             elif k == 'versioning':
1051                 if v not in ['auto', 'none']:
1052                     raise ValueError
1053             else:
1054                 raise ValueError
1055     
1056     def _put_policy(self, node, policy, replace):
1057         if replace:
1058             for k, v in self.default_policy.iteritems():
1059                 if k not in policy:
1060                     policy[k] = v
1061         self.node.policy_set(node, policy)
1062     
1063     def _get_policy(self, node):
1064         policy = self.default_policy.copy()
1065         policy.update(self.node.policy_get(node))
1066         return policy
1067     
1068     def _apply_versioning(self, account, container, version_id):
1069         """Delete the provided version if such is the policy.
1070            Return size of object removed.
1071         """
1072         
1073         if version_id is None:
1074             return 0
1075         path, node = self._lookup_container(account, container)
1076         versioning = self._get_policy(node)['versioning']
1077         if versioning != 'auto':
1078             hash, size = self.node.version_remove(version_id)
1079             self.store.map_delete(hash)
1080             return size
1081         return 0
1082     
1083     # Access control functions.
1084     
1085     def _check_groups(self, groups):
1086         # raise ValueError('Bad characters in groups')
1087         pass
1088     
1089     def _check_permissions(self, path, permissions):
1090         # raise ValueError('Bad characters in permissions')
1091         pass
1092     
1093     def _get_formatted_paths(self, paths):
1094         formatted = []
1095         for p in paths:
1096             node = self.node.node_lookup(p)
1097             if node is not None:
1098                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1099             if props is not None:
1100                 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1101                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1102                 formatted.append((p, self.MATCH_EXACT))
1103         return formatted
1104     
1105     def _get_permissions_path(self, account, container, name):
1106         path = '/'.join((account, container, name))
1107         permission_paths = self.permissions.access_inherit(path)
1108         permission_paths.sort()
1109         permission_paths.reverse()
1110         for p in permission_paths:
1111             if p == path:
1112                 return p
1113             else:
1114                 if p.count('/') < 2:
1115                     continue
1116                 node = self.node.node_lookup(p)
1117                 if node is not None:
1118                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1119                 if props is not None:
1120                     if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1121                         return p
1122         return None
1123     
1124     def _can_read(self, user, account, container, name):
1125         if user == account:
1126             return True
1127         path = '/'.join((account, container, name))
1128         if self.permissions.public_get(path) is not None:
1129             return True
1130         path = self._get_permissions_path(account, container, name)
1131         if not path:
1132             raise NotAllowedError
1133         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1134             raise NotAllowedError
1135     
1136     def _can_write(self, user, account, container, name):
1137         if user == account:
1138             return True
1139         path = '/'.join((account, container, name))
1140         path = self._get_permissions_path(account, container, name)
1141         if not path:
1142             raise NotAllowedError
1143         if not self.permissions.access_check(path, self.WRITE, user):
1144             raise NotAllowedError
1145     
1146     def _allowed_accounts(self, user):
1147         allow = set()
1148         for path in self.permissions.access_list_paths(user):
1149             allow.add(path.split('/', 1)[0])
1150         return sorted(allow)
1151     
1152     def _allowed_containers(self, user, account):
1153         allow = set()
1154         for path in self.permissions.access_list_paths(user, account):
1155             allow.add(path.split('/', 2)[1])
1156         return sorted(allow)