backend components in SQLAlchemy: Progress IV
[pithos] / pithos / backends / modular_alchemy.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import os
35 import time
36 import sqlite3
37 import logging
38 import hashlib
39 import binascii
40
41 from base import NotAllowedError, BaseBackend
42 from lib_alchemy.node import Node, ROOTNODE, SERIAL, SIZE, MTIME, MUSER, CLUSTER
43 from lib_alchemy.permissions import Permissions, READ, WRITE
44 from lib_alchemy.policy import Policy
45 from sqlalchemy import create_engine
46 from lib.hashfiler import Mapper, Blocker
47
48 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
49
50 inf = float('inf')
51
52
53 logger = logging.getLogger(__name__)
54
55 def backend_method(func=None, autocommit=1):
56     if func is None:
57         def fn(func):
58             return backend_method(func, autocommit)
59         return fn
60
61     if not autocommit:
62         return func
63     def fn(self, *args, **kw):
64         trans = self.con.begin()
65         try:
66             ret = func(self, *args, **kw)
67             trans.commit()
68             return ret
69         except:
70             trans.rollback()
71             raise
72     return fn
73
74
75 class ModularBackend(BaseBackend):
76     """A modular backend.
77     
78     Uses modules for SQL functions and storage.
79     """
80     
81     def __init__(self, db, db_options):
82         self.hash_algorithm = 'sha256'
83         self.block_size = 4 * 1024 * 1024 # 4MB
84         
85         self.default_policy = {'quota': 0, 'versioning': 'auto'}
86         
87         basepath = os.path.split(db)[0]
88         if basepath and not os.path.exists(basepath):
89             os.makedirs(basepath)
90         if not os.path.isdir(basepath):
91             raise RuntimeError("Cannot open database at '%s'" % (db,))
92         
93         connection_str = 'postgresql://%s:%s@%s/%s' % db_options
94         engine = create_engine(connection_str, echo=True)
95         self.con = engine.connect()
96         
97         params = {'blocksize': self.block_size,
98                   'blockpath': basepath + '/blocks',
99                   'hashtype': self.hash_algorithm}
100         self.blocker = Blocker(**params)
101         
102         params = {'mappath': basepath + '/maps',
103                   'namelen': self.blocker.hashlen}
104         self.mapper = Mapper(**params)
105         
106         params = {'connection': self.con,
107                   'engine': engine}
108         self.permissions = Permissions(**params)
109         self.policy = Policy(**params)
110         self.node = Node(**params)
111     
112     @backend_method
113     def list_accounts(self, user, marker=None, limit=10000):
114         """Return a list of accounts the user can access."""
115         
116         logger.debug("list_accounts: %s %s %s", user, marker, limit)
117         allowed = self._allowed_accounts(user)
118         start, limit = self._list_limits(allowed, marker, limit)
119         return allowed[start:start + limit]
120     
121     @backend_method
122     def get_account_meta(self, user, account, until=None):
123         """Return a dictionary with the account metadata."""
124         
125         logger.debug("get_account_meta: %s %s", account, until)
126         path, node = self._lookup_account(account, user == account)
127         if user != account:
128             if until or node is None or account not in self._allowed_accounts(user):
129                 raise NotAllowedError
130         try:
131             props = self._get_properties(node, until)
132             mtime = props[MTIME]
133         except NameError:
134             props = None
135             mtime = until
136         count, bytes, tstamp = self._get_statistics(node, until)
137         tstamp = max(tstamp, mtime)
138         if until is None:
139             modified = tstamp
140         else:
141             modified = self._get_statistics(node)[2] # Overall last modification.
142             modified = max(modified, mtime)
143         
144         if user != account:
145             meta = {'name': account}
146         else:
147             meta = {}
148             if props is not None:
149                 meta.update(dict(self.node.attribute_get(props[SERIAL])))
150             if until is not None:
151                 meta.update({'until_timestamp': tstamp})
152             meta.update({'name': account, 'count': count, 'bytes': bytes})
153         meta.update({'modified': modified})
154         return meta
155     
156     @backend_method
157     def update_account_meta(self, user, account, meta, replace=False):
158         """Update the metadata associated with the account."""
159         
160         logger.debug("update_account_meta: %s %s %s", account, meta, replace)
161         if user != account:
162             raise NotAllowedError
163         path, node = self._lookup_account(account, True)
164         self._put_metadata(user, node, meta, replace, False)
165     
166     @backend_method
167     def get_account_groups(self, user, account):
168         """Return a dictionary with the user groups defined for this account."""
169         
170         logger.debug("get_account_groups: %s", account)
171         if user != account:
172             if account not in self._allowed_accounts(user):
173                 raise NotAllowedError
174             return {}
175         self._lookup_account(account, True)
176         return self.permissions.group_dict(account)
177     
178     @backend_method
179     def update_account_groups(self, user, account, groups, replace=False):
180         """Update the groups associated with the account."""
181         
182         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
183         if user != account:
184             raise NotAllowedError
185         self._lookup_account(account, True)
186         self._check_groups(groups)
187         if replace:
188             self.permissions.group_destroy(account)
189         for k, v in groups.iteritems():
190             if not replace: # If not already deleted.
191                 self.permissions.group_delete(account, k)
192             if v:
193                 self.permissions.group_addmany(account, k, v)
194     
195     @backend_method
196     def put_account(self, user, account):
197         """Create a new account with the given name."""
198         
199         logger.debug("put_account: %s", account)
200         if user != account:
201             raise NotAllowedError
202         node = self.node.node_lookup(account)
203         if node is not None:
204             raise NameError('Account already exists')
205         self._put_path(user, ROOTNODE, account)
206     
207     @backend_method
208     def delete_account(self, user, account):
209         """Delete the account with the given name."""
210         
211         logger.debug("delete_account: %s", account)
212         if user != account:
213             raise NotAllowedError
214         node = self.node.node_lookup(account)
215         if node is None:
216             return
217         if not self.node.node_remove(node):
218             raise IndexError('Account is not empty')
219         self.permissions.group_destroy(account)
220     
221     @backend_method
222     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
223         """Return a list of containers existing under an account."""
224         
225         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
226         if user != account:
227             if until or account not in self._allowed_accounts(user):
228                 raise NotAllowedError
229             allowed = self._allowed_containers(user, account)
230             start, limit = self._list_limits(allowed, marker, limit)
231             return allowed[start:start + limit]
232         if shared:
233             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
234             start, limit = self._list_limits(allowed, marker, limit)
235             return allowed[start:start + limit]
236         node = self.node.node_lookup(account)
237         return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
238     
239     @backend_method
240     def get_container_meta(self, user, account, container, until=None):
241         """Return a dictionary with the container metadata."""
242         
243         logger.debug("get_container_meta: %s %s %s", account, container, until)
244         if user != account:
245             if until or container not in self._allowed_containers(user, account):
246                 raise NotAllowedError
247         path, node = self._lookup_container(account, container)
248         props = self._get_properties(node, until)
249         mtime = props[MTIME]
250         count, bytes, tstamp = self._get_statistics(node, until)
251         tstamp = max(tstamp, mtime)
252         if until is None:
253             modified = tstamp
254         else:
255             modified = self._get_statistics(node)[2] # Overall last modification.
256             modified = max(modified, mtime)
257         
258         if user != account:
259             meta = {'name': container}
260         else:
261             meta = dict(self.node.attribute_get(props[SERIAL]))
262             if until is not None:
263                 meta.update({'until_timestamp': tstamp})
264             meta.update({'name': container, 'count': count, 'bytes': bytes})
265         meta.update({'modified': modified})
266         return meta
267     
268     @backend_method
269     def update_container_meta(self, user, account, container, meta, replace=False):
270         """Update the metadata associated with the container."""
271         
272         logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
273         if user != account:
274             raise NotAllowedError
275         path, node = self._lookup_container(account, container)
276         self._put_metadata(user, node, meta, replace, False)
277     
278     @backend_method
279     def get_container_policy(self, user, account, container):
280         """Return a dictionary with the container policy."""
281         
282         logger.debug("get_container_policy: %s %s", account, container)
283         if user != account:
284             if container not in self._allowed_containers(user, account):
285                 raise NotAllowedError
286             return {}
287         path = self._lookup_container(account, container)[0]
288         return self.policy.policy_get(path)
289     
290     @backend_method
291     def update_container_policy(self, user, account, container, policy, replace=False):
292         """Update the policy associated with the account."""
293         
294         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
295         if user != account:
296             raise NotAllowedError
297         path = self._lookup_container(account, container)[0]
298         self._check_policy(policy)
299         if replace:
300             for k, v in self.default_policy.iteritems():
301                 if k not in policy:
302                     policy[k] = v
303         self.policy.policy_set(path, policy)
304     
305     @backend_method
306     def put_container(self, user, account, container, policy=None):
307         """Create a new container with the given name."""
308         
309         logger.debug("put_container: %s %s %s", account, container, policy)
310         if user != account:
311             raise NotAllowedError
312         try:
313             path, node = self._lookup_container(account, container)
314         except NameError:
315             pass
316         else:
317             raise NameError('Container already exists')
318         if policy:
319             self._check_policy(policy)
320         path = '/'.join((account, container))
321         self._put_path(user, self._lookup_account(account, True)[1], path)
322         for k, v in self.default_policy.iteritems():
323             if k not in policy:
324                 policy[k] = v
325         self.policy.policy_set(path, policy)
326     
327     @backend_method
328     def delete_container(self, user, account, container, until=None):
329         """Delete/purge the container with the given name."""
330         
331         logger.debug("delete_container: %s %s %s", account, container, until)
332         if user != account:
333             raise NotAllowedError
334         path, node = self._lookup_container(account, container)
335         
336         if until is not None:
337             versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
338             for v in versions:
339                 self.mapper.map_remv(v)
340             self.node.node_purge_children(node, until, CLUSTER_DELETED)
341             return
342         
343         if self._get_statistics(node)[0] > 0:
344             raise IndexError('Container is not empty')
345         versions = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
346         for v in versions:
347             self.mapper.map_remv(v)
348         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
349         self.node.node_remove(node)
350         self.policy.policy_unset(path)
351     
352     @backend_method
353     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
354         """Return a list of objects existing under a container."""
355         
356         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
357         allowed = []
358         if user != account:
359             if until:
360                 raise NotAllowedError
361             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
362             if not allowed:
363                 raise NotAllowedError
364         else:
365             if shared:
366                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
367                 if not allowed:
368                     return []
369         path, node = self._lookup_container(account, container)
370         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
371     
372     @backend_method
373     def list_object_meta(self, user, account, container, until=None):
374         """Return a list with all the container's object meta keys."""
375         
376         logger.debug("list_object_meta: %s %s %s", account, container, until)
377         allowed = []
378         if user != account:
379             if until:
380                 raise NotAllowedError
381             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
382             if not allowed:
383                 raise NotAllowedError
384         path, node = self._lookup_container(account, container)
385         before = until if until is not None else inf
386         return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
387     
388     @backend_method
389     def get_object_meta(self, user, account, container, name, version=None):
390         """Return a dictionary with the object metadata."""
391         
392         logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
393         self._can_read(user, account, container, name)
394         path, node = self._lookup_object(account, container, name)
395         props = self._get_version(node, version)
396         if version is None:
397             modified = props[MTIME]
398         else:
399             modified = self._get_version(node)[MTIME] # Overall last modification.
400         
401         meta = dict(self.node.attribute_get(props[SERIAL]))
402         meta.update({'name': name, 'bytes': props[SIZE]})
403         meta.update({'version': props[SERIAL], 'version_timestamp': props[MTIME]})
404         meta.update({'modified': modified, 'modified_by': props[MUSER]})
405         return meta
406     
407     @backend_method
408     def update_object_meta(self, user, account, container, name, meta, replace=False):
409         """Update the metadata associated with the object."""
410         
411         logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
412         self._can_write(user, account, container, name)
413         path, node = self._lookup_object(account, container, name)
414         self._put_metadata(user, node, meta, replace)
415     
416     @backend_method
417     def get_object_permissions(self, user, account, container, name):
418         """Return the path from which this object gets its permissions from,\
419         along with a dictionary containing the permissions."""
420         
421         logger.debug("get_object_permissions: %s %s %s", account, container, name)
422         self._can_read(user, account, container, name)
423         path = self._lookup_object(account, container, name)[0]
424         return self.permissions.access_inherit(path)
425     
426     @backend_method
427     def update_object_permissions(self, user, account, container, name, permissions):
428         """Update the permissions associated with the object."""
429         
430         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
431         if user != account:
432             raise NotAllowedError
433         path = self._lookup_object(account, container, name)[0]
434         self._check_permissions(path, permissions)
435         self.permissions.access_set(path, permissions)
436     
437     @backend_method
438     def get_object_public(self, user, account, container, name):
439         """Return the public URL of the object if applicable."""
440         
441         logger.debug("get_object_public: %s %s %s", account, container, name)
442         self._can_read(user, account, container, name)
443         path = self._lookup_object(account, container, name)[0]
444         if self.permissions.public_check(path):
445             return '/public/' + path
446         return None
447     
448     @backend_method
449     def update_object_public(self, user, account, container, name, public):
450         """Update the public status of the object."""
451         
452         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
453         self._can_write(user, account, container, name)
454         path = self._lookup_object(account, container, name)[0]
455         if not public:
456             self.permissions.public_unset(path)
457         else:
458             self.permissions.public_set(path)
459     
460     @backend_method
461     def get_object_hashmap(self, user, account, container, name, version=None):
462         """Return the object's size and a list with partial hashes."""
463         
464         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
465         self._can_read(user, account, container, name)
466         path, node = self._lookup_object(account, container, name)
467         props = self._get_version(node, version)
468         hashmap = self.mapper.map_retr(props[SERIAL])
469         return props[SIZE], [binascii.hexlify(x) for x in hashmap]
470     
471     @backend_method
472     def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
473         """Create/update an object with the specified size and partial hashes."""
474         
475         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
476         if permissions is not None and user != account:
477             raise NotAllowedError
478         self._can_write(user, account, container, name)
479         missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
480         if missing:
481             ie = IndexError()
482             ie.data = missing
483             raise ie
484         if permissions is not None:
485             self._check_permissions(path, permissions)
486         path, node = self._put_object_node(account, container, name)
487         src_version_id, dest_version_id = self._copy_version(user, node, None, node, size)
488         self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
489         if not replace_meta and src_version_id is not None:
490             self.node.attribute_copy(src_version_id, dest_version_id)
491         self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
492         if permissions is not None:
493             self.permissions.access_set(path, permissions)
494     
495     @backend_method
496     def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
497         """Copy an object's data and metadata."""
498         
499         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
500         if permissions is not None and user != account:
501             raise NotAllowedError
502         self._can_read(user, account, src_container, src_name)
503         self._can_write(user, account, dest_container, dest_name)
504         src_path, src_node = self._lookup_object(account, src_container, src_name)
505         if permissions is not None:
506             self._check_permissions(dest_path, permissions)
507         dest_path, dest_node = self._put_object_node(account, dest_container, dest_name)
508         src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node)
509         if src_version_id is not None:
510             self._copy_data(src_version_id, dest_version_id)
511         if not replace_meta and src_version_id is not None:
512             self.node.attribute_copy(src_version_id, dest_version_id)
513         self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
514         if permissions is not None:
515             self.permissions.access_set(dest_path, permissions)
516     
517     @backend_method
518     def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
519         """Move an object's data and metadata."""
520         
521         logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
522         self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
523         self.delete_object(user, account, src_container, src_name)
524     
525     @backend_method
526     def delete_object(self, user, account, container, name, until=None):
527         """Delete/purge an object."""
528         
529         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
530         if user != account:
531             raise NotAllowedError
532         
533         if until is not None:
534             path = '/'.join((account, container, name))
535             node = self.node.node_lookup(path)
536             if node is None:
537                 return
538             versions = self.node.node_purge(node, until, CLUSTER_NORMAL)
539             versions += self.node.node_purge(node, until, CLUSTER_HISTORY)
540             for v in versions:
541                 self.mapper.map_remv(v)
542             self.node.node_purge_children(node, until, CLUSTER_DELETED)
543             try:
544                 props = self._get_version(node)
545             except NameError:
546                 pass
547             else:
548                 self.permissions.access_clear(path)
549             return
550         
551         path, node = self._lookup_object(account, container, name)
552         self._copy_version(user, node, None, node, 0, CLUSTER_DELETED)
553         self.permissions.access_clear(path)
554     
555     @backend_method
556     def list_versions(self, user, account, container, name):
557         """Return a list of all (version, version_timestamp) tuples for an object."""
558         
559         logger.debug("list_versions: %s %s %s", account, container, name)
560         self._can_read(user, account, container, name)
561         path, node = self._lookup_object(account, container, name)
562         return self.node.node_get_versions(node, ['serial', 'mtime'])
563     
564     @backend_method(autocommit=0)
565     def get_block(self, hash):
566         """Return a block's data."""
567         
568         logger.debug("get_block: %s", hash)
569         blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
570         if not blocks:
571             raise NameError('Block does not exist')
572         return blocks[0]
573     
574     @backend_method(autocommit=0)
575     def put_block(self, data):
576         """Store a block and return the hash."""
577         
578         logger.debug("put_block: %s", len(data))
579         hashes, absent = self.blocker.block_stor((data,))
580         return binascii.hexlify(hashes[0])
581     
582     @backend_method(autocommit=0)
583     def update_block(self, hash, data, offset=0):
584         """Update a known block and return the hash."""
585         
586         logger.debug("update_block: %s %s %s", hash, len(data), offset)
587         if offset == 0 and len(data) == self.block_size:
588             return self.put_block(data)
589         h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
590         return binascii.hexlify(h)
591     
592     # Path functions.
593     
594     def _put_object_node(self, account, container, name):
595         path, parent = self._lookup_container(account, container)
596         path = '/'.join((path, name))
597         node = self.node.node_lookup(path)
598         if node is None:
599             node = self.node.node_create(parent, path)
600         return path, node
601     
602     def _put_path(self, user, parent, path):
603         node = self.node.node_create(parent, path)
604         self.node.version_create(node, 0, None, user, CLUSTER_NORMAL)
605         return node
606     
607     def _lookup_account(self, account, create=True):
608         node = self.node.node_lookup(account)
609         if node is None and create:
610             node = self._put_path(account, ROOTNODE, account) # User is account.
611         return account, node
612     
613     def _lookup_container(self, account, container):
614         path = '/'.join((account, container))
615         node = self.node.node_lookup(path)
616         if node is None:
617             raise NameError('Container does not exist')
618         return path, node
619     
620     def _lookup_object(self, account, container, name):
621         path = '/'.join((account, container, name))
622         node = self.node.node_lookup(path)
623         if node is None:
624             raise NameError('Object does not exist')
625         return path, node
626     
627     def _get_properties(self, node, until=None):
628         """Return properties until the timestamp given."""
629         
630         before = until if until is not None else inf
631         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
632         if props is None and until is not None:
633             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
634         if props is None:
635             raise NameError('Path does not exist')
636         return props
637     
638     def _get_statistics(self, node, until=None):
639         """Return count, sum of size and latest timestamp of everything under node."""
640         
641         if until is None:
642             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
643         else:
644             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
645         if stats is None:
646             stats = (0, 0, 0)
647         return stats
648     
649     def _get_version(self, node, version=None):
650         if version is None:
651             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
652             if props is None:
653                 raise NameError('Object does not exist')
654         else:
655             props = self.node.version_get_properties(version)
656             if props is None or props[CLUSTER] == CLUSTER_DELETED:
657                 raise IndexError('Version does not exist')
658         return props
659     
660     def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL):
661         
662         # Get source serial and size.
663         if src_version is not None:
664             src_props = self._get_version(src_node, src_version)
665             src_version_id = src_props[SERIAL]
666             size = src_props[SIZE]
667         else:
668             # Latest or create from scratch.
669             try:
670                 src_props = self._get_version(src_node)
671                 src_version_id = src_props[SERIAL]
672                 size = src_props[SIZE]
673             except NameError:
674                 src_version_id = None
675                 size = 0
676         if dest_size is not None:
677             size = dest_size
678         
679         # Move the latest version at destination to CLUSTER_HISTORY and create new.
680         if src_node == dest_node and src_version is None and src_version_id is not None:
681             self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
682         else:
683             dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
684             if dest_props is not None:
685                 self.node.version_recluster(dest_props[SERIAL], CLUSTER_HISTORY)
686         dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster)
687         
688         return src_version_id, dest_version_id
689     
690     def _copy_data(self, src_version, dest_version):
691         hashmap = self.mapper.map_retr(src_version)
692         self.mapper.map_stor(dest_version, hashmap)
693     
694     def _get_metadata(self, version):
695         if version is None:
696             return {}
697         return dict(self.node.attribute_get(version))
698     
699     def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
700         """Create a new version and store metadata."""
701         
702         src_version_id, dest_version_id = self._copy_version(user, node, None, node)
703         if not replace:
704             if src_version_id is not None:
705                 self.node.attribute_copy(src_version_id, dest_version_id)
706             self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
707             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
708         else:
709             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
710         if copy_data and src_version_id is not None:
711             self._copy_data(src_version_id, dest_version_id)
712     
713     def _list_limits(self, listing, marker, limit):
714         start = 0
715         if marker:
716             try:
717                 start = listing.index(marker) + 1
718             except ValueError:
719                 pass
720         if not limit or limit > 10000:
721             limit = 10000
722         return start, limit
723     
724     def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
725         cont_prefix = path + '/'
726         prefix = cont_prefix + prefix
727         start = cont_prefix + marker if marker else None
728         before = until if until is not None else inf
729         filterq = ','.join(keys) if keys else None
730         
731         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
732         objects.extend([(p, None) for p in prefixes] if virtual else [])
733         objects.sort()
734         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
735         
736         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
737         return objects[start:start + limit]
738     
739     # Policy functions.
740     
741     def _check_policy(self, policy):
742         for k in policy.keys():
743             if policy[k] == '':
744                 policy[k] = self.default_policy.get(k)
745         for k, v in policy.iteritems():
746             if k == 'quota':
747                 q = int(v) # May raise ValueError.
748                 if q < 0:
749                     raise ValueError
750             elif k == 'versioning':
751                 if v not in ['auto', 'manual', 'none']:
752                     raise ValueError
753             else:
754                 raise ValueError
755     
756     # Access control functions.
757     
758     def _check_groups(self, groups):
759         # raise ValueError('Bad characters in groups')
760         pass
761     
762     def _check_permissions(self, path, permissions):
763         # raise ValueError('Bad characters in permissions')
764         
765         # Check for existing permissions.
766         paths = self.permissions.access_list(path)
767         if paths:
768             ae = AttributeError()
769             ae.data = paths
770             raise ae
771     
772     def _can_read(self, user, account, container, name):
773         if user == account:
774             return True
775         path = '/'.join((account, container, name))
776         if not self.permissions.access_check(path, READ, user) and not self.permissions.access_check(path, WRITE, user):
777             raise NotAllowedError
778     
779     def _can_write(self, user, account, container, name):
780         if user == account:
781             return True
782         path = '/'.join((account, container, name))
783         if not self.permissions.access_check(path, WRITE, user):
784             raise NotAllowedError
785     
786     def _allowed_accounts(self, user):
787         allow = set()
788         for path in self.permissions.access_list_paths(user):
789             allow.add(path.split('/', 1)[0])
790         return sorted(allow)
791     
792     def _allowed_containers(self, user, account):
793         allow = set()
794         for path in self.permissions.access_list_paths(user, account):
795             allow.add(path.split('/', 2)[1])
796         return sorted(allow)