change pithis-sh & client/lib arguments
[pithos] / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import binascii
40
41 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
42
43 from pithos.lib.hashmap import HashMap
44
45 # Default modules and settings.
46 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
47 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
48 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
49 DEFAULT_BLOCK_PATH = 'data/'
50 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
51 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
52
53 QUEUE_MESSAGE_KEY = '#'
54 QUEUE_CLIENT_ID = 2 # Pithos.
55
56 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
57
58 inf = float('inf')
59
60 ULTIMATE_ANSWER = 42
61
62
63 logger = logging.getLogger(__name__)
64
65
66 def backend_method(func=None, autocommit=1):
67     if func is None:
68         def fn(func):
69             return backend_method(func, autocommit)
70         return fn
71
72     if not autocommit:
73         return func
74     def fn(self, *args, **kw):
75         self.wrapper.execute()
76         try:
77             ret = func(self, *args, **kw)
78             self.wrapper.commit()
79             return ret
80         except:
81             self.wrapper.rollback()
82             raise
83     return fn
84
85
86 class ModularBackend(BaseBackend):
87     """A modular backend.
88     
89     Uses modules for SQL functions and storage.
90     """
91     
92     def __init__(self, db_module=None, db_connection=None,
93                  block_module=None, block_path=None,
94                  queue_module=None, queue_connection=None):
95         db_module = db_module or DEFAULT_DB_MODULE
96         db_connection = db_connection or DEFAULT_DB_CONNECTION
97         block_module = block_module or DEFAULT_BLOCK_MODULE
98         block_path = block_path or DEFAULT_BLOCK_PATH
99         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
100         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
101         
102         self.hash_algorithm = 'sha256'
103         self.block_size = 4 * 1024 * 1024 # 4MB
104         
105         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
106         
107         def load_module(m):
108             __import__(m)
109             return sys.modules[m]
110         
111         self.db_module = load_module(db_module)
112         self.wrapper = self.db_module.DBWrapper(db_connection)
113         params = {'wrapper': self.wrapper}
114         self.permissions = self.db_module.Permissions(**params)
115         for x in ['READ', 'WRITE']:
116             setattr(self, x, getattr(self.db_module, x))
117         self.node = self.db_module.Node(**params)
118         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
119             setattr(self, x, getattr(self.db_module, x))
120         
121         self.block_module = load_module(block_module)
122         params = {'path': block_path,
123                   'block_size': self.block_size,
124                   'hash_algorithm': self.hash_algorithm}
125         self.store = self.block_module.Store(**params)
126
127         if queue_module and queue_connection:
128             self.queue_module = load_module(queue_module)
129             params = {'exchange': queue_connection,
130                       'message_key': QUEUE_MESSAGE_KEY,
131                       'client_id': QUEUE_CLIENT_ID}
132             self.queue = self.queue_module.Queue(**params)
133         else:
134             class NoQueue:
135                 def send(self, *args):
136                     pass
137             
138             self.queue = NoQueue()
139     
140     def close(self):
141         self.wrapper.close()
142     
143     @backend_method
144     def list_accounts(self, user, marker=None, limit=10000):
145         """Return a list of accounts the user can access."""
146         
147         logger.debug("list_accounts: %s %s %s", user, marker, limit)
148         allowed = self._allowed_accounts(user)
149         start, limit = self._list_limits(allowed, marker, limit)
150         return allowed[start:start + limit]
151     
152     @backend_method
153     def get_account_meta(self, user, account, domain, until=None):
154         """Return a dictionary with the account metadata for the domain."""
155         
156         logger.debug("get_account_meta: %s %s %s", account, domain, until)
157         path, node = self._lookup_account(account, user == account)
158         if user != account:
159             if until or node is None or account not in self._allowed_accounts(user):
160                 raise NotAllowedError
161         try:
162             props = self._get_properties(node, until)
163             mtime = props[self.MTIME]
164         except NameError:
165             props = None
166             mtime = until
167         count, bytes, tstamp = self._get_statistics(node, until)
168         tstamp = max(tstamp, mtime)
169         if until is None:
170             modified = tstamp
171         else:
172             modified = self._get_statistics(node)[2] # Overall last modification.
173             modified = max(modified, mtime)
174         
175         if user != account:
176             meta = {'name': account}
177         else:
178             meta = {}
179             if props is not None:
180                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
181             if until is not None:
182                 meta.update({'until_timestamp': tstamp})
183             meta.update({'name': account, 'count': count, 'bytes': bytes})
184         meta.update({'modified': modified})
185         return meta
186     
187     @backend_method
188     def update_account_meta(self, user, account, domain, meta, replace=False):
189         """Update the metadata associated with the account for the domain."""
190         
191         logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
192         if user != account:
193             raise NotAllowedError
194         path, node = self._lookup_account(account, True)
195         self._put_metadata(user, node, domain, meta, replace)
196     
197     @backend_method
198     def get_account_groups(self, user, account):
199         """Return a dictionary with the user groups defined for this account."""
200         
201         logger.debug("get_account_groups: %s", account)
202         if user != account:
203             if account not in self._allowed_accounts(user):
204                 raise NotAllowedError
205             return {}
206         self._lookup_account(account, True)
207         return self.permissions.group_dict(account)
208     
209     @backend_method
210     def update_account_groups(self, user, account, groups, replace=False):
211         """Update the groups associated with the account."""
212         
213         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
214         if user != account:
215             raise NotAllowedError
216         self._lookup_account(account, True)
217         self._check_groups(groups)
218         if replace:
219             self.permissions.group_destroy(account)
220         for k, v in groups.iteritems():
221             if not replace: # If not already deleted.
222                 self.permissions.group_delete(account, k)
223             if v:
224                 self.permissions.group_addmany(account, k, v)
225     
226     @backend_method
227     def get_account_policy(self, user, account):
228         """Return a dictionary with the account policy."""
229         
230         logger.debug("get_account_policy: %s", account)
231         if user != account:
232             if account not in self._allowed_accounts(user):
233                 raise NotAllowedError
234             return {}
235         path, node = self._lookup_account(account, True)
236         return self._get_policy(node)
237     
238     @backend_method
239     def update_account_policy(self, user, account, policy, replace=False):
240         """Update the policy associated with the account."""
241         
242         logger.debug("update_account_policy: %s %s %s", account, policy, replace)
243         if user != account:
244             raise NotAllowedError
245         path, node = self._lookup_account(account, True)
246         self._check_policy(policy)
247         self._put_policy(node, policy, replace)
248     
249     @backend_method
250     def put_account(self, user, account, policy={}):
251         """Create a new account with the given name."""
252         
253         logger.debug("put_account: %s %s", account, policy)
254         if user != account:
255             raise NotAllowedError
256         node = self.node.node_lookup(account)
257         if node is not None:
258             raise NameError('Account already exists')
259         if policy:
260             self._check_policy(policy)
261         node = self._put_path(user, self.ROOTNODE, account)
262         self._put_policy(node, policy, True)
263     
264     @backend_method
265     def delete_account(self, user, account):
266         """Delete the account with the given name."""
267         
268         logger.debug("delete_account: %s", account)
269         if user != account:
270             raise NotAllowedError
271         node = self.node.node_lookup(account)
272         if node is None:
273             return
274         if not self.node.node_remove(node):
275             raise IndexError('Account is not empty')
276         self.permissions.group_destroy(account)
277     
278     @backend_method
279     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
280         """Return a list of containers existing under an account."""
281         
282         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
283         if user != account:
284             if until or account not in self._allowed_accounts(user):
285                 raise NotAllowedError
286             allowed = self._allowed_containers(user, account)
287             start, limit = self._list_limits(allowed, marker, limit)
288             return allowed[start:start + limit]
289         if shared:
290             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
291             allowed = list(set(allowed))
292             start, limit = self._list_limits(allowed, marker, limit)
293             return allowed[start:start + limit]
294         node = self.node.node_lookup(account)
295         return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
296     
297     @backend_method
298     def get_container_meta(self, user, account, container, domain, until=None):
299         """Return a dictionary with the container metadata for the domain."""
300         
301         logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
302         if user != account:
303             if until or container not in self._allowed_containers(user, account):
304                 raise NotAllowedError
305         path, node = self._lookup_container(account, container)
306         props = self._get_properties(node, until)
307         mtime = props[self.MTIME]
308         count, bytes, tstamp = self._get_statistics(node, until)
309         tstamp = max(tstamp, mtime)
310         if until is None:
311             modified = tstamp
312         else:
313             modified = self._get_statistics(node)[2] # Overall last modification.
314             modified = max(modified, mtime)
315         
316         if user != account:
317             meta = {'name': container}
318         else:
319             meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
320             if until is not None:
321                 meta.update({'until_timestamp': tstamp})
322             meta.update({'name': container, 'count': count, 'bytes': bytes})
323         meta.update({'modified': modified})
324         return meta
325     
326     @backend_method
327     def update_container_meta(self, user, account, container, domain, meta, replace=False):
328         """Update the metadata associated with the container for the domain."""
329         
330         logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
331         if user != account:
332             raise NotAllowedError
333         path, node = self._lookup_container(account, container)
334         self._put_metadata(user, node, domain, meta, replace)
335     
336     @backend_method
337     def get_container_policy(self, user, account, container):
338         """Return a dictionary with the container policy."""
339         
340         logger.debug("get_container_policy: %s %s", account, container)
341         if user != account:
342             if container not in self._allowed_containers(user, account):
343                 raise NotAllowedError
344             return {}
345         path, node = self._lookup_container(account, container)
346         return self._get_policy(node)
347     
348     @backend_method
349     def update_container_policy(self, user, account, container, policy, replace=False):
350         """Update the policy associated with the container."""
351         
352         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
353         if user != account:
354             raise NotAllowedError
355         path, node = self._lookup_container(account, container)
356         self._check_policy(policy)
357         self._put_policy(node, policy, replace)
358     
359     @backend_method
360     def put_container(self, user, account, container, policy={}):
361         """Create a new container with the given name."""
362         
363         logger.debug("put_container: %s %s %s", account, container, policy)
364         if user != account:
365             raise NotAllowedError
366         try:
367             path, node = self._lookup_container(account, container)
368         except NameError:
369             pass
370         else:
371             raise NameError('Container already exists')
372         if policy:
373             self._check_policy(policy)
374         path = '/'.join((account, container))
375         node = self._put_path(user, self._lookup_account(account, True)[1], path)
376         self._put_policy(node, policy, True)
377     
378     @backend_method
379     def delete_container(self, user, account, container, until=None):
380         """Delete/purge the container with the given name."""
381         
382         logger.debug("delete_container: %s %s %s", account, container, until)
383         if user != account:
384             raise NotAllowedError
385         path, node = self._lookup_container(account, container)
386         
387         if until is not None:
388             hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
389             for h in hashes:
390                 self.store.map_delete(h)
391             self.node.node_purge_children(node, until, CLUSTER_DELETED)
392             self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
393             return
394         
395         if self._get_statistics(node)[0] > 0:
396             raise IndexError('Container is not empty')
397         hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
398         for h in hashes:
399             self.store.map_delete(h)
400         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
401         self.node.node_remove(node)
402         self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
403     
404     @backend_method
405     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
406         """Return a list of objects existing under a container."""
407         
408         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
409         allowed = []
410         if user != account:
411             if until:
412                 raise NotAllowedError
413             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
414             if not allowed:
415                 raise NotAllowedError
416         else:
417             if shared:
418                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
419                 if not allowed:
420                     return []
421         path, node = self._lookup_container(account, container)
422         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed)
423     
424     @backend_method
425     def list_object_meta(self, user, account, container, domain, until=None):
426         """Return a list with all the container's object meta keys for the domain."""
427         
428         logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
429         allowed = []
430         if user != account:
431             if until:
432                 raise NotAllowedError
433             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
434             if not allowed:
435                 raise NotAllowedError
436         path, node = self._lookup_container(account, container)
437         before = until if until is not None else inf
438         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
439     
440     @backend_method
441     def get_object_meta(self, user, account, container, name, domain, version=None):
442         """Return a dictionary with the object metadata for the domain."""
443         
444         logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
445         self._can_read(user, account, container, name)
446         path, node = self._lookup_object(account, container, name)
447         props = self._get_version(node, version)
448         if version is None:
449             modified = props[self.MTIME]
450         else:
451             try:
452                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
453             except NameError: # Object may be deleted.
454                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
455                 if del_props is None:
456                     raise NameError('Object does not exist')
457                 modified = del_props[self.MTIME]
458         
459         meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
460         meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
461         meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
462         meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
463         return meta
464     
465     @backend_method
466     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
467         """Update the metadata associated with the object for the domain and return the new version."""
468         
469         logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
470         self._can_write(user, account, container, name)
471         path, node = self._lookup_object(account, container, name)
472         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
473         self._apply_versioning(account, container, src_version_id)
474         return dest_version_id
475     
476     @backend_method
477     def get_object_permissions(self, user, account, container, name):
478         """Return the action allowed on the object, the path
479         from which the object gets its permissions from,
480         along with a dictionary containing the permissions."""
481         
482         logger.debug("get_object_permissions: %s %s %s", account, container, name)
483         allowed = 'write'
484         if user != account:
485             path = '/'.join((account, container, name))
486             if self.permissions.access_check(path, self.WRITE, user):
487                 allowed = 'write'
488             elif self.permissions.access_check(path, self.READ, user):
489                 allowed = 'read'
490             else:
491                 raise NotAllowedError
492         path = self._lookup_object(account, container, name)[0]
493         return (allowed,) + self.permissions.access_inherit(path)
494     
495     @backend_method
496     def update_object_permissions(self, user, account, container, name, permissions):
497         """Update the permissions associated with the object."""
498         
499         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
500         if user != account:
501             raise NotAllowedError
502         path = self._lookup_object(account, container, name)[0]
503         self._check_permissions(path, permissions)
504         self.permissions.access_set(path, permissions)
505     
506     @backend_method
507     def get_object_public(self, user, account, container, name):
508         """Return the public id of the object if applicable."""
509         
510         logger.debug("get_object_public: %s %s %s", account, container, name)
511         self._can_read(user, account, container, name)
512         path = self._lookup_object(account, container, name)[0]
513         p = self.permissions.public_get(path)
514         if p is not None:
515             p += ULTIMATE_ANSWER
516         return p
517     
518     @backend_method
519     def update_object_public(self, user, account, container, name, public):
520         """Update the public status of the object."""
521         
522         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
523         self._can_write(user, account, container, name)
524         path = self._lookup_object(account, container, name)[0]
525         if not public:
526             self.permissions.public_unset(path)
527         else:
528             self.permissions.public_set(path)
529     
530     @backend_method
531     def get_object_hashmap(self, user, account, container, name, version=None):
532         """Return the object's size and a list with partial hashes."""
533         
534         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
535         self._can_read(user, account, container, name)
536         path, node = self._lookup_object(account, container, name)
537         props = self._get_version(node, version)
538         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
539         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
540     
541     def _update_object_hash(self, user, account, container, name, size, hash, permissions, src_node=None, is_copy=False):
542         if permissions is not None and user != account:
543             raise NotAllowedError
544         self._can_write(user, account, container, name)
545         if permissions is not None:
546             path = '/'.join((account, container, name))
547             self._check_permissions(path, permissions)
548         
549         account_path, account_node = self._lookup_account(account, True)
550         container_path, container_node = self._lookup_container(account, container)
551         path, node = self._put_object_node(container_path, container_node, name)
552         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
553         
554         # Check quota.
555         versioning = self._get_policy(container_node)['versioning']
556         if versioning != 'auto':
557             size_delta = size - 0 # TODO: Get previous size.
558         else:
559             size_delta = size
560         if size_delta > 0:
561             account_quota = long(self._get_policy(account_node)['quota'])
562             container_quota = long(self._get_policy(container_node)['quota'])
563             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
564                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
565                 # This must be executed in a transaction, so the version is never created if it fails.
566                 raise QuotaError
567         
568         if permissions is not None:
569             self.permissions.access_set(path, permissions)
570         self._apply_versioning(account, container, pre_version_id)
571         return pre_version_id, dest_version_id
572     
573     @backend_method
574     def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
575         """Create/update an object with the specified size and partial hashes."""
576         
577         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
578         if size == 0: # No such thing as an empty hashmap.
579             hashmap = [self.put_block('')]
580         map = HashMap(self.block_size, self.hash_algorithm)
581         map.extend([binascii.unhexlify(x) for x in hashmap])
582         missing = self.store.block_search(map)
583         if missing:
584             ie = IndexError()
585             ie.data = [binascii.hexlify(x) for x in missing]
586             raise ie
587         
588         hash = map.hash()
589         pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
590         self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
591         self.store.map_put(hash, map)
592         self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
593         return dest_version_id
594     
595     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
596         self._can_read(user, src_account, src_container, src_name)
597         path, node = self._lookup_object(src_account, src_container, src_name)
598         # TODO: Will do another fetch of the properties in duplicate version...
599         props = self._get_version(node, src_version) # Check to see if source exists.
600         src_version_id = props[self.SERIAL]
601         hash = props[self.HASH]
602         size = props[self.SIZE]
603         
604         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
605         pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions, src_node=node, is_copy=is_copy)
606         self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
607         return dest_version_id
608     
609     @backend_method
610     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
611         """Copy an object's data and metadata."""
612         
613         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
614         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
615         self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
616         return dest_version_id
617     
618     @backend_method
619     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
620         """Move an object's data and metadata."""
621         
622         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
623         if user != src_account:
624             raise NotAllowedError
625         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
626         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
627             self._delete_object(user, src_account, src_container, src_name)
628         self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
629         return dest_version_id
630     
631     def _delete_object(self, user, account, container, name, until=None):
632         if user != account:
633             raise NotAllowedError
634         
635         if until is not None:
636             path = '/'.join((account, container, name))
637             node = self.node.node_lookup(path)
638             if node is None:
639                 return
640             hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
641             hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
642             for h in hashes:
643                 self.store.map_delete(h)
644             self.node.node_purge(node, until, CLUSTER_DELETED)
645             try:
646                 props = self._get_version(node)
647             except NameError:
648                 self.permissions.access_clear(path)
649             self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
650             return
651         
652         path, node = self._lookup_object(account, container, name)
653         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
654         self._apply_versioning(account, container, src_version_id)
655         self.permissions.access_clear(path)
656     
657     @backend_method
658     def delete_object(self, user, account, container, name, until=None):
659         """Delete/purge an object."""
660         
661         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
662         self._delete_object(user, account, container, name, until)
663     
664     @backend_method
665     def list_versions(self, user, account, container, name):
666         """Return a list of all (version, version_timestamp) tuples for an object."""
667         
668         logger.debug("list_versions: %s %s %s", account, container, name)
669         self._can_read(user, account, container, name)
670         path, node = self._lookup_object(account, container, name)
671         versions = self.node.node_get_versions(node)
672         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
673     
674     @backend_method
675     def get_uuid(self, user, uuid):
676         """Return the (account, container, name) for the UUID given."""
677         
678         logger.debug("get_uuid: %s", uuid)
679         info = self.node.latest_uuid(uuid)
680         if info is None:
681             raise NameError
682         path, serial = info
683         account, container, name = path.split('/', 2)
684         self._can_read(user, account, container, name)
685         return (account, container, name)
686     
687     @backend_method
688     def get_public(self, user, public):
689         """Return the (account, container, name) for the public id given."""
690         
691         logger.debug("get_public: %s", public)
692         if public is None or public < ULTIMATE_ANSWER:
693             raise NameError
694         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
695         if path is None:
696             raise NameError
697         account, container, name = path.split('/', 2)
698         self._can_read(user, account, container, name)
699         return (account, container, name)
700     
701     @backend_method(autocommit=0)
702     def get_block(self, hash):
703         """Return a block's data."""
704         
705         logger.debug("get_block: %s", hash)
706         block = self.store.block_get(binascii.unhexlify(hash))
707         if not block:
708             raise NameError('Block does not exist')
709         return block
710     
711     @backend_method(autocommit=0)
712     def put_block(self, data):
713         """Store a block and return the hash."""
714         
715         logger.debug("put_block: %s", len(data))
716         return binascii.hexlify(self.store.block_put(data))
717     
718     @backend_method(autocommit=0)
719     def update_block(self, hash, data, offset=0):
720         """Update a known block and return the hash."""
721         
722         logger.debug("update_block: %s %s %s", hash, len(data), offset)
723         if offset == 0 and len(data) == self.block_size:
724             return self.put_block(data)
725         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
726         return binascii.hexlify(h)
727     
728     # Path functions.
729     
730     def _generate_uuid(self):
731         return str(uuidlib.uuid4())
732     
733     def _put_object_node(self, path, parent, name):
734         path = '/'.join((path, name))
735         node = self.node.node_lookup(path)
736         if node is None:
737             node = self.node.node_create(parent, path)
738         return path, node
739     
740     def _put_path(self, user, parent, path):
741         node = self.node.node_create(parent, path)
742         self.node.version_create(node, None, 0, None, user, self._generate_uuid(), CLUSTER_NORMAL)
743         return node
744     
745     def _lookup_account(self, account, create=True):
746         node = self.node.node_lookup(account)
747         if node is None and create:
748             node = self._put_path(account, self.ROOTNODE, account) # User is account.
749         return account, node
750     
751     def _lookup_container(self, account, container):
752         path = '/'.join((account, container))
753         node = self.node.node_lookup(path)
754         if node is None:
755             raise NameError('Container does not exist')
756         return path, node
757     
758     def _lookup_object(self, account, container, name):
759         path = '/'.join((account, container, name))
760         node = self.node.node_lookup(path)
761         if node is None:
762             raise NameError('Object does not exist')
763         return path, node
764     
765     def _get_properties(self, node, until=None):
766         """Return properties until the timestamp given."""
767         
768         before = until if until is not None else inf
769         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
770         if props is None and until is not None:
771             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
772         if props is None:
773             raise NameError('Path does not exist')
774         return props
775     
776     def _get_statistics(self, node, until=None):
777         """Return count, sum of size and latest timestamp of everything under node."""
778         
779         if until is None:
780             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
781         else:
782             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
783         if stats is None:
784             stats = (0, 0, 0)
785         return stats
786     
787     def _get_version(self, node, version=None):
788         if version is None:
789             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
790             if props is None:
791                 raise NameError('Object does not exist')
792         else:
793             try:
794                 version = int(version)
795             except ValueError:
796                 raise IndexError('Version does not exist')
797             props = self.node.version_get_properties(version)
798             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
799                 raise IndexError('Version does not exist')
800         return props
801     
802     def _put_version_duplicate(self, user, node, src_node=None, size=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
803         """Create a new version of the node."""
804         
805         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
806         if props is not None:
807             src_version_id = props[self.SERIAL]
808             src_hash = props[self.HASH]
809             src_size = props[self.SIZE]
810         else:
811             src_version_id = None
812             src_hash = None
813             src_size = 0
814         if size is None:
815             hash = src_hash # This way hash can be set to None.
816             size = src_size
817         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
818         
819         if src_node is None:
820             pre_version_id = src_version_id
821         else:
822             pre_version_id = None
823             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
824             if props is not None:
825                 pre_version_id = props[self.SERIAL]
826         if pre_version_id is not None:
827             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
828         
829         dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, uuid, cluster)
830         return pre_version_id, dest_version_id
831     
832     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
833         if src_version_id is not None:
834             self.node.attribute_copy(src_version_id, dest_version_id)
835         if not replace:
836             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
837             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
838         else:
839             self.node.attribute_del(dest_version_id, domain)
840             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
841     
842     def _put_metadata(self, user, node, domain, meta, replace=False):
843         """Create a new version and store metadata."""
844         
845         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
846         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
847         return src_version_id, dest_version_id
848     
849     def _list_limits(self, listing, marker, limit):
850         start = 0
851         if marker:
852             try:
853                 start = listing.index(marker) + 1
854             except ValueError:
855                 pass
856         if not limit or limit > 10000:
857             limit = 10000
858         return start, limit
859     
860     def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[]):
861         cont_prefix = path + '/'
862         prefix = cont_prefix + prefix
863         start = cont_prefix + marker if marker else None
864         before = until if until is not None else inf
865         filterq = keys if domain else []
866         sizeq = size_range
867         
868         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq)
869         objects.extend([(p, None) for p in prefixes] if virtual else [])
870         objects.sort(key=lambda x: x[0])
871         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
872         
873         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
874         return objects[start:start + limit]
875     
876     # Policy functions.
877     
878     def _check_policy(self, policy):
879         for k in policy.keys():
880             if policy[k] == '':
881                 policy[k] = self.default_policy.get(k)
882         for k, v in policy.iteritems():
883             if k == 'quota':
884                 q = int(v) # May raise ValueError.
885                 if q < 0:
886                     raise ValueError
887             elif k == 'versioning':
888                 if v not in ['auto', 'none']:
889                     raise ValueError
890             else:
891                 raise ValueError
892     
893     def _put_policy(self, node, policy, replace):
894         if replace:
895             for k, v in self.default_policy.iteritems():
896                 if k not in policy:
897                     policy[k] = v
898         self.node.policy_set(node, policy)
899     
900     def _get_policy(self, node):
901         policy = self.default_policy.copy()
902         policy.update(self.node.policy_get(node))
903         return policy
904     
905     def _apply_versioning(self, account, container, version_id):
906         if version_id is None:
907             return
908         path, node = self._lookup_container(account, container)
909         versioning = self._get_policy(node)['versioning']
910         if versioning != 'auto':
911             hash = self.node.version_remove(version_id)
912             self.store.map_delete(hash)
913             self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
914     
915     # Access control functions.
916     
917     def _check_groups(self, groups):
918         # raise ValueError('Bad characters in groups')
919         pass
920     
921     def _check_permissions(self, path, permissions):
922         # raise ValueError('Bad characters in permissions')
923         
924         # Check for existing permissions.
925         paths = self.permissions.access_list(path)
926         if paths:
927             ae = AttributeError()
928             ae.data = paths
929             raise ae
930     
931     def _can_read(self, user, account, container, name):
932         if user == account:
933             return True
934         path = '/'.join((account, container, name))
935         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
936             raise NotAllowedError
937     
938     def _can_write(self, user, account, container, name):
939         if user == account:
940             return True
941         path = '/'.join((account, container, name))
942         if not self.permissions.access_check(path, self.WRITE, user):
943             raise NotAllowedError
944     
945     def _allowed_accounts(self, user):
946         allow = set()
947         for path in self.permissions.access_list_paths(user):
948             allow.add(path.split('/', 1)[0])
949         return sorted(allow)
950     
951     def _allowed_containers(self, user, account):
952         allow = set()
953         for path in self.permissions.access_list_paths(user, account):
954             allow.add(path.split('/', 2)[1])
955         return sorted(allow)