Fix backend store.
[pithos] / pithos / backends / modular.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import logging
38 import hashlib
39 import binascii
40
41 from base import NotAllowedError, QuotaError, BaseBackend
42
43 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
44
45 inf = float('inf')
46
47
48 logger = logging.getLogger(__name__)
49
50
51 class HashMap(list):
52     
53     def __init__(self, blocksize, blockhash):
54         super(HashMap, self).__init__()
55         self.blocksize = blocksize
56         self.blockhash = blockhash
57     
58     def _hash_raw(self, v):
59         h = hashlib.new(self.blockhash)
60         h.update(v)
61         return h.digest()
62     
63     def hash(self):
64         if len(self) == 0:
65             return self._hash_raw('')
66         if len(self) == 1:
67             return self.__getitem__(0)
68         
69         h = list(self)
70         s = 2
71         while s < len(h):
72             s = s * 2
73         h += [('\x00' * len(h[0]))] * (s - len(h))
74         while len(h) > 1:
75             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
76         return h[0]
77
78
79 def backend_method(func=None, autocommit=1):
80     if func is None:
81         def fn(func):
82             return backend_method(func, autocommit)
83         return fn
84
85     if not autocommit:
86         return func
87     def fn(self, *args, **kw):
88         self.wrapper.execute()
89         try:
90             ret = func(self, *args, **kw)
91             self.wrapper.commit()
92             return ret
93         except:
94             self.wrapper.rollback()
95             raise
96     return fn
97
98
99 class ModularBackend(BaseBackend):
100     """A modular backend.
101     
102     Uses modules for SQL functions and storage.
103     """
104     
105     def __init__(self, db_module, db_connection, block_module, block_path):
106         self.hash_algorithm = 'sha256'
107         self.block_size = 4 * 1024 * 1024 # 4MB
108         
109         self.default_policy = {'quota': 0, 'versioning': 'manual'}
110         
111         __import__(db_module)
112         self.db_module = sys.modules[db_module]
113         self.wrapper = self.db_module.DBWrapper(db_connection)
114         
115         params = {'wrapper': self.wrapper}
116         self.permissions = self.db_module.Permissions(**params)
117         for x in ['READ', 'WRITE']:
118             setattr(self, x, getattr(self.db_module, x))
119         self.node = self.db_module.Node(**params)
120         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
121             setattr(self, x, getattr(self.db_module, x))
122         
123         __import__(block_module)
124         self.block_module = sys.modules[block_module]
125         
126         params = {'path': block_path,
127                   'block_size': self.block_size,
128                   'hash_algorithm': self.hash_algorithm}
129         self.store = self.block_module.Store(**params)
130     
131     def close(self):
132         self.wrapper.close()
133     
134     @backend_method
135     def list_accounts(self, user, marker=None, limit=10000):
136         """Return a list of accounts the user can access."""
137         
138         logger.debug("list_accounts: %s %s %s", user, marker, limit)
139         allowed = self._allowed_accounts(user)
140         start, limit = self._list_limits(allowed, marker, limit)
141         return allowed[start:start + limit]
142     
143     @backend_method
144     def get_account_meta(self, user, account, until=None):
145         """Return a dictionary with the account metadata."""
146         
147         logger.debug("get_account_meta: %s %s", account, until)
148         path, node = self._lookup_account(account, user == account)
149         if user != account:
150             if until or node is None or account not in self._allowed_accounts(user):
151                 raise NotAllowedError
152         try:
153             props = self._get_properties(node, until)
154             mtime = props[self.MTIME]
155         except NameError:
156             props = None
157             mtime = until
158         count, bytes, tstamp = self._get_statistics(node, until)
159         tstamp = max(tstamp, mtime)
160         if until is None:
161             modified = tstamp
162         else:
163             modified = self._get_statistics(node)[2] # Overall last modification.
164             modified = max(modified, mtime)
165         
166         if user != account:
167             meta = {'name': account}
168         else:
169             meta = {}
170             if props is not None:
171                 meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
172             if until is not None:
173                 meta.update({'until_timestamp': tstamp})
174             meta.update({'name': account, 'count': count, 'bytes': bytes})
175         meta.update({'modified': modified})
176         return meta
177     
178     @backend_method
179     def update_account_meta(self, user, account, meta, replace=False):
180         """Update the metadata associated with the account."""
181         
182         logger.debug("update_account_meta: %s %s %s", account, meta, replace)
183         if user != account:
184             raise NotAllowedError
185         path, node = self._lookup_account(account, True)
186         self._put_metadata(user, node, meta, replace)
187     
188     @backend_method
189     def get_account_groups(self, user, account):
190         """Return a dictionary with the user groups defined for this account."""
191         
192         logger.debug("get_account_groups: %s", account)
193         if user != account:
194             if account not in self._allowed_accounts(user):
195                 raise NotAllowedError
196             return {}
197         self._lookup_account(account, True)
198         return self.permissions.group_dict(account)
199     
200     @backend_method
201     def update_account_groups(self, user, account, groups, replace=False):
202         """Update the groups associated with the account."""
203         
204         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
205         if user != account:
206             raise NotAllowedError
207         self._lookup_account(account, True)
208         self._check_groups(groups)
209         if replace:
210             self.permissions.group_destroy(account)
211         for k, v in groups.iteritems():
212             if not replace: # If not already deleted.
213                 self.permissions.group_delete(account, k)
214             if v:
215                 self.permissions.group_addmany(account, k, v)
216     
217     @backend_method
218     def get_account_policy(self, user, account):
219         """Return a dictionary with the account policy."""
220         
221         logger.debug("get_account_policy: %s", account)
222         if user != account:
223             if account not in self._allowed_accounts(user):
224                 raise NotAllowedError
225             return {}
226         path, node = self._lookup_account(account, True)
227         return self._get_policy(node)
228     
229     @backend_method
230     def update_account_policy(self, user, account, policy, replace=False):
231         """Update the policy associated with the account."""
232         
233         logger.debug("update_account_policy: %s %s %s", account, policy, replace)
234         if user != account:
235             raise NotAllowedError
236         path, node = self._lookup_account(account, True)
237         self._check_policy(policy)
238         self._put_policy(node, policy, replace)
239     
240     @backend_method
241     def put_account(self, user, account, policy={}):
242         """Create a new account with the given name."""
243         
244         logger.debug("put_account: %s %s", account, policy)
245         if user != account:
246             raise NotAllowedError
247         node = self.node.node_lookup(account)
248         if node is not None:
249             raise NameError('Account already exists')
250         if policy:
251             self._check_policy(policy)
252         node = self._put_path(user, self.ROOTNODE, account)
253         self._put_policy(node, policy, True)
254     
255     @backend_method
256     def delete_account(self, user, account):
257         """Delete the account with the given name."""
258         
259         logger.debug("delete_account: %s", account)
260         if user != account:
261             raise NotAllowedError
262         node = self.node.node_lookup(account)
263         if node is None:
264             return
265         if not self.node.node_remove(node):
266             raise IndexError('Account is not empty')
267         self.permissions.group_destroy(account)
268     
269     @backend_method
270     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
271         """Return a list of containers existing under an account."""
272         
273         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
274         if user != account:
275             if until or account not in self._allowed_accounts(user):
276                 raise NotAllowedError
277             allowed = self._allowed_containers(user, account)
278             start, limit = self._list_limits(allowed, marker, limit)
279             return allowed[start:start + limit]
280         if shared:
281             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
282             allowed = list(set(allowed))
283             start, limit = self._list_limits(allowed, marker, limit)
284             return allowed[start:start + limit]
285         node = self.node.node_lookup(account)
286         return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
287     
288     @backend_method
289     def get_container_meta(self, user, account, container, until=None):
290         """Return a dictionary with the container metadata."""
291         
292         logger.debug("get_container_meta: %s %s %s", account, container, until)
293         if user != account:
294             if until or container not in self._allowed_containers(user, account):
295                 raise NotAllowedError
296         path, node = self._lookup_container(account, container)
297         props = self._get_properties(node, until)
298         mtime = props[self.MTIME]
299         count, bytes, tstamp = self._get_statistics(node, until)
300         tstamp = max(tstamp, mtime)
301         if until is None:
302             modified = tstamp
303         else:
304             modified = self._get_statistics(node)[2] # Overall last modification.
305             modified = max(modified, mtime)
306         
307         if user != account:
308             meta = {'name': container}
309         else:
310             meta = dict(self.node.attribute_get(props[self.SERIAL]))
311             if until is not None:
312                 meta.update({'until_timestamp': tstamp})
313             meta.update({'name': container, 'count': count, 'bytes': bytes})
314         meta.update({'modified': modified})
315         return meta
316     
317     @backend_method
318     def update_container_meta(self, user, account, container, meta, replace=False):
319         """Update the metadata associated with the container."""
320         
321         logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
322         if user != account:
323             raise NotAllowedError
324         path, node = self._lookup_container(account, container)
325         self._put_metadata(user, node, meta, replace)
326     
327     @backend_method
328     def get_container_policy(self, user, account, container):
329         """Return a dictionary with the container policy."""
330         
331         logger.debug("get_container_policy: %s %s", account, container)
332         if user != account:
333             if container not in self._allowed_containers(user, account):
334                 raise NotAllowedError
335             return {}
336         path, node = self._lookup_container(account, container)
337         return self._get_policy(node)
338     
339     @backend_method
340     def update_container_policy(self, user, account, container, policy, replace=False):
341         """Update the policy associated with the container."""
342         
343         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
344         if user != account:
345             raise NotAllowedError
346         path, node = self._lookup_container(account, container)
347         self._check_policy(policy)
348         self._put_policy(node, policy, replace)
349     
350     @backend_method
351     def put_container(self, user, account, container, policy={}):
352         """Create a new container with the given name."""
353         
354         logger.debug("put_container: %s %s %s", account, container, policy)
355         if user != account:
356             raise NotAllowedError
357         try:
358             path, node = self._lookup_container(account, container)
359         except NameError:
360             pass
361         else:
362             raise NameError('Container already exists')
363         if policy:
364             self._check_policy(policy)
365         path = '/'.join((account, container))
366         node = self._put_path(user, self._lookup_account(account, True)[1], path)
367         self._put_policy(node, policy, True)
368     
369     @backend_method
370     def delete_container(self, user, account, container, until=None):
371         """Delete/purge the container with the given name."""
372         
373         logger.debug("delete_container: %s %s %s", account, container, until)
374         if user != account:
375             raise NotAllowedError
376         path, node = self._lookup_container(account, container)
377         
378         if until is not None:
379             self.node.node_purge_children(node, until, CLUSTER_HISTORY)
380             self.node.node_purge_children(node, until, CLUSTER_DELETED)
381             return
382         
383         if self._get_statistics(node)[0] > 0:
384             raise IndexError('Container is not empty')
385         self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
386         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
387         self.node.node_remove(node)
388     
389     @backend_method
390     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
391         """Return a list of objects existing under a container."""
392         
393         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
394         allowed = []
395         if user != account:
396             if until:
397                 raise NotAllowedError
398             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
399             if not allowed:
400                 raise NotAllowedError
401         else:
402             if shared:
403                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
404                 if not allowed:
405                     return []
406         path, node = self._lookup_container(account, container)
407         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
408     
409     @backend_method
410     def list_object_meta(self, user, account, container, until=None):
411         """Return a list with all the container's object meta keys."""
412         
413         logger.debug("list_object_meta: %s %s %s", account, container, until)
414         allowed = []
415         if user != account:
416             if until:
417                 raise NotAllowedError
418             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
419             if not allowed:
420                 raise NotAllowedError
421         path, node = self._lookup_container(account, container)
422         before = until if until is not None else inf
423         return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
424     
425     @backend_method
426     def get_object_meta(self, user, account, container, name, version=None):
427         """Return a dictionary with the object metadata."""
428         
429         logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
430         self._can_read(user, account, container, name)
431         path, node = self._lookup_object(account, container, name)
432         props = self._get_version(node, version)
433         if version is None:
434             modified = props[self.MTIME]
435         else:
436             try:
437                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
438             except NameError: # Object may be deleted.
439                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
440                 if del_props is None:
441                     raise NameError('Object does not exist')
442                 modified = del_props[self.MTIME]
443         
444         meta = dict(self.node.attribute_get(props[self.SERIAL]))
445         meta.update({'name': name, 'bytes': props[self.SIZE]})
446         meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
447         meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
448         return meta
449     
450     @backend_method
451     def update_object_meta(self, user, account, container, name, meta, replace=False):
452         """Update the metadata associated with the object."""
453         
454         logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
455         self._can_write(user, account, container, name)
456         path, node = self._lookup_object(account, container, name)
457         return self._put_metadata(user, node, meta, replace)
458     
459     @backend_method
460     def get_object_permissions(self, user, account, container, name):
461         """Return the action allowed on the object, the path
462         from which the object gets its permissions from,
463         along with a dictionary containing the permissions."""
464         
465         logger.debug("get_object_permissions: %s %s %s", account, container, name)
466         allowed = 'write'
467         if user != account:
468             path = '/'.join((account, container, name))
469             if self.permissions.access_check(path, self.WRITE, user):
470                 allowed = 'write'
471             elif self.permissions.access_check(path, self.READ, user):
472                 allowed = 'read'
473             else:
474                 raise NotAllowedError
475         path = self._lookup_object(account, container, name)[0]
476         return (allowed,) + self.permissions.access_inherit(path)
477     
478     @backend_method
479     def update_object_permissions(self, user, account, container, name, permissions):
480         """Update the permissions associated with the object."""
481         
482         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
483         if user != account:
484             raise NotAllowedError
485         path = self._lookup_object(account, container, name)[0]
486         self._check_permissions(path, permissions)
487         self.permissions.access_set(path, permissions)
488     
489     @backend_method
490     def get_object_public(self, user, account, container, name):
491         """Return the public URL of the object if applicable."""
492         
493         logger.debug("get_object_public: %s %s %s", account, container, name)
494         self._can_read(user, account, container, name)
495         path = self._lookup_object(account, container, name)[0]
496         if self.permissions.public_check(path):
497             return '/public/' + path
498         return None
499     
500     @backend_method
501     def update_object_public(self, user, account, container, name, public):
502         """Update the public status of the object."""
503         
504         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
505         self._can_write(user, account, container, name)
506         path = self._lookup_object(account, container, name)[0]
507         if not public:
508             self.permissions.public_unset(path)
509         else:
510             self.permissions.public_set(path)
511     
512     @backend_method
513     def get_object_hashmap(self, user, account, container, name, version=None):
514         """Return the object's size and a list with partial hashes."""
515         
516         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
517         self._can_read(user, account, container, name)
518         path, node = self._lookup_object(account, container, name)
519         props = self._get_version(node, version)
520         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
521         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
522     
523     def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
524         if permissions is not None and user != account:
525             raise NotAllowedError
526         self._can_write(user, account, container, name)
527         if permissions is not None:
528             path = '/'.join((account, container, name))
529             self._check_permissions(path, permissions)
530         
531         account_path, account_node = self._lookup_account(account, True)
532         container_path, container_node = self._lookup_container(account, container)
533         path, node = self._put_object_node(container_path, container_node, name)
534         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)
535         
536         # Check quota.
537         size_delta = size # Change with versioning.
538         if size_delta > 0:
539             account_quota = long(self._get_policy(account_node)['quota'])
540             container_quota = long(self._get_policy(container_node)['quota'])
541             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
542                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
543                 # This must be executed in a transaction, so the version is never created if it fails.
544                 raise QuotaError
545         
546         if not replace_meta and src_version_id is not None:
547             self.node.attribute_copy(src_version_id, dest_version_id)
548         self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
549         if permissions is not None:
550             self.permissions.access_set(path, permissions)
551         return dest_version_id
552     
553     @backend_method
554     def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
555         """Create/update an object with the specified size and partial hashes."""
556         
557         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
558         if size == 0: # No such thing as an empty hashmap.
559             hashmap = [self.put_block('')]
560         map = HashMap(self.block_size, self.hash_algorithm)
561         map.extend([binascii.unhexlify(x) for x in hashmap])
562         missing = self.store.block_search(map)
563         if missing:
564             ie = IndexError()
565             ie.data = [binascii.hexlify(x) for x in missing]
566             raise ie
567         
568         hash = map.hash()
569         dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions)
570         self.store.map_put(hash, map)
571         return dest_version_id
572     
573     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
574         self._can_read(user, src_account, src_container, src_name)
575         path, node = self._lookup_object(src_account, src_container, src_name)
576         props = self._get_version(node, src_version)
577         src_version_id = props[self.SERIAL]
578         hash = props[self.HASH]
579         size = props[self.SIZE]
580         
581         if replace_meta:
582             meta = dest_meta
583         else:
584             meta = {}
585         dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
586         if not replace_meta:
587             self.node.attribute_copy(src_version_id, dest_version_id)
588             self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
589         return dest_version_id
590     
591     @backend_method
592     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
593         """Copy an object's data and metadata."""
594         
595         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
596         return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
597     
598     @backend_method
599     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
600         """Move an object's data and metadata."""
601         
602         logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
603         if user != src_account:
604             raise NotAllowedError
605         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
606         self._delete_object(user, src_account, src_container, src_name)
607         return dest_version_id
608     
609     def _delete_object(self, user, account, container, name, until=None):
610         if user != account:
611             raise NotAllowedError
612         
613         if until is not None:
614             path = '/'.join((account, container, name))
615             node = self.node.node_lookup(path)
616             if node is None:
617                 return
618             self.node.node_purge(node, until, CLUSTER_NORMAL)
619             self.node.node_purge(node, until, CLUSTER_HISTORY)
620             self.node.node_purge_children(node, until, CLUSTER_DELETED)
621             try:
622                 props = self._get_version(node)
623             except NameError:
624                 pass
625             else:
626                 self.permissions.access_clear(path)
627             return
628         
629         path, node = self._lookup_object(account, container, name)
630         src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
631         self.permissions.access_clear(path)
632     
633     @backend_method
634     def delete_object(self, user, account, container, name, until=None):
635         """Delete/purge an object."""
636         
637         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
638         self._delete_object(user, account, container, name, until)
639     
640     @backend_method
641     def list_versions(self, user, account, container, name):
642         """Return a list of all (version, version_timestamp) tuples for an object."""
643         
644         logger.debug("list_versions: %s %s %s", account, container, name)
645         self._can_read(user, account, container, name)
646         path, node = self._lookup_object(account, container, name)
647         versions = self.node.node_get_versions(node)
648         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
649     
650     @backend_method(autocommit=0)
651     def get_block(self, hash):
652         """Return a block's data."""
653         
654         logger.debug("get_block: %s", hash)
655         block = self.store.block_get(binascii.unhexlify(hash))
656         if not block:
657             raise NameError('Block does not exist')
658         return block
659     
660     @backend_method(autocommit=0)
661     def put_block(self, data):
662         """Store a block and return the hash."""
663         
664         logger.debug("put_block: %s", len(data))
665         return binascii.hexlify(self.store.block_put(data))
666     
667     @backend_method(autocommit=0)
668     def update_block(self, hash, data, offset=0):
669         """Update a known block and return the hash."""
670         
671         logger.debug("update_block: %s %s %s", hash, len(data), offset)
672         if offset == 0 and len(data) == self.block_size:
673             return self.put_block(data)
674         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
675         return binascii.hexlify(h)
676     
677     # Path functions.
678     
679     def _put_object_node(self, path, parent, name):
680         path = '/'.join((path, name))
681         node = self.node.node_lookup(path)
682         if node is None:
683             node = self.node.node_create(parent, path)
684         return path, node
685     
686     def _put_path(self, user, parent, path):
687         node = self.node.node_create(parent, path)
688         self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
689         return node
690     
691     def _lookup_account(self, account, create=True):
692         node = self.node.node_lookup(account)
693         if node is None and create:
694             node = self._put_path(account, self.ROOTNODE, account) # User is account.
695         return account, node
696     
697     def _lookup_container(self, account, container):
698         path = '/'.join((account, container))
699         node = self.node.node_lookup(path)
700         if node is None:
701             raise NameError('Container does not exist')
702         return path, node
703     
704     def _lookup_object(self, account, container, name):
705         path = '/'.join((account, container, name))
706         node = self.node.node_lookup(path)
707         if node is None:
708             raise NameError('Object does not exist')
709         return path, node
710     
711     def _get_properties(self, node, until=None):
712         """Return properties until the timestamp given."""
713         
714         before = until if until is not None else inf
715         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
716         if props is None and until is not None:
717             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
718         if props is None:
719             raise NameError('Path does not exist')
720         return props
721     
722     def _get_statistics(self, node, until=None):
723         """Return count, sum of size and latest timestamp of everything under node."""
724         
725         if until is None:
726             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
727         else:
728             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
729         if stats is None:
730             stats = (0, 0, 0)
731         return stats
732     
733     def _get_version(self, node, version=None):
734         if version is None:
735             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
736             if props is None:
737                 raise NameError('Object does not exist')
738         else:
739             try:
740                 version = int(version)
741             except ValueError:
742                 raise IndexError('Version does not exist')
743             props = self.node.version_get_properties(version)
744             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
745                 raise IndexError('Version does not exist')
746         return props
747     
748     def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
749         """Create a new version of the node."""
750         
751         props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
752         if props is not None:
753             src_version_id = props[self.SERIAL]
754             src_hash = props[self.HASH]
755             src_size = props[self.SIZE]
756         else:
757             src_version_id = None
758             src_hash = None
759             src_size = 0
760         if size is None:
761             hash = src_hash # This way hash can be set to None.
762             size = src_size
763         
764         if src_version_id is not None:
765             self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
766         dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
767         return src_version_id, dest_version_id
768     
769     def _put_metadata(self, user, node, meta, replace=False):
770         """Create a new version and store metadata."""
771         
772         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
773         
774         # TODO: Merge with other functions that update metadata...
775         if not replace:
776             if src_version_id is not None:
777                 self.node.attribute_copy(src_version_id, dest_version_id)
778             self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
779             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
780         else:
781             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
782         return dest_version_id
783     
784     def _list_limits(self, listing, marker, limit):
785         start = 0
786         if marker:
787             try:
788                 start = listing.index(marker) + 1
789             except ValueError:
790                 pass
791         if not limit or limit > 10000:
792             limit = 10000
793         return start, limit
794     
795     def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
796         cont_prefix = path + '/'
797         prefix = cont_prefix + prefix
798         start = cont_prefix + marker if marker else None
799         before = until if until is not None else inf
800         filterq = ','.join(keys) if keys else None
801         
802         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
803         objects.extend([(p, None) for p in prefixes] if virtual else [])
804         objects.sort(key=lambda x: x[0])
805         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
806         
807         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
808         return objects[start:start + limit]
809     
810     # Policy functions.
811     
812     def _check_policy(self, policy):
813         for k in policy.keys():
814             if policy[k] == '':
815                 policy[k] = self.default_policy.get(k)
816         for k, v in policy.iteritems():
817             if k == 'quota':
818                 q = int(v) # May raise ValueError.
819                 if q < 0:
820                     raise ValueError
821             elif k == 'versioning':
822                 if v not in ['auto', 'manual', 'none']:
823                     raise ValueError
824             else:
825                 raise ValueError
826     
827     def _put_policy(self, node, policy, replace):
828         if replace:
829             for k, v in self.default_policy.iteritems():
830                 if k not in policy:
831                     policy[k] = v
832         self.node.policy_set(node, policy)
833     
834     def _get_policy(self, node):
835         policy = self.default_policy.copy()
836         policy.update(self.node.policy_get(node))
837         return policy
838     
839     # Access control functions.
840     
841     def _check_groups(self, groups):
842         # raise ValueError('Bad characters in groups')
843         pass
844     
845     def _check_permissions(self, path, permissions):
846         # raise ValueError('Bad characters in permissions')
847         
848         # Check for existing permissions.
849         paths = self.permissions.access_list(path)
850         if paths:
851             ae = AttributeError()
852             ae.data = paths
853             raise ae
854     
855     def _can_read(self, user, account, container, name):
856         if user == account:
857             return True
858         path = '/'.join((account, container, name))
859         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
860             raise NotAllowedError
861     
862     def _can_write(self, user, account, container, name):
863         if user == account:
864             return True
865         path = '/'.join((account, container, name))
866         if not self.permissions.access_check(path, self.WRITE, user):
867             raise NotAllowedError
868     
869     def _allowed_accounts(self, user):
870         allow = set()
871         for path in self.permissions.access_list_paths(user):
872             allow.add(path.split('/', 1)[0])
873         return sorted(allow)
874     
875     def _allowed_containers(self, user, account):
876         allow = set()
877         for path in self.permissions.access_list_paths(user, account):
878             allow.add(path.split('/', 2)[1])
879         return sorted(allow)