Reinstate map delete function.
[pithos] / pithos / backends / modular.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import logging
38 import hashlib
39 import binascii
40
41 from base import NotAllowedError, QuotaError, BaseBackend
42
43 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
44
45 inf = float('inf')
46
47
48 logger = logging.getLogger(__name__)
49
50
51 class HashMap(list):
52     
53     def __init__(self, blocksize, blockhash):
54         super(HashMap, self).__init__()
55         self.blocksize = blocksize
56         self.blockhash = blockhash
57     
58     def _hash_raw(self, v):
59         h = hashlib.new(self.blockhash)
60         h.update(v)
61         return h.digest()
62     
63     def hash(self):
64         if len(self) == 0:
65             return self._hash_raw('')
66         if len(self) == 1:
67             return self.__getitem__(0)
68         
69         h = list(self)
70         s = 2
71         while s < len(h):
72             s = s * 2
73         h += [('\x00' * len(h[0]))] * (s - len(h))
74         while len(h) > 1:
75             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
76         return h[0]
77
78
79 def backend_method(func=None, autocommit=1):
80     if func is None:
81         def fn(func):
82             return backend_method(func, autocommit)
83         return fn
84
85     if not autocommit:
86         return func
87     def fn(self, *args, **kw):
88         self.wrapper.execute()
89         try:
90             ret = func(self, *args, **kw)
91             self.wrapper.commit()
92             return ret
93         except:
94             self.wrapper.rollback()
95             raise
96     return fn
97
98
99 class ModularBackend(BaseBackend):
100     """A modular backend.
101     
102     Uses modules for SQL functions and storage.
103     """
104     
105     def __init__(self, db_module, db_connection, block_module, block_path):
106         self.hash_algorithm = 'sha256'
107         self.block_size = 4 * 1024 * 1024 # 4MB
108         
109         self.default_policy = {'quota': 0, 'versioning': 'manual'}
110         
111         __import__(db_module)
112         self.db_module = sys.modules[db_module]
113         self.wrapper = self.db_module.DBWrapper(db_connection)
114         
115         params = {'wrapper': self.wrapper}
116         self.permissions = self.db_module.Permissions(**params)
117         for x in ['READ', 'WRITE']:
118             setattr(self, x, getattr(self.db_module, x))
119         self.node = self.db_module.Node(**params)
120         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
121             setattr(self, x, getattr(self.db_module, x))
122         
123         __import__(block_module)
124         self.block_module = sys.modules[block_module]
125         
126         params = {'path': block_path,
127                   'block_size': self.block_size,
128                   'hash_algorithm': self.hash_algorithm}
129         self.store = self.block_module.Store(**params)
130     
131     def close(self):
132         self.wrapper.close()
133     
134     @backend_method
135     def list_accounts(self, user, marker=None, limit=10000):
136         """Return a list of accounts the user can access."""
137         
138         logger.debug("list_accounts: %s %s %s", user, marker, limit)
139         allowed = self._allowed_accounts(user)
140         start, limit = self._list_limits(allowed, marker, limit)
141         return allowed[start:start + limit]
142     
143     @backend_method
144     def get_account_meta(self, user, account, until=None):
145         """Return a dictionary with the account metadata."""
146         
147         logger.debug("get_account_meta: %s %s", account, until)
148         path, node = self._lookup_account(account, user == account)
149         if user != account:
150             if until or node is None or account not in self._allowed_accounts(user):
151                 raise NotAllowedError
152         try:
153             props = self._get_properties(node, until)
154             mtime = props[self.MTIME]
155         except NameError:
156             props = None
157             mtime = until
158         count, bytes, tstamp = self._get_statistics(node, until)
159         tstamp = max(tstamp, mtime)
160         if until is None:
161             modified = tstamp
162         else:
163             modified = self._get_statistics(node)[2] # Overall last modification.
164             modified = max(modified, mtime)
165         
166         if user != account:
167             meta = {'name': account}
168         else:
169             meta = {}
170             if props is not None:
171                 meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
172             if until is not None:
173                 meta.update({'until_timestamp': tstamp})
174             meta.update({'name': account, 'count': count, 'bytes': bytes})
175         meta.update({'modified': modified})
176         return meta
177     
178     @backend_method
179     def update_account_meta(self, user, account, meta, replace=False):
180         """Update the metadata associated with the account."""
181         
182         logger.debug("update_account_meta: %s %s %s", account, meta, replace)
183         if user != account:
184             raise NotAllowedError
185         path, node = self._lookup_account(account, True)
186         self._put_metadata(user, node, meta, replace)
187     
188     @backend_method
189     def get_account_groups(self, user, account):
190         """Return a dictionary with the user groups defined for this account."""
191         
192         logger.debug("get_account_groups: %s", account)
193         if user != account:
194             if account not in self._allowed_accounts(user):
195                 raise NotAllowedError
196             return {}
197         self._lookup_account(account, True)
198         return self.permissions.group_dict(account)
199     
200     @backend_method
201     def update_account_groups(self, user, account, groups, replace=False):
202         """Update the groups associated with the account."""
203         
204         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
205         if user != account:
206             raise NotAllowedError
207         self._lookup_account(account, True)
208         self._check_groups(groups)
209         if replace:
210             self.permissions.group_destroy(account)
211         for k, v in groups.iteritems():
212             if not replace: # If not already deleted.
213                 self.permissions.group_delete(account, k)
214             if v:
215                 self.permissions.group_addmany(account, k, v)
216     
217     @backend_method
218     def get_account_policy(self, user, account):
219         """Return a dictionary with the account policy."""
220         
221         logger.debug("get_account_policy: %s", account)
222         if user != account:
223             if account not in self._allowed_accounts(user):
224                 raise NotAllowedError
225             return {}
226         path, node = self._lookup_account(account, True)
227         return self._get_policy(node)
228     
229     @backend_method
230     def update_account_policy(self, user, account, policy, replace=False):
231         """Update the policy associated with the account."""
232         
233         logger.debug("update_account_policy: %s %s %s", account, policy, replace)
234         if user != account:
235             raise NotAllowedError
236         path, node = self._lookup_account(account, True)
237         self._check_policy(policy)
238         self._put_policy(node, policy, replace)
239     
240     @backend_method
241     def put_account(self, user, account, policy={}):
242         """Create a new account with the given name."""
243         
244         logger.debug("put_account: %s %s", account, policy)
245         if user != account:
246             raise NotAllowedError
247         node = self.node.node_lookup(account)
248         if node is not None:
249             raise NameError('Account already exists')
250         if policy:
251             self._check_policy(policy)
252         node = self._put_path(user, self.ROOTNODE, account)
253         self._put_policy(node, policy, True)
254     
255     @backend_method
256     def delete_account(self, user, account):
257         """Delete the account with the given name."""
258         
259         logger.debug("delete_account: %s", account)
260         if user != account:
261             raise NotAllowedError
262         node = self.node.node_lookup(account)
263         if node is None:
264             return
265         if not self.node.node_remove(node):
266             raise IndexError('Account is not empty')
267         self.permissions.group_destroy(account)
268     
269     @backend_method
270     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
271         """Return a list of containers existing under an account."""
272         
273         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
274         if user != account:
275             if until or account not in self._allowed_accounts(user):
276                 raise NotAllowedError
277             allowed = self._allowed_containers(user, account)
278             start, limit = self._list_limits(allowed, marker, limit)
279             return allowed[start:start + limit]
280         if shared:
281             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
282             allowed = list(set(allowed))
283             start, limit = self._list_limits(allowed, marker, limit)
284             return allowed[start:start + limit]
285         node = self.node.node_lookup(account)
286         return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
287     
288     @backend_method
289     def get_container_meta(self, user, account, container, until=None):
290         """Return a dictionary with the container metadata."""
291         
292         logger.debug("get_container_meta: %s %s %s", account, container, until)
293         if user != account:
294             if until or container not in self._allowed_containers(user, account):
295                 raise NotAllowedError
296         path, node = self._lookup_container(account, container)
297         props = self._get_properties(node, until)
298         mtime = props[self.MTIME]
299         count, bytes, tstamp = self._get_statistics(node, until)
300         tstamp = max(tstamp, mtime)
301         if until is None:
302             modified = tstamp
303         else:
304             modified = self._get_statistics(node)[2] # Overall last modification.
305             modified = max(modified, mtime)
306         
307         if user != account:
308             meta = {'name': container}
309         else:
310             meta = dict(self.node.attribute_get(props[self.SERIAL]))
311             if until is not None:
312                 meta.update({'until_timestamp': tstamp})
313             meta.update({'name': container, 'count': count, 'bytes': bytes})
314         meta.update({'modified': modified})
315         return meta
316     
317     @backend_method
318     def update_container_meta(self, user, account, container, meta, replace=False):
319         """Update the metadata associated with the container."""
320         
321         logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
322         if user != account:
323             raise NotAllowedError
324         path, node = self._lookup_container(account, container)
325         self._put_metadata(user, node, meta, replace)
326     
327     @backend_method
328     def get_container_policy(self, user, account, container):
329         """Return a dictionary with the container policy."""
330         
331         logger.debug("get_container_policy: %s %s", account, container)
332         if user != account:
333             if container not in self._allowed_containers(user, account):
334                 raise NotAllowedError
335             return {}
336         path, node = self._lookup_container(account, container)
337         return self._get_policy(node)
338     
339     @backend_method
340     def update_container_policy(self, user, account, container, policy, replace=False):
341         """Update the policy associated with the container."""
342         
343         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
344         if user != account:
345             raise NotAllowedError
346         path, node = self._lookup_container(account, container)
347         self._check_policy(policy)
348         self._put_policy(node, policy, replace)
349     
350     @backend_method
351     def put_container(self, user, account, container, policy={}):
352         """Create a new container with the given name."""
353         
354         logger.debug("put_container: %s %s %s", account, container, policy)
355         if user != account:
356             raise NotAllowedError
357         try:
358             path, node = self._lookup_container(account, container)
359         except NameError:
360             pass
361         else:
362             raise NameError('Container already exists')
363         if policy:
364             self._check_policy(policy)
365         path = '/'.join((account, container))
366         node = self._put_path(user, self._lookup_account(account, True)[1], path)
367         self._put_policy(node, policy, True)
368     
369     @backend_method
370     def delete_container(self, user, account, container, until=None):
371         """Delete/purge the container with the given name."""
372         
373         logger.debug("delete_container: %s %s %s", account, container, until)
374         if user != account:
375             raise NotAllowedError
376         path, node = self._lookup_container(account, container)
377         
378         if until is not None:
379             hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
380             for h in hashes:
381                 self.store.map_delete(h)
382             self.node.node_purge_children(node, until, CLUSTER_DELETED)
383             return
384         
385         if self._get_statistics(node)[0] > 0:
386             raise IndexError('Container is not empty')
387         hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
388         for h in hashes:
389             self.store.map_delete(h)
390         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
391         self.node.node_remove(node)
392     
393     @backend_method
394     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
395         """Return a list of objects existing under a container."""
396         
397         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
398         allowed = []
399         if user != account:
400             if until:
401                 raise NotAllowedError
402             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
403             if not allowed:
404                 raise NotAllowedError
405         else:
406             if shared:
407                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
408                 if not allowed:
409                     return []
410         path, node = self._lookup_container(account, container)
411         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
412     
413     @backend_method
414     def list_object_meta(self, user, account, container, until=None):
415         """Return a list with all the container's object meta keys."""
416         
417         logger.debug("list_object_meta: %s %s %s", account, container, until)
418         allowed = []
419         if user != account:
420             if until:
421                 raise NotAllowedError
422             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
423             if not allowed:
424                 raise NotAllowedError
425         path, node = self._lookup_container(account, container)
426         before = until if until is not None else inf
427         return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
428     
429     @backend_method
430     def get_object_meta(self, user, account, container, name, version=None):
431         """Return a dictionary with the object metadata."""
432         
433         logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
434         self._can_read(user, account, container, name)
435         path, node = self._lookup_object(account, container, name)
436         props = self._get_version(node, version)
437         if version is None:
438             modified = props[self.MTIME]
439         else:
440             try:
441                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
442             except NameError: # Object may be deleted.
443                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
444                 if del_props is None:
445                     raise NameError('Object does not exist')
446                 modified = del_props[self.MTIME]
447         
448         meta = dict(self.node.attribute_get(props[self.SERIAL]))
449         meta.update({'name': name, 'bytes': props[self.SIZE]})
450         meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
451         meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
452         return meta
453     
454     @backend_method
455     def update_object_meta(self, user, account, container, name, meta, replace=False):
456         """Update the metadata associated with the object."""
457         
458         logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
459         self._can_write(user, account, container, name)
460         path, node = self._lookup_object(account, container, name)
461         return self._put_metadata(user, node, meta, replace)
462     
463     @backend_method
464     def get_object_permissions(self, user, account, container, name):
465         """Return the action allowed on the object, the path
466         from which the object gets its permissions from,
467         along with a dictionary containing the permissions."""
468         
469         logger.debug("get_object_permissions: %s %s %s", account, container, name)
470         allowed = 'write'
471         if user != account:
472             path = '/'.join((account, container, name))
473             if self.permissions.access_check(path, self.WRITE, user):
474                 allowed = 'write'
475             elif self.permissions.access_check(path, self.READ, user):
476                 allowed = 'read'
477             else:
478                 raise NotAllowedError
479         path = self._lookup_object(account, container, name)[0]
480         return (allowed,) + self.permissions.access_inherit(path)
481     
482     @backend_method
483     def update_object_permissions(self, user, account, container, name, permissions):
484         """Update the permissions associated with the object."""
485         
486         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
487         if user != account:
488             raise NotAllowedError
489         path = self._lookup_object(account, container, name)[0]
490         self._check_permissions(path, permissions)
491         self.permissions.access_set(path, permissions)
492     
493     @backend_method
494     def get_object_public(self, user, account, container, name):
495         """Return the public URL of the object if applicable."""
496         
497         logger.debug("get_object_public: %s %s %s", account, container, name)
498         self._can_read(user, account, container, name)
499         path = self._lookup_object(account, container, name)[0]
500         if self.permissions.public_check(path):
501             return '/public/' + path
502         return None
503     
504     @backend_method
505     def update_object_public(self, user, account, container, name, public):
506         """Update the public status of the object."""
507         
508         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
509         self._can_write(user, account, container, name)
510         path = self._lookup_object(account, container, name)[0]
511         if not public:
512             self.permissions.public_unset(path)
513         else:
514             self.permissions.public_set(path)
515     
516     @backend_method
517     def get_object_hashmap(self, user, account, container, name, version=None):
518         """Return the object's size and a list with partial hashes."""
519         
520         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
521         self._can_read(user, account, container, name)
522         path, node = self._lookup_object(account, container, name)
523         props = self._get_version(node, version)
524         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
525         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
526     
527     def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
528         if permissions is not None and user != account:
529             raise NotAllowedError
530         self._can_write(user, account, container, name)
531         if permissions is not None:
532             path = '/'.join((account, container, name))
533             self._check_permissions(path, permissions)
534         
535         account_path, account_node = self._lookup_account(account, True)
536         container_path, container_node = self._lookup_container(account, container)
537         path, node = self._put_object_node(container_path, container_node, name)
538         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)
539         
540         # Check quota.
541         size_delta = size # Change with versioning.
542         if size_delta > 0:
543             account_quota = long(self._get_policy(account_node)['quota'])
544             container_quota = long(self._get_policy(container_node)['quota'])
545             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
546                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
547                 # This must be executed in a transaction, so the version is never created if it fails.
548                 raise QuotaError
549         
550         if not replace_meta and src_version_id is not None:
551             self.node.attribute_copy(src_version_id, dest_version_id)
552         self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
553         if permissions is not None:
554             self.permissions.access_set(path, permissions)
555         return dest_version_id
556     
557     @backend_method
558     def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
559         """Create/update an object with the specified size and partial hashes."""
560         
561         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
562         if size == 0: # No such thing as an empty hashmap.
563             hashmap = [self.put_block('')]
564         map = HashMap(self.block_size, self.hash_algorithm)
565         map.extend([binascii.unhexlify(x) for x in hashmap])
566         missing = self.store.block_search(map)
567         if missing:
568             ie = IndexError()
569             ie.data = [binascii.hexlify(x) for x in missing]
570             raise ie
571         
572         hash = map.hash()
573         dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions)
574         self.store.map_put(hash, map)
575         return dest_version_id
576     
577     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
578         self._can_read(user, src_account, src_container, src_name)
579         path, node = self._lookup_object(src_account, src_container, src_name)
580         props = self._get_version(node, src_version)
581         src_version_id = props[self.SERIAL]
582         hash = props[self.HASH]
583         size = props[self.SIZE]
584         
585         if replace_meta:
586             meta = dest_meta
587         else:
588             meta = {}
589         dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
590         if not replace_meta:
591             self.node.attribute_copy(src_version_id, dest_version_id)
592             self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
593         return dest_version_id
594     
595     @backend_method
596     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
597         """Copy an object's data and metadata."""
598         
599         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
600         return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
601     
602     @backend_method
603     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
604         """Move an object's data and metadata."""
605         
606         logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
607         if user != src_account:
608             raise NotAllowedError
609         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
610         self._delete_object(user, src_account, src_container, src_name)
611         return dest_version_id
612     
613     def _delete_object(self, user, account, container, name, until=None):
614         if user != account:
615             raise NotAllowedError
616         
617         if until is not None:
618             path = '/'.join((account, container, name))
619             node = self.node.node_lookup(path)
620             if node is None:
621                 return
622             hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
623             hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
624             for h in hashes:
625                 self.store.map_delete(h)
626             self.node.node_purge_children(node, until, CLUSTER_DELETED)
627             try:
628                 props = self._get_version(node)
629             except NameError:
630                 pass
631             else:
632                 self.permissions.access_clear(path)
633             return
634         
635         path, node = self._lookup_object(account, container, name)
636         src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
637         self.permissions.access_clear(path)
638     
639     @backend_method
640     def delete_object(self, user, account, container, name, until=None):
641         """Delete/purge an object."""
642         
643         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
644         self._delete_object(user, account, container, name, until)
645     
646     @backend_method
647     def list_versions(self, user, account, container, name):
648         """Return a list of all (version, version_timestamp) tuples for an object."""
649         
650         logger.debug("list_versions: %s %s %s", account, container, name)
651         self._can_read(user, account, container, name)
652         path, node = self._lookup_object(account, container, name)
653         versions = self.node.node_get_versions(node)
654         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
655     
656     @backend_method(autocommit=0)
657     def get_block(self, hash):
658         """Return a block's data."""
659         
660         logger.debug("get_block: %s", hash)
661         block = self.store.block_get(binascii.unhexlify(hash))
662         if not block:
663             raise NameError('Block does not exist')
664         return block
665     
666     @backend_method(autocommit=0)
667     def put_block(self, data):
668         """Store a block and return the hash."""
669         
670         logger.debug("put_block: %s", len(data))
671         return binascii.hexlify(self.store.block_put(data))
672     
673     @backend_method(autocommit=0)
674     def update_block(self, hash, data, offset=0):
675         """Update a known block and return the hash."""
676         
677         logger.debug("update_block: %s %s %s", hash, len(data), offset)
678         if offset == 0 and len(data) == self.block_size:
679             return self.put_block(data)
680         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
681         return binascii.hexlify(h)
682     
683     # Path functions.
684     
685     def _put_object_node(self, path, parent, name):
686         path = '/'.join((path, name))
687         node = self.node.node_lookup(path)
688         if node is None:
689             node = self.node.node_create(parent, path)
690         return path, node
691     
692     def _put_path(self, user, parent, path):
693         node = self.node.node_create(parent, path)
694         self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
695         return node
696     
697     def _lookup_account(self, account, create=True):
698         node = self.node.node_lookup(account)
699         if node is None and create:
700             node = self._put_path(account, self.ROOTNODE, account) # User is account.
701         return account, node
702     
703     def _lookup_container(self, account, container):
704         path = '/'.join((account, container))
705         node = self.node.node_lookup(path)
706         if node is None:
707             raise NameError('Container does not exist')
708         return path, node
709     
710     def _lookup_object(self, account, container, name):
711         path = '/'.join((account, container, name))
712         node = self.node.node_lookup(path)
713         if node is None:
714             raise NameError('Object does not exist')
715         return path, node
716     
717     def _get_properties(self, node, until=None):
718         """Return properties until the timestamp given."""
719         
720         before = until if until is not None else inf
721         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
722         if props is None and until is not None:
723             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
724         if props is None:
725             raise NameError('Path does not exist')
726         return props
727     
728     def _get_statistics(self, node, until=None):
729         """Return count, sum of size and latest timestamp of everything under node."""
730         
731         if until is None:
732             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
733         else:
734             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
735         if stats is None:
736             stats = (0, 0, 0)
737         return stats
738     
739     def _get_version(self, node, version=None):
740         if version is None:
741             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
742             if props is None:
743                 raise NameError('Object does not exist')
744         else:
745             try:
746                 version = int(version)
747             except ValueError:
748                 raise IndexError('Version does not exist')
749             props = self.node.version_get_properties(version)
750             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
751                 raise IndexError('Version does not exist')
752         return props
753     
754     def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
755         """Create a new version of the node."""
756         
757         props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
758         if props is not None:
759             src_version_id = props[self.SERIAL]
760             src_hash = props[self.HASH]
761             src_size = props[self.SIZE]
762         else:
763             src_version_id = None
764             src_hash = None
765             src_size = 0
766         if size is None:
767             hash = src_hash # This way hash can be set to None.
768             size = src_size
769         
770         if src_version_id is not None:
771             self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
772         dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
773         return src_version_id, dest_version_id
774     
775     def _put_metadata(self, user, node, meta, replace=False):
776         """Create a new version and store metadata."""
777         
778         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
779         
780         # TODO: Merge with other functions that update metadata...
781         if not replace:
782             if src_version_id is not None:
783                 self.node.attribute_copy(src_version_id, dest_version_id)
784             self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
785             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
786         else:
787             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
788         return dest_version_id
789     
790     def _list_limits(self, listing, marker, limit):
791         start = 0
792         if marker:
793             try:
794                 start = listing.index(marker) + 1
795             except ValueError:
796                 pass
797         if not limit or limit > 10000:
798             limit = 10000
799         return start, limit
800     
801     def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
802         cont_prefix = path + '/'
803         prefix = cont_prefix + prefix
804         start = cont_prefix + marker if marker else None
805         before = until if until is not None else inf
806         filterq = ','.join(keys) if keys else None
807         
808         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
809         objects.extend([(p, None) for p in prefixes] if virtual else [])
810         objects.sort(key=lambda x: x[0])
811         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
812         
813         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
814         return objects[start:start + limit]
815     
816     # Policy functions.
817     
818     def _check_policy(self, policy):
819         for k in policy.keys():
820             if policy[k] == '':
821                 policy[k] = self.default_policy.get(k)
822         for k, v in policy.iteritems():
823             if k == 'quota':
824                 q = int(v) # May raise ValueError.
825                 if q < 0:
826                     raise ValueError
827             elif k == 'versioning':
828                 if v not in ['auto', 'manual', 'none']:
829                     raise ValueError
830             else:
831                 raise ValueError
832     
833     def _put_policy(self, node, policy, replace):
834         if replace:
835             for k, v in self.default_policy.iteritems():
836                 if k not in policy:
837                     policy[k] = v
838         self.node.policy_set(node, policy)
839     
840     def _get_policy(self, node):
841         policy = self.default_policy.copy()
842         policy.update(self.node.policy_get(node))
843         return policy
844     
845     # Access control functions.
846     
847     def _check_groups(self, groups):
848         # raise ValueError('Bad characters in groups')
849         pass
850     
851     def _check_permissions(self, path, permissions):
852         # raise ValueError('Bad characters in permissions')
853         
854         # Check for existing permissions.
855         paths = self.permissions.access_list(path)
856         if paths:
857             ae = AttributeError()
858             ae.data = paths
859             raise ae
860     
861     def _can_read(self, user, account, container, name):
862         if user == account:
863             return True
864         path = '/'.join((account, container, name))
865         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
866             raise NotAllowedError
867     
868     def _can_write(self, user, account, container, name):
869         if user == account:
870             return True
871         path = '/'.join((account, container, name))
872         if not self.permissions.access_check(path, self.WRITE, user):
873             raise NotAllowedError
874     
875     def _allowed_accounts(self, user):
876         allow = set()
877         for path in self.permissions.access_list_paths(user):
878             allow.add(path.split('/', 1)[0])
879         return sorted(allow)
880     
881     def _allowed_containers(self, user, account):
882         allow = set()
883         for path in self.permissions.access_list_paths(user, account):
884             allow.add(path.split('/', 2)[1])
885         return sorted(allow)