Make backend implementations compatible with the new settings.
[pithos] / pithos / backends / modular.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import os
35 import time
36 import sqlite3
37 import logging
38 import hashlib
39 import binascii
40
41 from base import NotAllowedError, BaseBackend
42 from lib.node import Node, ROOTNODE, SERIAL, SIZE, MTIME, MUSER, CLUSTER
43 from lib.permissions import Permissions, READ, WRITE
44 from lib.policy import Policy
45 from lib.hashfiler import Mapper, Blocker
46
47 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
48
49 inf = float('inf')
50
51
52 logger = logging.getLogger(__name__)
53
54 def backend_method(func=None, autocommit=1):
55     if func is None:
56         def fn(func):
57             return backend_method(func, autocommit)
58         return fn
59
60     if not autocommit:
61         return func
62     def fn(self, *args, **kw):
63         self.con.execute('begin deferred')
64         try:
65             ret = func(self, *args, **kw)
66             self.con.commit()
67             return ret
68         except:
69             self.con.rollback()
70             raise
71     return fn
72
73
74 class ModularBackend(BaseBackend):
75     """A modular backend.
76     
77     Uses modules for SQL functions and storage.
78     """
79     
80     def __init__(self, db, db_options):
81         self.hash_algorithm = 'sha256'
82         self.block_size = 4 * 1024 * 1024 # 4MB
83         
84         self.default_policy = {'quota': 0, 'versioning': 'auto'}
85         
86         basepath = os.path.split(db)[0]
87         if basepath and not os.path.exists(basepath):
88             os.makedirs(basepath)
89         if not os.path.isdir(basepath):
90             raise RuntimeError("Cannot open database at '%s'" % (db,))
91         
92         self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)        
93         
94         params = {'blocksize': self.block_size,
95                   'blockpath': basepath + '/blocks',
96                   'hashtype': self.hash_algorithm}
97         self.blocker = Blocker(**params)
98         
99         params = {'mappath': basepath + '/maps',
100                   'namelen': self.blocker.hashlen}
101         self.mapper = Mapper(**params)
102         
103         params = {'connection': self.con,
104                   'cursor': self.con.cursor()}
105         self.permissions = Permissions(**params)
106         self.policy = Policy(**params)
107         self.node = Node(**params)
108         
109         self.con.commit()
110     
111     @backend_method
112     def list_accounts(self, user, marker=None, limit=10000):
113         """Return a list of accounts the user can access."""
114         
115         logger.debug("list_accounts: %s %s %s", user, marker, limit)
116         allowed = self._allowed_accounts(user)
117         start, limit = self._list_limits(allowed, marker, limit)
118         return allowed[start:start + limit]
119     
120     @backend_method
121     def get_account_meta(self, user, account, until=None):
122         """Return a dictionary with the account metadata."""
123         
124         logger.debug("get_account_meta: %s %s", account, until)
125         path, node = self._lookup_account(account, user == account)
126         if user != account:
127             if until or node is None or account not in self._allowed_accounts(user):
128                 raise NotAllowedError
129         try:
130             props = self._get_properties(node, until)
131             mtime = props[MTIME]
132         except NameError:
133             props = None
134             mtime = until
135         count, bytes, tstamp = self._get_statistics(node, until)
136         tstamp = max(tstamp, mtime)
137         if until is None:
138             modified = tstamp
139         else:
140             modified = self._get_statistics(node)[2] # Overall last modification.
141             modified = max(modified, mtime)
142         
143         if user != account:
144             meta = {'name': account}
145         else:
146             meta = {}
147             if props is not None:
148                 meta.update(dict(self.node.attribute_get(props[SERIAL])))
149             if until is not None:
150                 meta.update({'until_timestamp': tstamp})
151             meta.update({'name': account, 'count': count, 'bytes': bytes})
152         meta.update({'modified': modified})
153         return meta
154     
155     @backend_method
156     def update_account_meta(self, user, account, meta, replace=False):
157         """Update the metadata associated with the account."""
158         
159         logger.debug("update_account_meta: %s %s %s", account, meta, replace)
160         if user != account:
161             raise NotAllowedError
162         path, node = self._lookup_account(account, True)
163         self._put_metadata(user, node, meta, replace, False)
164     
165     @backend_method
166     def get_account_groups(self, user, account):
167         """Return a dictionary with the user groups defined for this account."""
168         
169         logger.debug("get_account_groups: %s", account)
170         if user != account:
171             if account not in self._allowed_accounts(user):
172                 raise NotAllowedError
173             return {}
174         self._lookup_account(account, True)
175         return self.permissions.group_dict(account)
176     
177     @backend_method
178     def update_account_groups(self, user, account, groups, replace=False):
179         """Update the groups associated with the account."""
180         
181         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
182         if user != account:
183             raise NotAllowedError
184         self._lookup_account(account, True)
185         self._check_groups(groups)
186         if replace:
187             self.permissions.group_destroy(account)
188         for k, v in groups.iteritems():
189             if not replace: # If not already deleted.
190                 self.permissions.group_delete(account, k)
191             if v:
192                 self.permissions.group_addmany(account, k, v)
193     
194     @backend_method
195     def put_account(self, user, account):
196         """Create a new account with the given name."""
197         
198         logger.debug("put_account: %s", account)
199         if user != account:
200             raise NotAllowedError
201         node = self.node.node_lookup(account)
202         if node is not None:
203             raise NameError('Account already exists')
204         self._put_path(user, ROOTNODE, account)
205     
206     @backend_method
207     def delete_account(self, user, account):
208         """Delete the account with the given name."""
209         
210         logger.debug("delete_account: %s", account)
211         if user != account:
212             raise NotAllowedError
213         node = self.node.node_lookup(account)
214         if node is None:
215             return
216         if not self.node.node_remove(node):
217             raise IndexError('Account is not empty')
218         self.permissions.group_destroy(account)
219     
220     @backend_method
221     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
222         """Return a list of containers existing under an account."""
223         
224         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
225         if user != account:
226             if until or account not in self._allowed_accounts(user):
227                 raise NotAllowedError
228             allowed = self._allowed_containers(user, account)
229             start, limit = self._list_limits(allowed, marker, limit)
230             return allowed[start:start + limit]
231         if shared:
232             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
233             start, limit = self._list_limits(allowed, marker, limit)
234             return allowed[start:start + limit]
235         node = self.node.node_lookup(account)
236         return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
237     
238     @backend_method
239     def get_container_meta(self, user, account, container, until=None):
240         """Return a dictionary with the container metadata."""
241         
242         logger.debug("get_container_meta: %s %s %s", account, container, until)
243         if user != account:
244             if until or container not in self._allowed_containers(user, account):
245                 raise NotAllowedError
246         path, node = self._lookup_container(account, container)
247         props = self._get_properties(node, until)
248         mtime = props[MTIME]
249         count, bytes, tstamp = self._get_statistics(node, until)
250         tstamp = max(tstamp, mtime)
251         if until is None:
252             modified = tstamp
253         else:
254             modified = self._get_statistics(node)[2] # Overall last modification.
255             modified = max(modified, mtime)
256         
257         if user != account:
258             meta = {'name': container}
259         else:
260             meta = dict(self.node.attribute_get(props[SERIAL]))
261             if until is not None:
262                 meta.update({'until_timestamp': tstamp})
263             meta.update({'name': container, 'count': count, 'bytes': bytes})
264         meta.update({'modified': modified})
265         return meta
266     
267     @backend_method
268     def update_container_meta(self, user, account, container, meta, replace=False):
269         """Update the metadata associated with the container."""
270         
271         logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
272         if user != account:
273             raise NotAllowedError
274         path, node = self._lookup_container(account, container)
275         self._put_metadata(user, node, meta, replace, False)
276     
277     @backend_method
278     def get_container_policy(self, user, account, container):
279         """Return a dictionary with the container policy."""
280         
281         logger.debug("get_container_policy: %s %s", account, container)
282         if user != account:
283             if container not in self._allowed_containers(user, account):
284                 raise NotAllowedError
285             return {}
286         path = self._lookup_container(account, container)[0]
287         return self.policy.policy_get(path)
288     
289     @backend_method
290     def update_container_policy(self, user, account, container, policy, replace=False):
291         """Update the policy associated with the account."""
292         
293         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
294         if user != account:
295             raise NotAllowedError
296         path = self._lookup_container(account, container)[0]
297         self._check_policy(policy)
298         if replace:
299             for k, v in self.default_policy.iteritems():
300                 if k not in policy:
301                     policy[k] = v
302         self.policy.policy_set(path, policy)
303     
304     @backend_method
305     def put_container(self, user, account, container, policy=None):
306         """Create a new container with the given name."""
307         
308         logger.debug("put_container: %s %s %s", account, container, policy)
309         if user != account:
310             raise NotAllowedError
311         try:
312             path, node = self._lookup_container(account, container)
313         except NameError:
314             pass
315         else:
316             raise NameError('Container already exists')
317         if policy:
318             self._check_policy(policy)
319         path = '/'.join((account, container))
320         self._put_path(user, self._lookup_account(account, True)[1], path)
321         for k, v in self.default_policy.iteritems():
322             if k not in policy:
323                 policy[k] = v
324         self.policy.policy_set(path, policy)
325     
326     @backend_method
327     def delete_container(self, user, account, container, until=None):
328         """Delete/purge the container with the given name."""
329         
330         logger.debug("delete_container: %s %s %s", account, container, until)
331         if user != account:
332             raise NotAllowedError
333         path, node = self._lookup_container(account, container)
334         
335         if until is not None:
336             versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
337             for v in versions:
338                 self.mapper.map_remv(v)
339             self.node.node_purge_children(node, until, CLUSTER_DELETED)
340             return
341         
342         if self._get_statistics(node)[0] > 0:
343             raise IndexError('Container is not empty')
344         versions = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
345         for v in versions:
346             self.mapper.map_remv(v)
347         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
348         self.node.node_remove(node)
349         self.policy.policy_unset(path)
350     
351     @backend_method
352     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
353         """Return a list of objects existing under a container."""
354         
355         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
356         allowed = []
357         if user != account:
358             if until:
359                 raise NotAllowedError
360             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
361             if not allowed:
362                 raise NotAllowedError
363         else:
364             if shared:
365                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
366                 if not allowed:
367                     return []
368         path, node = self._lookup_container(account, container)
369         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
370     
371     @backend_method
372     def list_object_meta(self, user, account, container, until=None):
373         """Return a list with all the container's object meta keys."""
374         
375         logger.debug("list_object_meta: %s %s %s", account, container, until)
376         allowed = []
377         if user != account:
378             if until:
379                 raise NotAllowedError
380             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
381             if not allowed:
382                 raise NotAllowedError
383         path, node = self._lookup_container(account, container)
384         before = until if until is not None else inf
385         return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
386     
387     @backend_method
388     def get_object_meta(self, user, account, container, name, version=None):
389         """Return a dictionary with the object metadata."""
390         
391         logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
392         self._can_read(user, account, container, name)
393         path, node = self._lookup_object(account, container, name)
394         props = self._get_version(node, version)
395         if version is None:
396             modified = props[MTIME]
397         else:
398             modified = self._get_version(node)[MTIME] # Overall last modification.
399         
400         meta = dict(self.node.attribute_get(props[SERIAL]))
401         meta.update({'name': name, 'bytes': props[SIZE]})
402         meta.update({'version': props[SERIAL], 'version_timestamp': props[MTIME]})
403         meta.update({'modified': modified, 'modified_by': props[MUSER]})
404         return meta
405     
406     @backend_method
407     def update_object_meta(self, user, account, container, name, meta, replace=False):
408         """Update the metadata associated with the object."""
409         
410         logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
411         self._can_write(user, account, container, name)
412         path, node = self._lookup_object(account, container, name)
413         return self._put_metadata(user, node, meta, replace)
414     
415     @backend_method
416     def get_object_permissions(self, user, account, container, name):
417         """Return the path from which this object gets its permissions from,\
418         along with a dictionary containing the permissions."""
419         
420         logger.debug("get_object_permissions: %s %s %s", account, container, name)
421         self._can_read(user, account, container, name)
422         path = self._lookup_object(account, container, name)[0]
423         return self.permissions.access_inherit(path)
424     
425     @backend_method
426     def update_object_permissions(self, user, account, container, name, permissions):
427         """Update the permissions associated with the object."""
428         
429         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
430         if user != account:
431             raise NotAllowedError
432         path = self._lookup_object(account, container, name)[0]
433         self._check_permissions(path, permissions)
434         self.permissions.access_set(path, permissions)
435     
436     @backend_method
437     def get_object_public(self, user, account, container, name):
438         """Return the public URL of the object if applicable."""
439         
440         logger.debug("get_object_public: %s %s %s", account, container, name)
441         self._can_read(user, account, container, name)
442         path = self._lookup_object(account, container, name)[0]
443         if self.permissions.public_check(path):
444             return '/public/' + path
445         return None
446     
447     @backend_method
448     def update_object_public(self, user, account, container, name, public):
449         """Update the public status of the object."""
450         
451         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
452         self._can_write(user, account, container, name)
453         path = self._lookup_object(account, container, name)[0]
454         if not public:
455             self.permissions.public_unset(path)
456         else:
457             self.permissions.public_set(path)
458     
459     @backend_method
460     def get_object_hashmap(self, user, account, container, name, version=None):
461         """Return the object's size and a list with partial hashes."""
462         
463         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
464         self._can_read(user, account, container, name)
465         path, node = self._lookup_object(account, container, name)
466         props = self._get_version(node, version)
467         hashmap = self.mapper.map_retr(props[SERIAL])
468         return props[SIZE], [binascii.hexlify(x) for x in hashmap]
469     
470     @backend_method
471     def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
472         """Create/update an object with the specified size and partial hashes."""
473         
474         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
475         if permissions is not None and user != account:
476             raise NotAllowedError
477         self._can_write(user, account, container, name)
478         missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
479         if missing:
480             ie = IndexError()
481             ie.data = missing
482             raise ie
483         if permissions is not None:
484             self._check_permissions(path, permissions)
485         path, node = self._put_object_node(account, container, name)
486         src_version_id, dest_version_id = self._copy_version(user, node, None, node, size)
487         self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
488         if not replace_meta and src_version_id is not None:
489             self.node.attribute_copy(src_version_id, dest_version_id)
490         self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
491         if permissions is not None:
492             self.permissions.access_set(path, permissions)
493         return dest_version_id
494     
495     @backend_method
496     def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
497         """Copy an object's data and metadata."""
498         
499         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
500         if permissions is not None and user != account:
501             raise NotAllowedError
502         self._can_read(user, account, src_container, src_name)
503         self._can_write(user, account, dest_container, dest_name)
504         src_path, src_node = self._lookup_object(account, src_container, src_name)
505         if permissions is not None:
506             self._check_permissions(dest_path, permissions)
507         dest_path, dest_node = self._put_object_node(account, dest_container, dest_name)
508         src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node)
509         if src_version_id is not None:
510             self._copy_data(src_version_id, dest_version_id)
511         if not replace_meta and src_version_id is not None:
512             self.node.attribute_copy(src_version_id, dest_version_id)
513         self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
514         if permissions is not None:
515             self.permissions.access_set(dest_path, permissions)
516         return dest_version_id
517     
518     @backend_method
519     def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
520         """Move an object's data and metadata."""
521         
522         logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
523         dest_version_id = self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
524         self.delete_object(user, account, src_container, src_name)
525         return dest_version_id
526     
527     @backend_method
528     def delete_object(self, user, account, container, name, until=None):
529         """Delete/purge an object."""
530         
531         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
532         if user != account:
533             raise NotAllowedError
534         
535         if until is not None:
536             path = '/'.join((account, container, name))
537             node = self.node.node_lookup(path)
538             if node is None:
539                 return
540             versions = self.node.node_purge(node, until, CLUSTER_NORMAL)
541             versions += self.node.node_purge(node, until, CLUSTER_HISTORY)
542             for v in versions:
543                 self.mapper.map_remv(v)
544             self.node.node_purge_children(node, until, CLUSTER_DELETED)
545             try:
546                 props = self._get_version(node)
547             except NameError:
548                 pass
549             else:
550                 self.permissions.access_clear(path)
551             return
552         
553         path, node = self._lookup_object(account, container, name)
554         self._copy_version(user, node, None, node, 0, CLUSTER_DELETED)
555         self.permissions.access_clear(path)
556     
557     @backend_method
558     def list_versions(self, user, account, container, name):
559         """Return a list of all (version, version_timestamp) tuples for an object."""
560         
561         logger.debug("list_versions: %s %s %s", account, container, name)
562         self._can_read(user, account, container, name)
563         path, node = self._lookup_object(account, container, name)
564         return self.node.node_get_versions(node, ['serial', 'mtime'])
565     
566     @backend_method(autocommit=0)
567     def get_block(self, hash):
568         """Return a block's data."""
569         
570         logger.debug("get_block: %s", hash)
571         blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
572         if not blocks:
573             raise NameError('Block does not exist')
574         return blocks[0]
575     
576     @backend_method(autocommit=0)
577     def put_block(self, data):
578         """Store a block and return the hash."""
579         
580         logger.debug("put_block: %s", len(data))
581         hashes, absent = self.blocker.block_stor((data,))
582         return binascii.hexlify(hashes[0])
583     
584     @backend_method(autocommit=0)
585     def update_block(self, hash, data, offset=0):
586         """Update a known block and return the hash."""
587         
588         logger.debug("update_block: %s %s %s", hash, len(data), offset)
589         if offset == 0 and len(data) == self.block_size:
590             return self.put_block(data)
591         h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
592         return binascii.hexlify(h)
593     
594     # Path functions.
595     
596     def _put_object_node(self, account, container, name):
597         path, parent = self._lookup_container(account, container)
598         path = '/'.join((path, name))
599         node = self.node.node_lookup(path)
600         if node is None:
601             node = self.node.node_create(parent, path)
602         return path, node
603     
604     def _put_path(self, user, parent, path):
605         node = self.node.node_create(parent, path)
606         self.node.version_create(node, 0, None, user, CLUSTER_NORMAL)
607         return node
608     
609     def _lookup_account(self, account, create=True):
610         node = self.node.node_lookup(account)
611         if node is None and create:
612             node = self._put_path(account, ROOTNODE, account) # User is account.
613         return account, node
614     
615     def _lookup_container(self, account, container):
616         path = '/'.join((account, container))
617         node = self.node.node_lookup(path)
618         if node is None:
619             raise NameError('Container does not exist')
620         return path, node
621     
622     def _lookup_object(self, account, container, name):
623         path = '/'.join((account, container, name))
624         node = self.node.node_lookup(path)
625         if node is None:
626             raise NameError('Object does not exist')
627         return path, node
628     
629     def _get_properties(self, node, until=None):
630         """Return properties until the timestamp given."""
631         
632         before = until if until is not None else inf
633         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
634         if props is None and until is not None:
635             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
636         if props is None:
637             raise NameError('Path does not exist')
638         return props
639     
640     def _get_statistics(self, node, until=None):
641         """Return count, sum of size and latest timestamp of everything under node."""
642         
643         if until is None:
644             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
645         else:
646             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
647         if stats is None:
648             stats = (0, 0, 0)
649         return stats
650     
651     def _get_version(self, node, version=None):
652         if version is None:
653             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
654             if props is None:
655                 raise NameError('Object does not exist')
656         else:
657             props = self.node.version_get_properties(version)
658             if props is None or props[CLUSTER] == CLUSTER_DELETED:
659                 raise IndexError('Version does not exist')
660         return props
661     
662     def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL):
663         
664         # Get source serial and size.
665         if src_version is not None:
666             src_props = self._get_version(src_node, src_version)
667             src_version_id = src_props[SERIAL]
668             size = src_props[SIZE]
669         else:
670             # Latest or create from scratch.
671             try:
672                 src_props = self._get_version(src_node)
673                 src_version_id = src_props[SERIAL]
674                 size = src_props[SIZE]
675             except NameError:
676                 src_version_id = None
677                 size = 0
678         if dest_size is not None:
679             size = dest_size
680         
681         # Move the latest version at destination to CLUSTER_HISTORY and create new.
682         if src_node == dest_node and src_version is None and src_version_id is not None:
683             self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
684         else:
685             dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
686             if dest_props is not None:
687                 self.node.version_recluster(dest_props[SERIAL], CLUSTER_HISTORY)
688         dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster)
689         
690         return src_version_id, dest_version_id
691     
692     def _copy_data(self, src_version, dest_version):
693         hashmap = self.mapper.map_retr(src_version)
694         self.mapper.map_stor(dest_version, hashmap)
695     
696     def _get_metadata(self, version):
697         if version is None:
698             return {}
699         return dict(self.node.attribute_get(version))
700     
701     def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
702         """Create a new version and store metadata."""
703         
704         src_version_id, dest_version_id = self._copy_version(user, node, None, node)
705         if not replace:
706             if src_version_id is not None:
707                 self.node.attribute_copy(src_version_id, dest_version_id)
708             self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
709             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
710         else:
711             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
712         if copy_data and src_version_id is not None:
713             self._copy_data(src_version_id, dest_version_id)
714         return dest_version_id
715     
716     def _list_limits(self, listing, marker, limit):
717         start = 0
718         if marker:
719             try:
720                 start = listing.index(marker) + 1
721             except ValueError:
722                 pass
723         if not limit or limit > 10000:
724             limit = 10000
725         return start, limit
726     
727     def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
728         cont_prefix = path + '/'
729         prefix = cont_prefix + prefix
730         start = cont_prefix + marker if marker else None
731         before = until if until is not None else inf
732         filterq = ','.join(keys) if keys else None
733         
734         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
735         objects.extend([(p, None) for p in prefixes] if virtual else [])
736         objects.sort()
737         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
738         
739         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
740         return objects[start:start + limit]
741     
742     # Policy functions.
743     
744     def _check_policy(self, policy):
745         for k in policy.keys():
746             if policy[k] == '':
747                 policy[k] = self.default_policy.get(k)
748         for k, v in policy.iteritems():
749             if k == 'quota':
750                 q = int(v) # May raise ValueError.
751                 if q < 0:
752                     raise ValueError
753             elif k == 'versioning':
754                 if v not in ['auto', 'manual', 'none']:
755                     raise ValueError
756             else:
757                 raise ValueError
758     
759     # Access control functions.
760     
761     def _check_groups(self, groups):
762         # raise ValueError('Bad characters in groups')
763         pass
764     
765     def _check_permissions(self, path, permissions):
766         # raise ValueError('Bad characters in permissions')
767         
768         # Check for existing permissions.
769         paths = self.permissions.access_list(path)
770         if paths:
771             ae = AttributeError()
772             ae.data = paths
773             raise ae
774     
775     def _can_read(self, user, account, container, name):
776         if user == account:
777             return True
778         path = '/'.join((account, container, name))
779         if not self.permissions.access_check(path, READ, user) and not self.permissions.access_check(path, WRITE, user):
780             raise NotAllowedError
781     
782     def _can_write(self, user, account, container, name):
783         if user == account:
784             return True
785         path = '/'.join((account, container, name))
786         if not self.permissions.access_check(path, WRITE, user):
787             raise NotAllowedError
788     
789     def _allowed_accounts(self, user):
790         allow = set()
791         for path in self.permissions.access_list_paths(user):
792             allow.add(path.split('/', 1)[0])
793         return sorted(allow)
794     
795     def _allowed_containers(self, user, account):
796         allow = set()
797         for path in self.permissions.access_list_paths(user, account):
798             allow.add(path.split('/', 2)[1])
799         return sorted(allow)