Save hash maps like blocks - based on their hash.
[pithos] / pithos / backends / modular.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import logging
38 import hashlib
39 import binascii
40
41 from base import NotAllowedError, BaseBackend
42 from lib.hashfiler import Mapper, Blocker
43
44 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
45
46 inf = float('inf')
47
48
49 logger = logging.getLogger(__name__)
50
51
52 class HashMap(list):
53     
54     def __init__(self, blocksize, blockhash):
55         super(HashMap, self).__init__()
56         self.blocksize = blocksize
57         self.blockhash = blockhash
58     
59     def _hash_raw(self, v):
60         h = hashlib.new(self.blockhash)
61         h.update(v)
62         return h.digest()
63     
64     def hash(self):
65         if len(self) == 0:
66             return self._hash_raw('')
67         if len(self) == 1:
68             return self.__getitem__(0)
69         
70         h = list(self)
71         s = 2
72         while s < len(h):
73             s = s * 2
74         h += [('\x00' * len(h[0]))] * (s - len(h))
75         while len(h) > 1:
76             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
77         return h[0]
78
79
80 def backend_method(func=None, autocommit=1):
81     if func is None:
82         def fn(func):
83             return backend_method(func, autocommit)
84         return fn
85
86     if not autocommit:
87         return func
88     def fn(self, *args, **kw):
89         self.wrapper.execute()
90         try:
91             ret = func(self, *args, **kw)
92             self.wrapper.commit()
93             return ret
94         except:
95             self.wrapper.rollback()
96             raise
97     return fn
98
99
100 class ModularBackend(BaseBackend):
101     """A modular backend.
102     
103     Uses modules for SQL functions and hashfiler for storage.
104     """
105     
106     def __init__(self, mod, path, db):
107         self.hash_algorithm = 'sha256'
108         self.block_size = 4 * 1024 * 1024 # 4MB
109         
110         self.default_policy = {'quota': 0, 'versioning': 'auto'}
111         
112         if path and not os.path.exists(path):
113             os.makedirs(path)
114         if not os.path.isdir(path):
115             raise RuntimeError("Cannot open path '%s'" % (path,))
116         
117         __import__(mod)
118         self.mod = sys.modules[mod]
119         self.wrapper = self.mod.dbwrapper.DBWrapper(db)
120         
121         params = {'blocksize': self.block_size,
122                   'blockpath': os.path.join(path + '/blocks'),
123                   'hashtype': self.hash_algorithm}
124         self.blocker = Blocker(**params)
125         
126         params = {'mappath': os.path.join(path + '/maps'),
127                   'namelen': self.blocker.hashlen}
128         self.mapper = Mapper(**params)
129         
130         params = {'wrapper': self.wrapper}
131         self.permissions = self.mod.permissions.Permissions(**params)
132         for x in ['READ', 'WRITE']:
133             setattr(self, x, getattr(self.mod.permissions, x))
134         self.policy = self.mod.policy.Policy(**params)
135         self.node = self.mod.node.Node(**params)
136         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
137             setattr(self, x, getattr(self.mod.node, x))
138     
139     @backend_method
140     def list_accounts(self, user, marker=None, limit=10000):
141         """Return a list of accounts the user can access."""
142         
143         logger.debug("list_accounts: %s %s %s", user, marker, limit)
144         allowed = self._allowed_accounts(user)
145         start, limit = self._list_limits(allowed, marker, limit)
146         return allowed[start:start + limit]
147     
148     @backend_method
149     def get_account_meta(self, user, account, until=None):
150         """Return a dictionary with the account metadata."""
151         
152         logger.debug("get_account_meta: %s %s", account, until)
153         path, node = self._lookup_account(account, user == account)
154         if user != account:
155             if until or node is None or account not in self._allowed_accounts(user):
156                 raise NotAllowedError
157         try:
158             props = self._get_properties(node, until)
159             mtime = props[self.MTIME]
160         except NameError:
161             props = None
162             mtime = until
163         count, bytes, tstamp = self._get_statistics(node, until)
164         tstamp = max(tstamp, mtime)
165         if until is None:
166             modified = tstamp
167         else:
168             modified = self._get_statistics(node)[2] # Overall last modification.
169             modified = max(modified, mtime)
170         
171         if user != account:
172             meta = {'name': account}
173         else:
174             meta = {}
175             if props is not None:
176                 meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
177             if until is not None:
178                 meta.update({'until_timestamp': tstamp})
179             meta.update({'name': account, 'count': count, 'bytes': bytes})
180         meta.update({'modified': modified})
181         return meta
182     
183     @backend_method
184     def update_account_meta(self, user, account, meta, replace=False):
185         """Update the metadata associated with the account."""
186         
187         logger.debug("update_account_meta: %s %s %s", account, meta, replace)
188         if user != account:
189             raise NotAllowedError
190         path, node = self._lookup_account(account, True)
191         self._put_metadata(user, node, meta, replace, False)
192     
193     @backend_method
194     def get_account_groups(self, user, account):
195         """Return a dictionary with the user groups defined for this account."""
196         
197         logger.debug("get_account_groups: %s", account)
198         if user != account:
199             if account not in self._allowed_accounts(user):
200                 raise NotAllowedError
201             return {}
202         self._lookup_account(account, True)
203         return self.permissions.group_dict(account)
204     
205     @backend_method
206     def update_account_groups(self, user, account, groups, replace=False):
207         """Update the groups associated with the account."""
208         
209         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
210         if user != account:
211             raise NotAllowedError
212         self._lookup_account(account, True)
213         self._check_groups(groups)
214         if replace:
215             self.permissions.group_destroy(account)
216         for k, v in groups.iteritems():
217             if not replace: # If not already deleted.
218                 self.permissions.group_delete(account, k)
219             if v:
220                 self.permissions.group_addmany(account, k, v)
221     
222     @backend_method
223     def put_account(self, user, account):
224         """Create a new account with the given name."""
225         
226         logger.debug("put_account: %s", account)
227         if user != account:
228             raise NotAllowedError
229         node = self.node.node_lookup(account)
230         if node is not None:
231             raise NameError('Account already exists')
232         self._put_path(user, self.ROOTNODE, account)
233     
234     @backend_method
235     def delete_account(self, user, account):
236         """Delete the account with the given name."""
237         
238         logger.debug("delete_account: %s", account)
239         if user != account:
240             raise NotAllowedError
241         node = self.node.node_lookup(account)
242         if node is None:
243             return
244         if not self.node.node_remove(node):
245             raise IndexError('Account is not empty')
246         self.permissions.group_destroy(account)
247     
248     @backend_method
249     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
250         """Return a list of containers existing under an account."""
251         
252         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
253         if user != account:
254             if until or account not in self._allowed_accounts(user):
255                 raise NotAllowedError
256             allowed = self._allowed_containers(user, account)
257             start, limit = self._list_limits(allowed, marker, limit)
258             return allowed[start:start + limit]
259         if shared:
260             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
261             allowed = list(set(allowed))
262             start, limit = self._list_limits(allowed, marker, limit)
263             return allowed[start:start + limit]
264         node = self.node.node_lookup(account)
265         return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
266     
267     @backend_method
268     def get_container_meta(self, user, account, container, until=None):
269         """Return a dictionary with the container metadata."""
270         
271         logger.debug("get_container_meta: %s %s %s", account, container, until)
272         if user != account:
273             if until or container not in self._allowed_containers(user, account):
274                 raise NotAllowedError
275         path, node = self._lookup_container(account, container)
276         props = self._get_properties(node, until)
277         mtime = props[self.MTIME]
278         count, bytes, tstamp = self._get_statistics(node, until)
279         tstamp = max(tstamp, mtime)
280         if until is None:
281             modified = tstamp
282         else:
283             modified = self._get_statistics(node)[2] # Overall last modification.
284             modified = max(modified, mtime)
285         
286         if user != account:
287             meta = {'name': container}
288         else:
289             meta = dict(self.node.attribute_get(props[self.SERIAL]))
290             if until is not None:
291                 meta.update({'until_timestamp': tstamp})
292             meta.update({'name': container, 'count': count, 'bytes': bytes})
293         meta.update({'modified': modified})
294         return meta
295     
296     @backend_method
297     def update_container_meta(self, user, account, container, meta, replace=False):
298         """Update the metadata associated with the container."""
299         
300         logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
301         if user != account:
302             raise NotAllowedError
303         path, node = self._lookup_container(account, container)
304         self._put_metadata(user, node, meta, replace, False)
305     
306     @backend_method
307     def get_container_policy(self, user, account, container):
308         """Return a dictionary with the container policy."""
309         
310         logger.debug("get_container_policy: %s %s", account, container)
311         if user != account:
312             if container not in self._allowed_containers(user, account):
313                 raise NotAllowedError
314             return {}
315         path = self._lookup_container(account, container)[0]
316         return self.policy.policy_get(path)
317     
318     @backend_method
319     def update_container_policy(self, user, account, container, policy, replace=False):
320         """Update the policy associated with the account."""
321         
322         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
323         if user != account:
324             raise NotAllowedError
325         path = self._lookup_container(account, container)[0]
326         self._check_policy(policy)
327         if replace:
328             for k, v in self.default_policy.iteritems():
329                 if k not in policy:
330                     policy[k] = v
331         self.policy.policy_set(path, policy)
332     
333     @backend_method
334     def put_container(self, user, account, container, policy=None):
335         """Create a new container with the given name."""
336         
337         logger.debug("put_container: %s %s %s", account, container, policy)
338         if user != account:
339             raise NotAllowedError
340         try:
341             path, node = self._lookup_container(account, container)
342         except NameError:
343             pass
344         else:
345             raise NameError('Container already exists')
346         if policy:
347             self._check_policy(policy)
348         path = '/'.join((account, container))
349         self._put_path(user, self._lookup_account(account, True)[1], path)
350         for k, v in self.default_policy.iteritems():
351             if k not in policy:
352                 policy[k] = v
353         self.policy.policy_set(path, policy)
354     
355     @backend_method
356     def delete_container(self, user, account, container, until=None):
357         """Delete/purge the container with the given name."""
358         
359         logger.debug("delete_container: %s %s %s", account, container, until)
360         if user != account:
361             raise NotAllowedError
362         path, node = self._lookup_container(account, container)
363         
364         if until is not None:
365             self.node.node_purge_children(node, until, CLUSTER_HISTORY)
366             self.node.node_purge_children(node, until, CLUSTER_DELETED)
367             return
368         
369         if self._get_statistics(node)[0] > 0:
370             raise IndexError('Container is not empty')
371         self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
372         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
373         self.node.node_remove(node)
374         self.policy.policy_unset(path)
375     
376     @backend_method
377     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
378         """Return a list of objects existing under a container."""
379         
380         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
381         allowed = []
382         if user != account:
383             if until:
384                 raise NotAllowedError
385             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
386             if not allowed:
387                 raise NotAllowedError
388         else:
389             if shared:
390                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
391                 if not allowed:
392                     return []
393         path, node = self._lookup_container(account, container)
394         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
395     
396     @backend_method
397     def list_object_meta(self, user, account, container, until=None):
398         """Return a list with all the container's object meta keys."""
399         
400         logger.debug("list_object_meta: %s %s %s", account, container, until)
401         allowed = []
402         if user != account:
403             if until:
404                 raise NotAllowedError
405             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
406             if not allowed:
407                 raise NotAllowedError
408         path, node = self._lookup_container(account, container)
409         before = until if until is not None else inf
410         return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
411     
412     @backend_method
413     def get_object_meta(self, user, account, container, name, version=None):
414         """Return a dictionary with the object metadata."""
415         
416         logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
417         self._can_read(user, account, container, name)
418         path, node = self._lookup_object(account, container, name)
419         props = self._get_version(node, version)
420         if version is None:
421             modified = props[self.MTIME]
422         else:
423             try:
424                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
425             except NameError: # Object may be deleted.
426                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
427                 if del_props is None:
428                     raise NameError('Object does not exist')
429                 modified = del_props[self.MTIME]
430         
431         meta = dict(self.node.attribute_get(props[self.SERIAL]))
432         meta.update({'name': name, 'bytes': props[self.SIZE]})
433         meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
434         meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
435         return meta
436     
437     @backend_method
438     def update_object_meta(self, user, account, container, name, meta, replace=False):
439         """Update the metadata associated with the object."""
440         
441         logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
442         self._can_write(user, account, container, name)
443         path, node = self._lookup_object(account, container, name)
444         return self._put_metadata(user, node, meta, replace)
445     
446     @backend_method
447     def get_object_permissions(self, user, account, container, name):
448         """Return the action allowed on the object, the path
449         from which the object gets its permissions from,
450         along with a dictionary containing the permissions."""
451         
452         logger.debug("get_object_permissions: %s %s %s", account, container, name)
453         allowed = 'write'
454         if user != account:
455             path = '/'.join((account, container, name))
456             if self.permissions.access_check(path, self.WRITE, user):
457                 allowed = 'write'
458             elif self.permissions.access_check(path, self.READ, user):
459                 allowed = 'read'
460             else:
461                 raise NotAllowedError
462         path = self._lookup_object(account, container, name)[0]
463         return (allowed,) + self.permissions.access_inherit(path)
464     
465     @backend_method
466     def update_object_permissions(self, user, account, container, name, permissions):
467         """Update the permissions associated with the object."""
468         
469         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
470         if user != account:
471             raise NotAllowedError
472         path = self._lookup_object(account, container, name)[0]
473         self._check_permissions(path, permissions)
474         self.permissions.access_set(path, permissions)
475     
476     @backend_method
477     def get_object_public(self, user, account, container, name):
478         """Return the public URL of the object if applicable."""
479         
480         logger.debug("get_object_public: %s %s %s", account, container, name)
481         self._can_read(user, account, container, name)
482         path = self._lookup_object(account, container, name)[0]
483         if self.permissions.public_check(path):
484             return '/public/' + path
485         return None
486     
487     @backend_method
488     def update_object_public(self, user, account, container, name, public):
489         """Update the public status of the object."""
490         
491         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
492         self._can_write(user, account, container, name)
493         path = self._lookup_object(account, container, name)[0]
494         if not public:
495             self.permissions.public_unset(path)
496         else:
497             self.permissions.public_set(path)
498     
499     @backend_method
500     def get_object_hashmap(self, user, account, container, name, version=None):
501         """Return the object's size and a list with partial hashes."""
502         
503         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
504         self._can_read(user, account, container, name)
505         path, node = self._lookup_object(account, container, name)
506         props = self._get_version(node, version)
507         hashmap = self.mapper.map_retr(binascii.unhexlify(props[self.HASH]))
508         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
509     
510     @backend_method
511     def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
512         """Create/update an object with the specified size and partial hashes."""
513         
514         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
515         if permissions is not None and user != account:
516             raise NotAllowedError
517         self._can_write(user, account, container, name)
518         map = HashMap(self.block_size, self.hash_algorithm)
519         map.extend([binascii.unhexlify(x) for x in hashmap])
520         missing = self.blocker.block_ping(map)
521         if missing:
522             ie = IndexError()
523             ie.data = [binascii.hexlify(x) for x in missing]
524             raise ie
525         if permissions is not None:
526             path = '/'.join((account, container, name))
527             self._check_permissions(path, permissions)
528         path, node = self._put_object_node(account, container, name)
529         hash = map.hash()
530         src_version_id, dest_version_id = self._copy_version(user, node, None, node, binascii.hexlify(hash), size)
531         self.mapper.map_stor(hash, map)
532         if not replace_meta and src_version_id is not None:
533             self.node.attribute_copy(src_version_id, dest_version_id)
534         self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
535         if permissions is not None:
536             self.permissions.access_set(path, permissions)
537         return dest_version_id
538     
539     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
540         if permissions is not None and user != dest_account:
541             raise NotAllowedError
542         self._can_read(user, src_account, src_container, src_name)
543         self._can_write(user, dest_account, dest_container, dest_name)
544         src_path, src_node = self._lookup_object(src_account, src_container, src_name)
545         self._get_version(src_node, src_version)
546         if permissions is not None:
547             dest_path = '/'.join((dest_account, dest_container, dest_name))
548             self._check_permissions(dest_path, permissions)
549         dest_path, dest_node = self._put_object_node(dest_account, dest_container, dest_name)
550         src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node)
551         if not replace_meta and src_version_id is not None:
552             self.node.attribute_copy(src_version_id, dest_version_id)
553         self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
554         if permissions is not None:
555             self.permissions.access_set(dest_path, permissions)
556         return dest_version_id
557     
558     @backend_method
559     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
560         """Copy an object's data and metadata."""
561         
562         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
563         return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
564     
565     @backend_method
566     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
567         """Move an object's data and metadata."""
568         
569         logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
570         if user != src_account:
571             raise NotAllowedError
572         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
573         self._delete_object(user, src_account, src_container, src_name)
574         return dest_version_id
575     
576     def _delete_object(self, user, account, container, name, until=None):
577         if user != account:
578             raise NotAllowedError
579         
580         if until is not None:
581             path = '/'.join((account, container, name))
582             node = self.node.node_lookup(path)
583             if node is None:
584                 return
585             self.node.node_purge(node, until, CLUSTER_NORMAL)
586             self.node.node_purge(node, until, CLUSTER_HISTORY)
587             self.node.node_purge_children(node, until, CLUSTER_DELETED)
588             try:
589                 props = self._get_version(node)
590             except NameError:
591                 pass
592             else:
593                 self.permissions.access_clear(path)
594             return
595         
596         path, node = self._lookup_object(account, container, name)
597         self._copy_version(user, node, None, node, None, 0, CLUSTER_DELETED)
598         self.permissions.access_clear(path)
599     
600     @backend_method
601     def delete_object(self, user, account, container, name, until=None):
602         """Delete/purge an object."""
603         
604         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
605         self._delete_object(user, account, container, name, until)
606     
607     @backend_method
608     def list_versions(self, user, account, container, name):
609         """Return a list of all (version, version_timestamp) tuples for an object."""
610         
611         logger.debug("list_versions: %s %s %s", account, container, name)
612         self._can_read(user, account, container, name)
613         path, node = self._lookup_object(account, container, name)
614         versions = self.node.node_get_versions(node)
615         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
616     
617     @backend_method(autocommit=0)
618     def get_block(self, hash):
619         """Return a block's data."""
620         
621         logger.debug("get_block: %s", hash)
622         blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
623         if not blocks:
624             raise NameError('Block does not exist')
625         return blocks[0]
626     
627     @backend_method(autocommit=0)
628     def put_block(self, data):
629         """Store a block and return the hash."""
630         
631         logger.debug("put_block: %s", len(data))
632         hashes, absent = self.blocker.block_stor((data,))
633         return binascii.hexlify(hashes[0])
634     
635     @backend_method(autocommit=0)
636     def update_block(self, hash, data, offset=0):
637         """Update a known block and return the hash."""
638         
639         logger.debug("update_block: %s %s %s", hash, len(data), offset)
640         if offset == 0 and len(data) == self.block_size:
641             return self.put_block(data)
642         h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
643         return binascii.hexlify(h)
644     
645     # Path functions.
646     
647     def _put_object_node(self, account, container, name):
648         path, parent = self._lookup_container(account, container)
649         path = '/'.join((path, name))
650         node = self.node.node_lookup(path)
651         if node is None:
652             node = self.node.node_create(parent, path)
653         return path, node
654     
655     def _put_path(self, user, parent, path):
656         node = self.node.node_create(parent, path)
657         self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
658         return node
659     
660     def _lookup_account(self, account, create=True):
661         node = self.node.node_lookup(account)
662         if node is None and create:
663             node = self._put_path(account, self.ROOTNODE, account) # User is account.
664         return account, node
665     
666     def _lookup_container(self, account, container):
667         path = '/'.join((account, container))
668         node = self.node.node_lookup(path)
669         if node is None:
670             raise NameError('Container does not exist')
671         return path, node
672     
673     def _lookup_object(self, account, container, name):
674         path = '/'.join((account, container, name))
675         node = self.node.node_lookup(path)
676         if node is None:
677             raise NameError('Object does not exist')
678         return path, node
679     
680     def _get_properties(self, node, until=None):
681         """Return properties until the timestamp given."""
682         
683         before = until if until is not None else inf
684         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
685         if props is None and until is not None:
686             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
687         if props is None:
688             raise NameError('Path does not exist')
689         return props
690     
691     def _get_statistics(self, node, until=None):
692         """Return count, sum of size and latest timestamp of everything under node."""
693         
694         if until is None:
695             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
696         else:
697             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
698         if stats is None:
699             stats = (0, 0, 0)
700         return stats
701     
702     def _get_version(self, node, version=None):
703         if version is None:
704             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
705             if props is None:
706                 raise NameError('Object does not exist')
707         else:
708             props = self.node.version_get_properties(version)
709             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
710                 raise IndexError('Version does not exist')
711         return props
712     
713     def _copy_version(self, user, src_node, src_version, dest_node, dest_hash=None, dest_size=None, dest_cluster=CLUSTER_NORMAL):
714         
715         # Get source serial and size.
716         if src_version is not None:
717             src_props = self._get_version(src_node, src_version)
718             src_version_id = src_props[self.SERIAL]
719             hash = src_props[self.HASH]
720             size = src_props[self.SIZE]
721         else:
722             # Latest or create from scratch.
723             try:
724                 src_props = self._get_version(src_node)
725                 src_version_id = src_props[self.SERIAL]
726                 hash = src_props[self.HASH]
727                 size = src_props[self.SIZE]
728             except NameError:
729                 src_version_id = None
730                 hash = None
731                 size = 0
732         if dest_hash is not None:
733             hash = dest_hash
734         if dest_size is not None:
735             size = dest_size
736         
737         # Move the latest version at destination to CLUSTER_HISTORY and create new.
738         if src_node == dest_node and src_version is None and src_version_id is not None:
739             self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
740         else:
741             dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
742             if dest_props is not None:
743                 self.node.version_recluster(dest_props[self.SERIAL], CLUSTER_HISTORY)
744         dest_version_id, mtime = self.node.version_create(dest_node, hash, size, src_version_id, user, dest_cluster)
745         
746         return src_version_id, dest_version_id
747     
748     def _get_metadata(self, version):
749         if version is None:
750             return {}
751         return dict(self.node.attribute_get(version))
752     
753     def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
754         """Create a new version and store metadata."""
755         
756         src_version_id, dest_version_id = self._copy_version(user, node, None, node)
757         if not replace:
758             if src_version_id is not None:
759                 self.node.attribute_copy(src_version_id, dest_version_id)
760             self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
761             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
762         else:
763             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
764         return dest_version_id
765     
766     def _list_limits(self, listing, marker, limit):
767         start = 0
768         if marker:
769             try:
770                 start = listing.index(marker) + 1
771             except ValueError:
772                 pass
773         if not limit or limit > 10000:
774             limit = 10000
775         return start, limit
776     
777     def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
778         cont_prefix = path + '/'
779         prefix = cont_prefix + prefix
780         start = cont_prefix + marker if marker else None
781         before = until if until is not None else inf
782         filterq = ','.join(keys) if keys else None
783         
784         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
785         objects.extend([(p, None) for p in prefixes] if virtual else [])
786         objects.sort(key=lambda x: x[0])
787         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
788         
789         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
790         return objects[start:start + limit]
791     
792     # Policy functions.
793     
794     def _check_policy(self, policy):
795         for k in policy.keys():
796             if policy[k] == '':
797                 policy[k] = self.default_policy.get(k)
798         for k, v in policy.iteritems():
799             if k == 'quota':
800                 q = int(v) # May raise ValueError.
801                 if q < 0:
802                     raise ValueError
803             elif k == 'versioning':
804                 if v not in ['auto', 'manual', 'none']:
805                     raise ValueError
806             else:
807                 raise ValueError
808     
809     # Access control functions.
810     
811     def _check_groups(self, groups):
812         # raise ValueError('Bad characters in groups')
813         pass
814     
815     def _check_permissions(self, path, permissions):
816         # raise ValueError('Bad characters in permissions')
817         
818         # Check for existing permissions.
819         paths = self.permissions.access_list(path)
820         if paths:
821             ae = AttributeError()
822             ae.data = paths
823             raise ae
824     
825     def _can_read(self, user, account, container, name):
826         if user == account:
827             return True
828         path = '/'.join((account, container, name))
829         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
830             raise NotAllowedError
831     
832     def _can_write(self, user, account, container, name):
833         if user == account:
834             return True
835         path = '/'.join((account, container, name))
836         if not self.permissions.access_check(path, self.WRITE, user):
837             raise NotAllowedError
838     
839     def _allowed_accounts(self, user):
840         allow = set()
841         for path in self.permissions.access_list_paths(user):
842             allow.add(path.split('/', 1)[0])
843         return sorted(allow)
844     
845     def _allowed_containers(self, user, account):
846         allow = set()
847         for path in self.permissions.access_list_paths(user, account):
848             allow.add(path.split('/', 2)[1])
849         return sorted(allow)