Report allowed actions in cross-user object requests, with the 'X-Object-Allowed...
[pithos] / pithos / backends / modular.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import sqlite3
38 import logging
39 import hashlib
40 import binascii
41
42 from base import NotAllowedError, BaseBackend
43 from lib.hashfiler import Mapper, Blocker
44
45 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
46
47 inf = float('inf')
48
49
50 logger = logging.getLogger(__name__)
51
52 def backend_method(func=None, autocommit=1):
53     if func is None:
54         def fn(func):
55             return backend_method(func, autocommit)
56         return fn
57
58     if not autocommit:
59         return func
60     def fn(self, *args, **kw):
61         self.wrapper.execute()
62         try:
63             ret = func(self, *args, **kw)
64             self.wrapper.commit()
65             return ret
66         except:
67             self.wrapper.rollback()
68             raise
69     return fn
70
71
72 class ModularBackend(BaseBackend):
73     """A modular backend.
74     
75     Uses modules for SQL functions and hashfiler for storage.
76     """
77     
78     def __init__(self, mod, path, db):
79         self.hash_algorithm = 'sha256'
80         self.block_size = 4 * 1024 * 1024 # 4MB
81         
82         self.default_policy = {'quota': 0, 'versioning': 'auto'}
83         
84         if path and not os.path.exists(path):
85             os.makedirs(path)
86         if not os.path.isdir(path):
87             raise RuntimeError("Cannot open path '%s'" % (path,))
88         
89         __import__(mod)
90         self.mod = sys.modules[mod]
91         self.wrapper = self.mod.dbwrapper.DBWrapper(db)
92         
93         params = {'blocksize': self.block_size,
94                   'blockpath': os.path.join(path + '/blocks'),
95                   'hashtype': self.hash_algorithm}
96         self.blocker = Blocker(**params)
97         
98         params = {'mappath': os.path.join(path + '/maps'),
99                   'namelen': self.blocker.hashlen}
100         self.mapper = Mapper(**params)
101         
102         params = {'wrapper': self.wrapper}
103         self.permissions = self.mod.permissions.Permissions(**params)
104         for x in ['READ', 'WRITE']:
105             setattr(self, x, getattr(self.mod.permissions, x))
106         self.policy = self.mod.policy.Policy(**params)
107         self.node = self.mod.node.Node(**params)
108         for x in ['ROOTNODE', 'SERIAL', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
109             setattr(self, x, getattr(self.mod.node, x))
110     
111     @backend_method
112     def list_accounts(self, user, marker=None, limit=10000):
113         """Return a list of accounts the user can access."""
114         
115         logger.debug("list_accounts: %s %s %s", user, marker, limit)
116         allowed = self._allowed_accounts(user)
117         start, limit = self._list_limits(allowed, marker, limit)
118         return allowed[start:start + limit]
119     
120     @backend_method
121     def get_account_meta(self, user, account, until=None):
122         """Return a dictionary with the account metadata."""
123         
124         logger.debug("get_account_meta: %s %s", account, until)
125         path, node = self._lookup_account(account, user == account)
126         if user != account:
127             if until or node is None or account not in self._allowed_accounts(user):
128                 raise NotAllowedError
129         try:
130             props = self._get_properties(node, until)
131             mtime = props[self.MTIME]
132         except NameError:
133             props = None
134             mtime = until
135         count, bytes, tstamp = self._get_statistics(node, until)
136         tstamp = max(tstamp, mtime)
137         if until is None:
138             modified = tstamp
139         else:
140             modified = self._get_statistics(node)[2] # Overall last modification.
141             modified = max(modified, mtime)
142         
143         if user != account:
144             meta = {'name': account}
145         else:
146             meta = {}
147             if props is not None:
148                 meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
149             if until is not None:
150                 meta.update({'until_timestamp': tstamp})
151             meta.update({'name': account, 'count': count, 'bytes': bytes})
152         meta.update({'modified': modified})
153         return meta
154     
155     @backend_method
156     def update_account_meta(self, user, account, meta, replace=False):
157         """Update the metadata associated with the account."""
158         
159         logger.debug("update_account_meta: %s %s %s", account, meta, replace)
160         if user != account:
161             raise NotAllowedError
162         path, node = self._lookup_account(account, True)
163         self._put_metadata(user, node, meta, replace, False)
164     
165     @backend_method
166     def get_account_groups(self, user, account):
167         """Return a dictionary with the user groups defined for this account."""
168         
169         logger.debug("get_account_groups: %s", account)
170         if user != account:
171             if account not in self._allowed_accounts(user):
172                 raise NotAllowedError
173             return {}
174         self._lookup_account(account, True)
175         return self.permissions.group_dict(account)
176     
177     @backend_method
178     def update_account_groups(self, user, account, groups, replace=False):
179         """Update the groups associated with the account."""
180         
181         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
182         if user != account:
183             raise NotAllowedError
184         self._lookup_account(account, True)
185         self._check_groups(groups)
186         if replace:
187             self.permissions.group_destroy(account)
188         for k, v in groups.iteritems():
189             if not replace: # If not already deleted.
190                 self.permissions.group_delete(account, k)
191             if v:
192                 self.permissions.group_addmany(account, k, v)
193     
194     @backend_method
195     def put_account(self, user, account):
196         """Create a new account with the given name."""
197         
198         logger.debug("put_account: %s", account)
199         if user != account:
200             raise NotAllowedError
201         node = self.node.node_lookup(account)
202         if node is not None:
203             raise NameError('Account already exists')
204         self._put_path(user, self.ROOTNODE, account)
205     
206     @backend_method
207     def delete_account(self, user, account):
208         """Delete the account with the given name."""
209         
210         logger.debug("delete_account: %s", account)
211         if user != account:
212             raise NotAllowedError
213         node = self.node.node_lookup(account)
214         if node is None:
215             return
216         if not self.node.node_remove(node):
217             raise IndexError('Account is not empty')
218         self.permissions.group_destroy(account)
219     
220     @backend_method
221     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
222         """Return a list of containers existing under an account."""
223         
224         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
225         if user != account:
226             if until or account not in self._allowed_accounts(user):
227                 raise NotAllowedError
228             allowed = self._allowed_containers(user, account)
229             start, limit = self._list_limits(allowed, marker, limit)
230             return allowed[start:start + limit]
231         if shared:
232             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
233             start, limit = self._list_limits(allowed, marker, limit)
234             return allowed[start:start + limit]
235         node = self.node.node_lookup(account)
236         return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
237     
238     @backend_method
239     def get_container_meta(self, user, account, container, until=None):
240         """Return a dictionary with the container metadata."""
241         
242         logger.debug("get_container_meta: %s %s %s", account, container, until)
243         if user != account:
244             if until or container not in self._allowed_containers(user, account):
245                 raise NotAllowedError
246         path, node = self._lookup_container(account, container)
247         props = self._get_properties(node, until)
248         mtime = props[self.MTIME]
249         count, bytes, tstamp = self._get_statistics(node, until)
250         tstamp = max(tstamp, mtime)
251         if until is None:
252             modified = tstamp
253         else:
254             modified = self._get_statistics(node)[2] # Overall last modification.
255             modified = max(modified, mtime)
256         
257         if user != account:
258             meta = {'name': container}
259         else:
260             meta = dict(self.node.attribute_get(props[self.SERIAL]))
261             if until is not None:
262                 meta.update({'until_timestamp': tstamp})
263             meta.update({'name': container, 'count': count, 'bytes': bytes})
264         meta.update({'modified': modified})
265         return meta
266     
267     @backend_method
268     def update_container_meta(self, user, account, container, meta, replace=False):
269         """Update the metadata associated with the container."""
270         
271         logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
272         if user != account:
273             raise NotAllowedError
274         path, node = self._lookup_container(account, container)
275         self._put_metadata(user, node, meta, replace, False)
276     
277     @backend_method
278     def get_container_policy(self, user, account, container):
279         """Return a dictionary with the container policy."""
280         
281         logger.debug("get_container_policy: %s %s", account, container)
282         if user != account:
283             if container not in self._allowed_containers(user, account):
284                 raise NotAllowedError
285             return {}
286         path = self._lookup_container(account, container)[0]
287         return self.policy.policy_get(path)
288     
289     @backend_method
290     def update_container_policy(self, user, account, container, policy, replace=False):
291         """Update the policy associated with the account."""
292         
293         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
294         if user != account:
295             raise NotAllowedError
296         path = self._lookup_container(account, container)[0]
297         self._check_policy(policy)
298         if replace:
299             for k, v in self.default_policy.iteritems():
300                 if k not in policy:
301                     policy[k] = v
302         self.policy.policy_set(path, policy)
303     
304     @backend_method
305     def put_container(self, user, account, container, policy=None):
306         """Create a new container with the given name."""
307         
308         logger.debug("put_container: %s %s %s", account, container, policy)
309         if user != account:
310             raise NotAllowedError
311         try:
312             path, node = self._lookup_container(account, container)
313         except NameError:
314             pass
315         else:
316             raise NameError('Container already exists')
317         if policy:
318             self._check_policy(policy)
319         path = '/'.join((account, container))
320         self._put_path(user, self._lookup_account(account, True)[1], path)
321         for k, v in self.default_policy.iteritems():
322             if k not in policy:
323                 policy[k] = v
324         self.policy.policy_set(path, policy)
325     
326     @backend_method
327     def delete_container(self, user, account, container, until=None):
328         """Delete/purge the container with the given name."""
329         
330         logger.debug("delete_container: %s %s %s", account, container, until)
331         if user != account:
332             raise NotAllowedError
333         path, node = self._lookup_container(account, container)
334         
335         if until is not None:
336             versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
337             for v in versions:
338                 self.mapper.map_remv(v)
339             self.node.node_purge_children(node, until, CLUSTER_DELETED)
340             return
341         
342         if self._get_statistics(node)[0] > 0:
343             raise IndexError('Container is not empty')
344         versions = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
345         for v in versions:
346             self.mapper.map_remv(v)
347         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
348         self.node.node_remove(node)
349         self.policy.policy_unset(path)
350     
351     @backend_method
352     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
353         """Return a list of objects existing under a container."""
354         
355         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
356         allowed = []
357         if user != account:
358             if until:
359                 raise NotAllowedError
360             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
361             if not allowed:
362                 raise NotAllowedError
363         else:
364             if shared:
365                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
366                 if not allowed:
367                     return []
368         path, node = self._lookup_container(account, container)
369         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
370     
371     @backend_method
372     def list_object_meta(self, user, account, container, until=None):
373         """Return a list with all the container's object meta keys."""
374         
375         logger.debug("list_object_meta: %s %s %s", account, container, until)
376         allowed = []
377         if user != account:
378             if until:
379                 raise NotAllowedError
380             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
381             if not allowed:
382                 raise NotAllowedError
383         path, node = self._lookup_container(account, container)
384         before = until if until is not None else inf
385         return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
386     
387     @backend_method
388     def get_object_meta(self, user, account, container, name, version=None):
389         """Return a dictionary with the object metadata."""
390         
391         logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
392         self._can_read(user, account, container, name)
393         path, node = self._lookup_object(account, container, name)
394         props = self._get_version(node, version)
395         if version is None:
396             modified = props[self.MTIME]
397         else:
398             modified = self._get_version(node)[self.MTIME] # Overall last modification.
399         
400         meta = dict(self.node.attribute_get(props[self.SERIAL]))
401         meta.update({'name': name, 'bytes': props[self.SIZE]})
402         meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
403         meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
404         return meta
405     
406     @backend_method
407     def update_object_meta(self, user, account, container, name, meta, replace=False):
408         """Update the metadata associated with the object."""
409         
410         logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
411         self._can_write(user, account, container, name)
412         path, node = self._lookup_object(account, container, name)
413         return self._put_metadata(user, node, meta, replace)
414     
415     @backend_method
416     def get_object_permissions(self, user, account, container, name):
417         """Return the action allowed on the object, the path
418         from which the object gets its permissions from,
419         along with a dictionary containing the permissions."""
420         
421         logger.debug("get_object_permissions: %s %s %s", account, container, name)
422         allowed = 'write'
423         if user != account:
424             path = '/'.join((account, container, name))
425             if self.permissions.access_check(path, self.WRITE, user):
426                 allowed = 'write'
427             elif self.permissions.access_check(path, self.READ, user):
428                 allowed = 'read'
429             else:
430                 raise NotAllowedError
431         path = self._lookup_object(account, container, name)[0]
432         return (allowed,) + self.permissions.access_inherit(path)
433     
434     @backend_method
435     def update_object_permissions(self, user, account, container, name, permissions):
436         """Update the permissions associated with the object."""
437         
438         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
439         if user != account:
440             raise NotAllowedError
441         path = self._lookup_object(account, container, name)[0]
442         self._check_permissions(path, permissions)
443         self.permissions.access_set(path, permissions)
444     
445     @backend_method
446     def get_object_public(self, user, account, container, name):
447         """Return the public URL of the object if applicable."""
448         
449         logger.debug("get_object_public: %s %s %s", account, container, name)
450         self._can_read(user, account, container, name)
451         path = self._lookup_object(account, container, name)[0]
452         if self.permissions.public_check(path):
453             return '/public/' + path
454         return None
455     
456     @backend_method
457     def update_object_public(self, user, account, container, name, public):
458         """Update the public status of the object."""
459         
460         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
461         self._can_write(user, account, container, name)
462         path = self._lookup_object(account, container, name)[0]
463         if not public:
464             self.permissions.public_unset(path)
465         else:
466             self.permissions.public_set(path)
467     
468     @backend_method
469     def get_object_hashmap(self, user, account, container, name, version=None):
470         """Return the object's size and a list with partial hashes."""
471         
472         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
473         self._can_read(user, account, container, name)
474         path, node = self._lookup_object(account, container, name)
475         props = self._get_version(node, version)
476         hashmap = self.mapper.map_retr(props[self.SERIAL])
477         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
478     
479     @backend_method
480     def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
481         """Create/update an object with the specified size and partial hashes."""
482         
483         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
484         if permissions is not None and user != account:
485             raise NotAllowedError
486         self._can_write(user, account, container, name)
487         missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
488         if missing:
489             ie = IndexError()
490             ie.data = [binascii.hexlify(x) for x in missing]
491             raise ie
492         if permissions is not None:
493             path = '/'.join((account, container, name))
494             self._check_permissions(path, permissions)
495         path, node = self._put_object_node(account, container, name)
496         src_version_id, dest_version_id = self._copy_version(user, node, None, node, size)
497         self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
498         if not replace_meta and src_version_id is not None:
499             self.node.attribute_copy(src_version_id, dest_version_id)
500         self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
501         if permissions is not None:
502             self.permissions.access_set(path, permissions)
503         return dest_version_id
504     
505     def _copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
506         if permissions is not None and user != account:
507             raise NotAllowedError
508         self._can_read(user, account, src_container, src_name)
509         self._can_write(user, account, dest_container, dest_name)
510         src_path, src_node = self._lookup_object(account, src_container, src_name)
511         if permissions is not None:
512             dest_path = '/'.join((account, container, name))
513             self._check_permissions(dest_path, permissions)
514         dest_path, dest_node = self._put_object_node(account, dest_container, dest_name)
515         src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node)
516         if src_version_id is not None:
517             self._copy_data(src_version_id, dest_version_id)
518         if not replace_meta and src_version_id is not None:
519             self.node.attribute_copy(src_version_id, dest_version_id)
520         self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
521         if permissions is not None:
522             self.permissions.access_set(dest_path, permissions)
523         return dest_version_id
524     
525     @backend_method
526     def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
527         """Copy an object's data and metadata."""
528         
529         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
530         return self._copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
531     
532     @backend_method
533     def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
534         """Move an object's data and metadata."""
535         
536         logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
537         dest_version_id = self._copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
538         self._delete_object(user, account, src_container, src_name)
539         return dest_version_id
540     
541     def _delete_object(self, user, account, container, name, until=None):
542         if user != account:
543             raise NotAllowedError
544         
545         if until is not None:
546             path = '/'.join((account, container, name))
547             node = self.node.node_lookup(path)
548             if node is None:
549                 return
550             versions = self.node.node_purge(node, until, CLUSTER_NORMAL)
551             versions += self.node.node_purge(node, until, CLUSTER_HISTORY)
552             for v in versions:
553                 self.mapper.map_remv(v)
554             self.node.node_purge_children(node, until, CLUSTER_DELETED)
555             try:
556                 props = self._get_version(node)
557             except NameError:
558                 pass
559             else:
560                 self.permissions.access_clear(path)
561             return
562         
563         path, node = self._lookup_object(account, container, name)
564         self._copy_version(user, node, None, node, 0, CLUSTER_DELETED)
565         self.permissions.access_clear(path)
566     
567     @backend_method
568     def delete_object(self, user, account, container, name, until=None):
569         """Delete/purge an object."""
570         
571         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
572         self._delete_object(user, account, container, name, until)
573     
574     @backend_method
575     def list_versions(self, user, account, container, name):
576         """Return a list of all (version, version_timestamp) tuples for an object."""
577         
578         logger.debug("list_versions: %s %s %s", account, container, name)
579         self._can_read(user, account, container, name)
580         path, node = self._lookup_object(account, container, name)
581         return self.node.node_get_versions(node, ['serial', 'mtime'])
582     
583     @backend_method(autocommit=0)
584     def get_block(self, hash):
585         """Return a block's data."""
586         
587         logger.debug("get_block: %s", hash)
588         blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
589         if not blocks:
590             raise NameError('Block does not exist')
591         return blocks[0]
592     
593     @backend_method(autocommit=0)
594     def put_block(self, data):
595         """Store a block and return the hash."""
596         
597         logger.debug("put_block: %s", len(data))
598         hashes, absent = self.blocker.block_stor((data,))
599         return binascii.hexlify(hashes[0])
600     
601     @backend_method(autocommit=0)
602     def update_block(self, hash, data, offset=0):
603         """Update a known block and return the hash."""
604         
605         logger.debug("update_block: %s %s %s", hash, len(data), offset)
606         if offset == 0 and len(data) == self.block_size:
607             return self.put_block(data)
608         h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
609         return binascii.hexlify(h)
610     
611     # Path functions.
612     
613     def _put_object_node(self, account, container, name):
614         path, parent = self._lookup_container(account, container)
615         path = '/'.join((path, name))
616         node = self.node.node_lookup(path)
617         if node is None:
618             node = self.node.node_create(parent, path)
619         return path, node
620     
621     def _put_path(self, user, parent, path):
622         node = self.node.node_create(parent, path)
623         self.node.version_create(node, 0, None, user, CLUSTER_NORMAL)
624         return node
625     
626     def _lookup_account(self, account, create=True):
627         node = self.node.node_lookup(account)
628         if node is None and create:
629             node = self._put_path(account, self.ROOTNODE, account) # User is account.
630         return account, node
631     
632     def _lookup_container(self, account, container):
633         path = '/'.join((account, container))
634         node = self.node.node_lookup(path)
635         if node is None:
636             raise NameError('Container does not exist')
637         return path, node
638     
639     def _lookup_object(self, account, container, name):
640         path = '/'.join((account, container, name))
641         node = self.node.node_lookup(path)
642         if node is None:
643             raise NameError('Object does not exist')
644         return path, node
645     
646     def _get_properties(self, node, until=None):
647         """Return properties until the timestamp given."""
648         
649         before = until if until is not None else inf
650         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
651         if props is None and until is not None:
652             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
653         if props is None:
654             raise NameError('Path does not exist')
655         return props
656     
657     def _get_statistics(self, node, until=None):
658         """Return count, sum of size and latest timestamp of everything under node."""
659         
660         if until is None:
661             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
662         else:
663             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
664         if stats is None:
665             stats = (0, 0, 0)
666         return stats
667     
668     def _get_version(self, node, version=None):
669         if version is None:
670             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
671             if props is None:
672                 raise NameError('Object does not exist')
673         else:
674             props = self.node.version_get_properties(version)
675             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
676                 raise IndexError('Version does not exist')
677         return props
678     
679     def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL):
680         
681         # Get source serial and size.
682         if src_version is not None:
683             src_props = self._get_version(src_node, src_version)
684             src_version_id = src_props[self.SERIAL]
685             size = src_props[self.SIZE]
686         else:
687             # Latest or create from scratch.
688             try:
689                 src_props = self._get_version(src_node)
690                 src_version_id = src_props[self.SERIAL]
691                 size = src_props[self.SIZE]
692             except NameError:
693                 src_version_id = None
694                 size = 0
695         if dest_size is not None:
696             size = dest_size
697         
698         # Move the latest version at destination to CLUSTER_HISTORY and create new.
699         if src_node == dest_node and src_version is None and src_version_id is not None:
700             self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
701         else:
702             dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
703             if dest_props is not None:
704                 self.node.version_recluster(dest_props[self.SERIAL], CLUSTER_HISTORY)
705         dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster)
706         
707         return src_version_id, dest_version_id
708     
709     def _copy_data(self, src_version, dest_version):
710         hashmap = self.mapper.map_retr(src_version)
711         self.mapper.map_stor(dest_version, hashmap)
712     
713     def _get_metadata(self, version):
714         if version is None:
715             return {}
716         return dict(self.node.attribute_get(version))
717     
718     def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
719         """Create a new version and store metadata."""
720         
721         src_version_id, dest_version_id = self._copy_version(user, node, None, node)
722         if not replace:
723             if src_version_id is not None:
724                 self.node.attribute_copy(src_version_id, dest_version_id)
725             self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
726             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
727         else:
728             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
729         if copy_data and src_version_id is not None:
730             self._copy_data(src_version_id, dest_version_id)
731         return dest_version_id
732     
733     def _list_limits(self, listing, marker, limit):
734         start = 0
735         if marker:
736             try:
737                 start = listing.index(marker) + 1
738             except ValueError:
739                 pass
740         if not limit or limit > 10000:
741             limit = 10000
742         return start, limit
743     
744     def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
745         cont_prefix = path + '/'
746         prefix = cont_prefix + prefix
747         start = cont_prefix + marker if marker else None
748         before = until if until is not None else inf
749         filterq = ','.join(keys) if keys else None
750         
751         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
752         objects.extend([(p, None) for p in prefixes] if virtual else [])
753         objects.sort(key=lambda x: x[0])
754         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
755         
756         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
757         return objects[start:start + limit]
758     
759     # Policy functions.
760     
761     def _check_policy(self, policy):
762         for k in policy.keys():
763             if policy[k] == '':
764                 policy[k] = self.default_policy.get(k)
765         for k, v in policy.iteritems():
766             if k == 'quota':
767                 q = int(v) # May raise ValueError.
768                 if q < 0:
769                     raise ValueError
770             elif k == 'versioning':
771                 if v not in ['auto', 'manual', 'none']:
772                     raise ValueError
773             else:
774                 raise ValueError
775     
776     # Access control functions.
777     
778     def _check_groups(self, groups):
779         # raise ValueError('Bad characters in groups')
780         pass
781     
782     def _check_permissions(self, path, permissions):
783         # raise ValueError('Bad characters in permissions')
784         
785         # Check for existing permissions.
786         paths = self.permissions.access_list(path)
787         if paths:
788             ae = AttributeError()
789             ae.data = paths
790             raise ae
791     
792     def _can_read(self, user, account, container, name):
793         if user == account:
794             return True
795         path = '/'.join((account, container, name))
796         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
797             raise NotAllowedError
798     
799     def _can_write(self, user, account, container, name):
800         if user == account:
801             return True
802         path = '/'.join((account, container, name))
803         if not self.permissions.access_check(path, self.WRITE, user):
804             raise NotAllowedError
805     
806     def _allowed_accounts(self, user):
807         allow = set()
808         for path in self.permissions.access_list_paths(user):
809             allow.add(path.split('/', 1)[0])
810         return sorted(allow)
811     
812     def _allowed_containers(self, user, account):
813         allow = set()
814         for path in self.permissions.access_list_paths(user, account):
815             allow.add(path.split('/', 2)[1])
816         return sorted(allow)