Container PUT can also be used for updating metadata/policy.
[pithos] / pithos / backends / modular.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import sqlite3
38 import logging
39 import hashlib
40 import binascii
41
42 from base import NotAllowedError, BaseBackend
43 from lib.hashfiler import Mapper, Blocker
44
45 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
46
47 inf = float('inf')
48
49
50 logger = logging.getLogger(__name__)
51
52 def backend_method(func=None, autocommit=1):
53     if func is None:
54         def fn(func):
55             return backend_method(func, autocommit)
56         return fn
57
58     if not autocommit:
59         return func
60     def fn(self, *args, **kw):
61         self.wrapper.execute()
62         try:
63             ret = func(self, *args, **kw)
64             self.wrapper.commit()
65             return ret
66         except:
67             self.wrapper.rollback()
68             raise
69     return fn
70
71
72 class ModularBackend(BaseBackend):
73     """A modular backend.
74     
75     Uses modules for SQL functions and hashfiler for storage.
76     """
77     
78     def __init__(self, mod, path, db):
79         self.hash_algorithm = 'sha256'
80         self.block_size = 4 * 1024 * 1024 # 4MB
81         
82         self.default_policy = {'quota': 0, 'versioning': 'auto'}
83         
84         if path and not os.path.exists(path):
85             os.makedirs(path)
86         if not os.path.isdir(path):
87             raise RuntimeError("Cannot open path '%s'" % (path,))
88         
89         __import__(mod)
90         self.mod = sys.modules[mod]
91         self.wrapper = self.mod.dbwrapper.DBWrapper(db)
92         
93         params = {'blocksize': self.block_size,
94                   'blockpath': os.path.join(path + '/blocks'),
95                   'hashtype': self.hash_algorithm}
96         self.blocker = Blocker(**params)
97         
98         params = {'mappath': os.path.join(path + '/maps'),
99                   'namelen': self.blocker.hashlen}
100         self.mapper = Mapper(**params)
101         
102         params = {'wrapper': self.wrapper}
103         self.permissions = self.mod.permissions.Permissions(**params)
104         for x in ['READ', 'WRITE']:
105             setattr(self, x, getattr(self.mod.permissions, x))
106         self.policy = self.mod.policy.Policy(**params)
107         self.node = self.mod.node.Node(**params)
108         for x in ['ROOTNODE', 'SERIAL', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
109             setattr(self, x, getattr(self.mod.node, x))
110     
111     @backend_method
112     def list_accounts(self, user, marker=None, limit=10000):
113         """Return a list of accounts the user can access."""
114         
115         logger.debug("list_accounts: %s %s %s", user, marker, limit)
116         allowed = self._allowed_accounts(user)
117         start, limit = self._list_limits(allowed, marker, limit)
118         return allowed[start:start + limit]
119     
120     @backend_method
121     def get_account_meta(self, user, account, until=None):
122         """Return a dictionary with the account metadata."""
123         
124         logger.debug("get_account_meta: %s %s", account, until)
125         path, node = self._lookup_account(account, user == account)
126         if user != account:
127             if until or node is None or account not in self._allowed_accounts(user):
128                 raise NotAllowedError
129         try:
130             props = self._get_properties(node, until)
131             mtime = props[self.MTIME]
132         except NameError:
133             props = None
134             mtime = until
135         count, bytes, tstamp = self._get_statistics(node, until)
136         tstamp = max(tstamp, mtime)
137         if until is None:
138             modified = tstamp
139         else:
140             modified = self._get_statistics(node)[2] # Overall last modification.
141             modified = max(modified, mtime)
142         
143         if user != account:
144             meta = {'name': account}
145         else:
146             meta = {}
147             if props is not None:
148                 meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
149             if until is not None:
150                 meta.update({'until_timestamp': tstamp})
151             meta.update({'name': account, 'count': count, 'bytes': bytes})
152         meta.update({'modified': modified})
153         return meta
154     
155     @backend_method
156     def update_account_meta(self, user, account, meta, replace=False):
157         """Update the metadata associated with the account."""
158         
159         logger.debug("update_account_meta: %s %s %s", account, meta, replace)
160         if user != account:
161             raise NotAllowedError
162         path, node = self._lookup_account(account, True)
163         self._put_metadata(user, node, meta, replace, False)
164     
165     @backend_method
166     def get_account_groups(self, user, account):
167         """Return a dictionary with the user groups defined for this account."""
168         
169         logger.debug("get_account_groups: %s", account)
170         if user != account:
171             if account not in self._allowed_accounts(user):
172                 raise NotAllowedError
173             return {}
174         self._lookup_account(account, True)
175         return self.permissions.group_dict(account)
176     
177     @backend_method
178     def update_account_groups(self, user, account, groups, replace=False):
179         """Update the groups associated with the account."""
180         
181         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
182         if user != account:
183             raise NotAllowedError
184         self._lookup_account(account, True)
185         self._check_groups(groups)
186         if replace:
187             self.permissions.group_destroy(account)
188         for k, v in groups.iteritems():
189             if not replace: # If not already deleted.
190                 self.permissions.group_delete(account, k)
191             if v:
192                 self.permissions.group_addmany(account, k, v)
193     
194     @backend_method
195     def put_account(self, user, account):
196         """Create a new account with the given name."""
197         
198         logger.debug("put_account: %s", account)
199         if user != account:
200             raise NotAllowedError
201         node = self.node.node_lookup(account)
202         if node is not None:
203             raise NameError('Account already exists')
204         self._put_path(user, self.ROOTNODE, account)
205     
206     @backend_method
207     def delete_account(self, user, account):
208         """Delete the account with the given name."""
209         
210         logger.debug("delete_account: %s", account)
211         if user != account:
212             raise NotAllowedError
213         node = self.node.node_lookup(account)
214         if node is None:
215             return
216         if not self.node.node_remove(node):
217             raise IndexError('Account is not empty')
218         self.permissions.group_destroy(account)
219     
220     @backend_method
221     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
222         """Return a list of containers existing under an account."""
223         
224         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
225         if user != account:
226             if until or account not in self._allowed_accounts(user):
227                 raise NotAllowedError
228             allowed = self._allowed_containers(user, account)
229             start, limit = self._list_limits(allowed, marker, limit)
230             return allowed[start:start + limit]
231         if shared:
232             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
233             start, limit = self._list_limits(allowed, marker, limit)
234             return allowed[start:start + limit]
235         node = self.node.node_lookup(account)
236         return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
237     
238     @backend_method
239     def get_container_meta(self, user, account, container, until=None):
240         """Return a dictionary with the container metadata."""
241         
242         logger.debug("get_container_meta: %s %s %s", account, container, until)
243         if user != account:
244             if until or container not in self._allowed_containers(user, account):
245                 raise NotAllowedError
246         path, node = self._lookup_container(account, container)
247         props = self._get_properties(node, until)
248         mtime = props[self.MTIME]
249         count, bytes, tstamp = self._get_statistics(node, until)
250         tstamp = max(tstamp, mtime)
251         if until is None:
252             modified = tstamp
253         else:
254             modified = self._get_statistics(node)[2] # Overall last modification.
255             modified = max(modified, mtime)
256         
257         if user != account:
258             meta = {'name': container}
259         else:
260             meta = dict(self.node.attribute_get(props[self.SERIAL]))
261             if until is not None:
262                 meta.update({'until_timestamp': tstamp})
263             meta.update({'name': container, 'count': count, 'bytes': bytes})
264         meta.update({'modified': modified})
265         return meta
266     
267     @backend_method
268     def update_container_meta(self, user, account, container, meta, replace=False):
269         """Update the metadata associated with the container."""
270         
271         logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
272         if user != account:
273             raise NotAllowedError
274         path, node = self._lookup_container(account, container)
275         self._put_metadata(user, node, meta, replace, False)
276     
277     @backend_method
278     def get_container_policy(self, user, account, container):
279         """Return a dictionary with the container policy."""
280         
281         logger.debug("get_container_policy: %s %s", account, container)
282         if user != account:
283             if container not in self._allowed_containers(user, account):
284                 raise NotAllowedError
285             return {}
286         path = self._lookup_container(account, container)[0]
287         return self.policy.policy_get(path)
288     
289     @backend_method
290     def update_container_policy(self, user, account, container, policy, replace=False):
291         """Update the policy associated with the account."""
292         
293         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
294         if user != account:
295             raise NotAllowedError
296         path = self._lookup_container(account, container)[0]
297         self._check_policy(policy)
298         if replace:
299             for k, v in self.default_policy.iteritems():
300                 if k not in policy:
301                     policy[k] = v
302         self.policy.policy_set(path, policy)
303     
304     @backend_method
305     def put_container(self, user, account, container, policy=None):
306         """Create a new container with the given name."""
307         
308         logger.debug("put_container: %s %s %s", account, container, policy)
309         if user != account:
310             raise NotAllowedError
311         try:
312             path, node = self._lookup_container(account, container)
313         except NameError:
314             pass
315         else:
316             raise NameError('Container already exists')
317         if policy:
318             self._check_policy(policy)
319         path = '/'.join((account, container))
320         self._put_path(user, self._lookup_account(account, True)[1], path)
321         for k, v in self.default_policy.iteritems():
322             if k not in policy:
323                 policy[k] = v
324         self.policy.policy_set(path, policy)
325     
326     @backend_method
327     def delete_container(self, user, account, container, until=None):
328         """Delete/purge the container with the given name."""
329         
330         logger.debug("delete_container: %s %s %s", account, container, until)
331         if user != account:
332             raise NotAllowedError
333         path, node = self._lookup_container(account, container)
334         
335         if until is not None:
336             versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
337             for v in versions:
338                 self.mapper.map_remv(v)
339             self.node.node_purge_children(node, until, CLUSTER_DELETED)
340             return
341         
342         if self._get_statistics(node)[0] > 0:
343             raise IndexError('Container is not empty')
344         versions = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
345         for v in versions:
346             self.mapper.map_remv(v)
347         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
348         self.node.node_remove(node)
349         self.policy.policy_unset(path)
350     
351     @backend_method
352     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
353         """Return a list of objects existing under a container."""
354         
355         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
356         allowed = []
357         if user != account:
358             if until:
359                 raise NotAllowedError
360             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
361             if not allowed:
362                 raise NotAllowedError
363         else:
364             if shared:
365                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
366                 if not allowed:
367                     return []
368         path, node = self._lookup_container(account, container)
369         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
370     
371     @backend_method
372     def list_object_meta(self, user, account, container, until=None):
373         """Return a list with all the container's object meta keys."""
374         
375         logger.debug("list_object_meta: %s %s %s", account, container, until)
376         allowed = []
377         if user != account:
378             if until:
379                 raise NotAllowedError
380             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
381             if not allowed:
382                 raise NotAllowedError
383         path, node = self._lookup_container(account, container)
384         before = until if until is not None else inf
385         return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
386     
387     @backend_method
388     def get_object_meta(self, user, account, container, name, version=None):
389         """Return a dictionary with the object metadata."""
390         
391         logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
392         self._can_read(user, account, container, name)
393         path, node = self._lookup_object(account, container, name)
394         props = self._get_version(node, version)
395         if version is None:
396             modified = props[self.MTIME]
397         else:
398             modified = self._get_version(node)[self.MTIME] # Overall last modification.
399         
400         meta = dict(self.node.attribute_get(props[self.SERIAL]))
401         meta.update({'name': name, 'bytes': props[self.SIZE]})
402         meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
403         meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
404         return meta
405     
406     @backend_method
407     def update_object_meta(self, user, account, container, name, meta, replace=False):
408         """Update the metadata associated with the object."""
409         
410         logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
411         self._can_write(user, account, container, name)
412         path, node = self._lookup_object(account, container, name)
413         return self._put_metadata(user, node, meta, replace)
414     
415     @backend_method
416     def get_object_permissions(self, user, account, container, name):
417         """Return the path from which this object gets its permissions from,\
418         along with a dictionary containing the permissions."""
419         
420         logger.debug("get_object_permissions: %s %s %s", account, container, name)
421         self._can_read(user, account, container, name)
422         path = self._lookup_object(account, container, name)[0]
423         return self.permissions.access_inherit(path)
424     
425     @backend_method
426     def update_object_permissions(self, user, account, container, name, permissions):
427         """Update the permissions associated with the object."""
428         
429         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
430         if user != account:
431             raise NotAllowedError
432         path = self._lookup_object(account, container, name)[0]
433         self._check_permissions(path, permissions)
434         self.permissions.access_set(path, permissions)
435     
436     @backend_method
437     def get_object_public(self, user, account, container, name):
438         """Return the public URL of the object if applicable."""
439         
440         logger.debug("get_object_public: %s %s %s", account, container, name)
441         self._can_read(user, account, container, name)
442         path = self._lookup_object(account, container, name)[0]
443         if self.permissions.public_check(path):
444             return '/public/' + path
445         return None
446     
447     @backend_method
448     def update_object_public(self, user, account, container, name, public):
449         """Update the public status of the object."""
450         
451         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
452         self._can_write(user, account, container, name)
453         path = self._lookup_object(account, container, name)[0]
454         if not public:
455             self.permissions.public_unset(path)
456         else:
457             self.permissions.public_set(path)
458     
459     @backend_method
460     def get_object_hashmap(self, user, account, container, name, version=None):
461         """Return the object's size and a list with partial hashes."""
462         
463         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
464         self._can_read(user, account, container, name)
465         path, node = self._lookup_object(account, container, name)
466         props = self._get_version(node, version)
467         hashmap = self.mapper.map_retr(props[self.SERIAL])
468         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
469     
470     @backend_method
471     def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
472         """Create/update an object with the specified size and partial hashes."""
473         
474         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
475         if permissions is not None and user != account:
476             raise NotAllowedError
477         self._can_write(user, account, container, name)
478         missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
479         if missing:
480             ie = IndexError()
481             ie.data = [binascii.hexlify(x) for x in missing]
482             raise ie
483         if permissions is not None:
484             path = '/'.join((account, container, name))
485             self._check_permissions(path, permissions)
486         path, node = self._put_object_node(account, container, name)
487         src_version_id, dest_version_id = self._copy_version(user, node, None, node, size)
488         self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
489         if not replace_meta and src_version_id is not None:
490             self.node.attribute_copy(src_version_id, dest_version_id)
491         self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
492         if permissions is not None:
493             self.permissions.access_set(path, permissions)
494         return dest_version_id
495     
496     def _copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
497         if permissions is not None and user != account:
498             raise NotAllowedError
499         self._can_read(user, account, src_container, src_name)
500         self._can_write(user, account, dest_container, dest_name)
501         src_path, src_node = self._lookup_object(account, src_container, src_name)
502         if permissions is not None:
503             dest_path = '/'.join((account, container, name))
504             self._check_permissions(dest_path, permissions)
505         dest_path, dest_node = self._put_object_node(account, dest_container, dest_name)
506         src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node)
507         if src_version_id is not None:
508             self._copy_data(src_version_id, dest_version_id)
509         if not replace_meta and src_version_id is not None:
510             self.node.attribute_copy(src_version_id, dest_version_id)
511         self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
512         if permissions is not None:
513             self.permissions.access_set(dest_path, permissions)
514         return dest_version_id
515     
516     @backend_method
517     def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
518         """Copy an object's data and metadata."""
519         
520         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
521         return self._copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
522     
523     @backend_method
524     def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
525         """Move an object's data and metadata."""
526         
527         logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
528         dest_version_id = self._copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
529         self._delete_object(user, account, src_container, src_name)
530         return dest_version_id
531     
532     def _delete_object(self, user, account, container, name, until=None):
533         if user != account:
534             raise NotAllowedError
535         
536         if until is not None:
537             path = '/'.join((account, container, name))
538             node = self.node.node_lookup(path)
539             if node is None:
540                 return
541             versions = self.node.node_purge(node, until, CLUSTER_NORMAL)
542             versions += self.node.node_purge(node, until, CLUSTER_HISTORY)
543             for v in versions:
544                 self.mapper.map_remv(v)
545             self.node.node_purge_children(node, until, CLUSTER_DELETED)
546             try:
547                 props = self._get_version(node)
548             except NameError:
549                 pass
550             else:
551                 self.permissions.access_clear(path)
552             return
553         
554         path, node = self._lookup_object(account, container, name)
555         self._copy_version(user, node, None, node, 0, CLUSTER_DELETED)
556         self.permissions.access_clear(path)
557     
558     @backend_method
559     def delete_object(self, user, account, container, name, until=None):
560         """Delete/purge an object."""
561         
562         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
563         self._delete_object(user, account, container, name, until)
564     
565     @backend_method
566     def list_versions(self, user, account, container, name):
567         """Return a list of all (version, version_timestamp) tuples for an object."""
568         
569         logger.debug("list_versions: %s %s %s", account, container, name)
570         self._can_read(user, account, container, name)
571         path, node = self._lookup_object(account, container, name)
572         return self.node.node_get_versions(node, ['serial', 'mtime'])
573     
574     @backend_method(autocommit=0)
575     def get_block(self, hash):
576         """Return a block's data."""
577         
578         logger.debug("get_block: %s", hash)
579         blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
580         if not blocks:
581             raise NameError('Block does not exist')
582         return blocks[0]
583     
584     @backend_method(autocommit=0)
585     def put_block(self, data):
586         """Store a block and return the hash."""
587         
588         logger.debug("put_block: %s", len(data))
589         hashes, absent = self.blocker.block_stor((data,))
590         return binascii.hexlify(hashes[0])
591     
592     @backend_method(autocommit=0)
593     def update_block(self, hash, data, offset=0):
594         """Update a known block and return the hash."""
595         
596         logger.debug("update_block: %s %s %s", hash, len(data), offset)
597         if offset == 0 and len(data) == self.block_size:
598             return self.put_block(data)
599         h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
600         return binascii.hexlify(h)
601     
602     # Path functions.
603     
604     def _put_object_node(self, account, container, name):
605         path, parent = self._lookup_container(account, container)
606         path = '/'.join((path, name))
607         node = self.node.node_lookup(path)
608         if node is None:
609             node = self.node.node_create(parent, path)
610         return path, node
611     
612     def _put_path(self, user, parent, path):
613         node = self.node.node_create(parent, path)
614         self.node.version_create(node, 0, None, user, CLUSTER_NORMAL)
615         return node
616     
617     def _lookup_account(self, account, create=True):
618         node = self.node.node_lookup(account)
619         if node is None and create:
620             node = self._put_path(account, self.ROOTNODE, account) # User is account.
621         return account, node
622     
623     def _lookup_container(self, account, container):
624         path = '/'.join((account, container))
625         node = self.node.node_lookup(path)
626         if node is None:
627             raise NameError('Container does not exist')
628         return path, node
629     
630     def _lookup_object(self, account, container, name):
631         path = '/'.join((account, container, name))
632         node = self.node.node_lookup(path)
633         if node is None:
634             raise NameError('Object does not exist')
635         return path, node
636     
637     def _get_properties(self, node, until=None):
638         """Return properties until the timestamp given."""
639         
640         before = until if until is not None else inf
641         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
642         if props is None and until is not None:
643             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
644         if props is None:
645             raise NameError('Path does not exist')
646         return props
647     
648     def _get_statistics(self, node, until=None):
649         """Return count, sum of size and latest timestamp of everything under node."""
650         
651         if until is None:
652             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
653         else:
654             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
655         if stats is None:
656             stats = (0, 0, 0)
657         return stats
658     
659     def _get_version(self, node, version=None):
660         if version is None:
661             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
662             if props is None:
663                 raise NameError('Object does not exist')
664         else:
665             props = self.node.version_get_properties(version)
666             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
667                 raise IndexError('Version does not exist')
668         return props
669     
670     def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL):
671         
672         # Get source serial and size.
673         if src_version is not None:
674             src_props = self._get_version(src_node, src_version)
675             src_version_id = src_props[self.SERIAL]
676             size = src_props[self.SIZE]
677         else:
678             # Latest or create from scratch.
679             try:
680                 src_props = self._get_version(src_node)
681                 src_version_id = src_props[self.SERIAL]
682                 size = src_props[self.SIZE]
683             except NameError:
684                 src_version_id = None
685                 size = 0
686         if dest_size is not None:
687             size = dest_size
688         
689         # Move the latest version at destination to CLUSTER_HISTORY and create new.
690         if src_node == dest_node and src_version is None and src_version_id is not None:
691             self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
692         else:
693             dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
694             if dest_props is not None:
695                 self.node.version_recluster(dest_props[self.SERIAL], CLUSTER_HISTORY)
696         dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster)
697         
698         return src_version_id, dest_version_id
699     
700     def _copy_data(self, src_version, dest_version):
701         hashmap = self.mapper.map_retr(src_version)
702         self.mapper.map_stor(dest_version, hashmap)
703     
704     def _get_metadata(self, version):
705         if version is None:
706             return {}
707         return dict(self.node.attribute_get(version))
708     
709     def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
710         """Create a new version and store metadata."""
711         
712         src_version_id, dest_version_id = self._copy_version(user, node, None, node)
713         if not replace:
714             if src_version_id is not None:
715                 self.node.attribute_copy(src_version_id, dest_version_id)
716             self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
717             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
718         else:
719             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
720         if copy_data and src_version_id is not None:
721             self._copy_data(src_version_id, dest_version_id)
722         return dest_version_id
723     
724     def _list_limits(self, listing, marker, limit):
725         start = 0
726         if marker:
727             try:
728                 start = listing.index(marker) + 1
729             except ValueError:
730                 pass
731         if not limit or limit > 10000:
732             limit = 10000
733         return start, limit
734     
735     def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
736         cont_prefix = path + '/'
737         prefix = cont_prefix + prefix
738         start = cont_prefix + marker if marker else None
739         before = until if until is not None else inf
740         filterq = ','.join(keys) if keys else None
741         
742         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
743         objects.extend([(p, None) for p in prefixes] if virtual else [])
744         objects.sort(key=lambda x: x[0])
745         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
746         
747         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
748         return objects[start:start + limit]
749     
750     # Policy functions.
751     
752     def _check_policy(self, policy):
753         for k in policy.keys():
754             if policy[k] == '':
755                 policy[k] = self.default_policy.get(k)
756         for k, v in policy.iteritems():
757             if k == 'quota':
758                 q = int(v) # May raise ValueError.
759                 if q < 0:
760                     raise ValueError
761             elif k == 'versioning':
762                 if v not in ['auto', 'manual', 'none']:
763                     raise ValueError
764             else:
765                 raise ValueError
766     
767     # Access control functions.
768     
769     def _check_groups(self, groups):
770         # raise ValueError('Bad characters in groups')
771         pass
772     
773     def _check_permissions(self, path, permissions):
774         # raise ValueError('Bad characters in permissions')
775         
776         # Check for existing permissions.
777         paths = self.permissions.access_list(path)
778         if paths:
779             ae = AttributeError()
780             ae.data = paths
781             raise ae
782     
783     def _can_read(self, user, account, container, name):
784         if user == account:
785             return True
786         path = '/'.join((account, container, name))
787         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
788             raise NotAllowedError
789     
790     def _can_write(self, user, account, container, name):
791         if user == account:
792             return True
793         path = '/'.join((account, container, name))
794         if not self.permissions.access_check(path, self.WRITE, user):
795             raise NotAllowedError
796     
797     def _allowed_accounts(self, user):
798         allow = set()
799         for path in self.permissions.access_list_paths(user):
800             allow.add(path.split('/', 1)[0])
801         return sorted(allow)
802     
803     def _allowed_containers(self, user, account):
804         allow = set()
805         for path in self.permissions.access_list_paths(user, account):
806             allow.add(path.split('/', 2)[1])
807         return sorted(allow)