1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
37 import uuid as uuidlib
41 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43 from pithos.lib.hashmap import HashMap
45 # Default modules and settings.
46 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
47 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
48 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
49 DEFAULT_BLOCK_PATH = 'data/'
50 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
51 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
53 QUEUE_MESSAGE_KEY = '#'
54 QUEUE_CLIENT_ID = 2 # Pithos.
56 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
63 logger = logging.getLogger(__name__)
66 def backend_method(func=None, autocommit=1):
69 return backend_method(func, autocommit)
74 def fn(self, *args, **kw):
75 self.wrapper.execute()
77 ret = func(self, *args, **kw)
81 self.wrapper.rollback()
86 class ModularBackend(BaseBackend):
89 Uses modules for SQL functions and storage.
92 def __init__(self, db_module=None, db_connection=None,
93 block_module=None, block_path=None,
94 queue_module=None, queue_connection=None):
95 db_module = db_module or DEFAULT_DB_MODULE
96 db_connection = db_connection or DEFAULT_DB_CONNECTION
97 block_module = block_module or DEFAULT_BLOCK_MODULE
98 block_path = block_path or DEFAULT_BLOCK_PATH
99 #queue_module = queue_module or DEFAULT_QUEUE_MODULE
100 #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
102 self.hash_algorithm = 'sha256'
103 self.block_size = 4 * 1024 * 1024 # 4MB
105 self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
109 return sys.modules[m]
111 self.db_module = load_module(db_module)
112 self.wrapper = self.db_module.DBWrapper(db_connection)
113 params = {'wrapper': self.wrapper}
114 self.permissions = self.db_module.Permissions(**params)
115 for x in ['READ', 'WRITE']:
116 setattr(self, x, getattr(self.db_module, x))
117 self.node = self.db_module.Node(**params)
118 for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
119 setattr(self, x, getattr(self.db_module, x))
121 self.block_module = load_module(block_module)
122 params = {'path': block_path,
123 'block_size': self.block_size,
124 'hash_algorithm': self.hash_algorithm}
125 self.store = self.block_module.Store(**params)
127 if queue_module and queue_connection:
128 self.queue_module = load_module(queue_module)
129 params = {'exchange': queue_connection,
130 'message_key': QUEUE_MESSAGE_KEY,
131 'client_id': QUEUE_CLIENT_ID}
132 self.queue = self.queue_module.Queue(**params)
135 def send(self, *args):
138 self.queue = NoQueue()
144 def list_accounts(self, user, marker=None, limit=10000):
145 """Return a list of accounts the user can access."""
147 logger.debug("list_accounts: %s %s %s", user, marker, limit)
148 allowed = self._allowed_accounts(user)
149 start, limit = self._list_limits(allowed, marker, limit)
150 return allowed[start:start + limit]
153 def get_account_meta(self, user, account, domain, until=None):
154 """Return a dictionary with the account metadata for the domain."""
156 logger.debug("get_account_meta: %s %s %s", account, domain, until)
157 path, node = self._lookup_account(account, user == account)
159 if until or node is None or account not in self._allowed_accounts(user):
160 raise NotAllowedError
162 props = self._get_properties(node, until)
163 mtime = props[self.MTIME]
167 count, bytes, tstamp = self._get_statistics(node, until)
168 tstamp = max(tstamp, mtime)
172 modified = self._get_statistics(node)[2] # Overall last modification.
173 modified = max(modified, mtime)
176 meta = {'name': account}
179 if props is not None:
180 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
181 if until is not None:
182 meta.update({'until_timestamp': tstamp})
183 meta.update({'name': account, 'count': count, 'bytes': bytes})
184 meta.update({'modified': modified})
188 def update_account_meta(self, user, account, domain, meta, replace=False):
189 """Update the metadata associated with the account for the domain."""
191 logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
193 raise NotAllowedError
194 path, node = self._lookup_account(account, True)
195 self._put_metadata(user, node, domain, meta, replace)
198 def get_account_groups(self, user, account):
199 """Return a dictionary with the user groups defined for this account."""
201 logger.debug("get_account_groups: %s", account)
203 if account not in self._allowed_accounts(user):
204 raise NotAllowedError
206 self._lookup_account(account, True)
207 return self.permissions.group_dict(account)
210 def update_account_groups(self, user, account, groups, replace=False):
211 """Update the groups associated with the account."""
213 logger.debug("update_account_groups: %s %s %s", account, groups, replace)
215 raise NotAllowedError
216 self._lookup_account(account, True)
217 self._check_groups(groups)
219 self.permissions.group_destroy(account)
220 for k, v in groups.iteritems():
221 if not replace: # If not already deleted.
222 self.permissions.group_delete(account, k)
224 self.permissions.group_addmany(account, k, v)
227 def get_account_policy(self, user, account):
228 """Return a dictionary with the account policy."""
230 logger.debug("get_account_policy: %s", account)
232 if account not in self._allowed_accounts(user):
233 raise NotAllowedError
235 path, node = self._lookup_account(account, True)
236 return self._get_policy(node)
239 def update_account_policy(self, user, account, policy, replace=False):
240 """Update the policy associated with the account."""
242 logger.debug("update_account_policy: %s %s %s", account, policy, replace)
244 raise NotAllowedError
245 path, node = self._lookup_account(account, True)
246 self._check_policy(policy)
247 self._put_policy(node, policy, replace)
250 def put_account(self, user, account, policy={}):
251 """Create a new account with the given name."""
253 logger.debug("put_account: %s %s", account, policy)
255 raise NotAllowedError
256 node = self.node.node_lookup(account)
258 raise NameError('Account already exists')
260 self._check_policy(policy)
261 node = self._put_path(user, self.ROOTNODE, account)
262 self._put_policy(node, policy, True)
265 def delete_account(self, user, account):
266 """Delete the account with the given name."""
268 logger.debug("delete_account: %s", account)
270 raise NotAllowedError
271 node = self.node.node_lookup(account)
274 if not self.node.node_remove(node):
275 raise IndexError('Account is not empty')
276 self.permissions.group_destroy(account)
279 def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
280 """Return a list of containers existing under an account."""
282 logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
284 if until or account not in self._allowed_accounts(user):
285 raise NotAllowedError
286 allowed = self._allowed_containers(user, account)
287 start, limit = self._list_limits(allowed, marker, limit)
288 return allowed[start:start + limit]
290 allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
291 allowed = list(set(allowed))
292 start, limit = self._list_limits(allowed, marker, limit)
293 return allowed[start:start + limit]
294 node = self.node.node_lookup(account)
295 return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
298 def get_container_meta(self, user, account, container, domain, until=None):
299 """Return a dictionary with the container metadata for the domain."""
301 logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
303 if until or container not in self._allowed_containers(user, account):
304 raise NotAllowedError
305 path, node = self._lookup_container(account, container)
306 props = self._get_properties(node, until)
307 mtime = props[self.MTIME]
308 count, bytes, tstamp = self._get_statistics(node, until)
309 tstamp = max(tstamp, mtime)
313 modified = self._get_statistics(node)[2] # Overall last modification.
314 modified = max(modified, mtime)
317 meta = {'name': container}
319 meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
320 if until is not None:
321 meta.update({'until_timestamp': tstamp})
322 meta.update({'name': container, 'count': count, 'bytes': bytes})
323 meta.update({'modified': modified})
327 def update_container_meta(self, user, account, container, domain, meta, replace=False):
328 """Update the metadata associated with the container for the domain."""
330 logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
332 raise NotAllowedError
333 path, node = self._lookup_container(account, container)
334 self._put_metadata(user, node, domain, meta, replace)
337 def get_container_policy(self, user, account, container):
338 """Return a dictionary with the container policy."""
340 logger.debug("get_container_policy: %s %s", account, container)
342 if container not in self._allowed_containers(user, account):
343 raise NotAllowedError
345 path, node = self._lookup_container(account, container)
346 return self._get_policy(node)
349 def update_container_policy(self, user, account, container, policy, replace=False):
350 """Update the policy associated with the container."""
352 logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
354 raise NotAllowedError
355 path, node = self._lookup_container(account, container)
356 self._check_policy(policy)
357 self._put_policy(node, policy, replace)
360 def put_container(self, user, account, container, policy={}):
361 """Create a new container with the given name."""
363 logger.debug("put_container: %s %s %s", account, container, policy)
365 raise NotAllowedError
367 path, node = self._lookup_container(account, container)
371 raise NameError('Container already exists')
373 self._check_policy(policy)
374 path = '/'.join((account, container))
375 node = self._put_path(user, self._lookup_account(account, True)[1], path)
376 self._put_policy(node, policy, True)
379 def delete_container(self, user, account, container, until=None):
380 """Delete/purge the container with the given name."""
382 logger.debug("delete_container: %s %s %s", account, container, until)
384 raise NotAllowedError
385 path, node = self._lookup_container(account, container)
387 if until is not None:
388 hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
390 self.store.map_delete(h)
391 self.node.node_purge_children(node, until, CLUSTER_DELETED)
392 self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
395 if self._get_statistics(node)[0] > 0:
396 raise IndexError('Container is not empty')
397 hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
399 self.store.map_delete(h)
400 self.node.node_purge_children(node, inf, CLUSTER_DELETED)
401 self.node.node_remove(node)
402 self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
405 def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
406 """Return a list of objects existing under a container."""
408 logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
412 raise NotAllowedError
413 allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
415 raise NotAllowedError
418 allowed = self.permissions.access_list_shared('/'.join((account, container)))
421 path, node = self._lookup_container(account, container)
422 return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed)
425 def list_object_meta(self, user, account, container, domain, until=None):
426 """Return a list with all the container's object meta keys for the domain."""
428 logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
432 raise NotAllowedError
433 allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
435 raise NotAllowedError
436 path, node = self._lookup_container(account, container)
437 before = until if until is not None else inf
438 return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
441 def get_object_meta(self, user, account, container, name, domain, version=None):
442 """Return a dictionary with the object metadata for the domain."""
444 logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
445 self._can_read(user, account, container, name)
446 path, node = self._lookup_object(account, container, name)
447 props = self._get_version(node, version)
449 modified = props[self.MTIME]
452 modified = self._get_version(node)[self.MTIME] # Overall last modification.
453 except NameError: # Object may be deleted.
454 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
455 if del_props is None:
456 raise NameError('Object does not exist')
457 modified = del_props[self.MTIME]
459 meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
460 meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
461 meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
462 meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
466 def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
467 """Update the metadata associated with the object for the domain and return the new version."""
469 logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
470 self._can_write(user, account, container, name)
471 path, node = self._lookup_object(account, container, name)
472 src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
473 self._apply_versioning(account, container, src_version_id)
474 return dest_version_id
477 def get_object_permissions(self, user, account, container, name):
478 """Return the action allowed on the object, the path
479 from which the object gets its permissions from,
480 along with a dictionary containing the permissions."""
482 logger.debug("get_object_permissions: %s %s %s", account, container, name)
485 path = '/'.join((account, container, name))
486 if self.permissions.access_check(path, self.WRITE, user):
488 elif self.permissions.access_check(path, self.READ, user):
491 raise NotAllowedError
492 path = self._lookup_object(account, container, name)[0]
493 return (allowed,) + self.permissions.access_inherit(path)
496 def update_object_permissions(self, user, account, container, name, permissions):
497 """Update the permissions associated with the object."""
499 logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
501 raise NotAllowedError
502 path = self._lookup_object(account, container, name)[0]
503 self._check_permissions(path, permissions)
504 self.permissions.access_set(path, permissions)
507 def get_object_public(self, user, account, container, name):
508 """Return the public id of the object if applicable."""
510 logger.debug("get_object_public: %s %s %s", account, container, name)
511 self._can_read(user, account, container, name)
512 path = self._lookup_object(account, container, name)[0]
513 p = self.permissions.public_get(path)
519 def update_object_public(self, user, account, container, name, public):
520 """Update the public status of the object."""
522 logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
523 self._can_write(user, account, container, name)
524 path = self._lookup_object(account, container, name)[0]
526 self.permissions.public_unset(path)
528 self.permissions.public_set(path)
531 def get_object_hashmap(self, user, account, container, name, version=None):
532 """Return the object's size and a list with partial hashes."""
534 logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
535 self._can_read(user, account, container, name)
536 path, node = self._lookup_object(account, container, name)
537 props = self._get_version(node, version)
538 hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
539 return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
541 def _update_object_hash(self, user, account, container, name, size, hash, permissions, src_node=None, is_copy=False):
542 if permissions is not None and user != account:
543 raise NotAllowedError
544 self._can_write(user, account, container, name)
545 if permissions is not None:
546 path = '/'.join((account, container, name))
547 self._check_permissions(path, permissions)
549 account_path, account_node = self._lookup_account(account, True)
550 container_path, container_node = self._lookup_container(account, container)
551 path, node = self._put_object_node(container_path, container_node, name)
552 pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
555 versioning = self._get_policy(container_node)['versioning']
556 if versioning != 'auto':
557 size_delta = size - 0 # TODO: Get previous size.
561 account_quota = long(self._get_policy(account_node)['quota'])
562 container_quota = long(self._get_policy(container_node)['quota'])
563 if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
564 (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
565 # This must be executed in a transaction, so the version is never created if it fails.
568 if permissions is not None:
569 self.permissions.access_set(path, permissions)
570 self._apply_versioning(account, container, pre_version_id)
571 return pre_version_id, dest_version_id
574 def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
575 """Create/update an object with the specified size and partial hashes."""
577 logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
578 if size == 0: # No such thing as an empty hashmap.
579 hashmap = [self.put_block('')]
580 map = HashMap(self.block_size, self.hash_algorithm)
581 map.extend([binascii.unhexlify(x) for x in hashmap])
582 missing = self.store.block_search(map)
585 ie.data = [binascii.hexlify(x) for x in missing]
589 pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
590 self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
591 self.store.map_put(hash, map)
592 self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
593 return dest_version_id
595 def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
596 self._can_read(user, src_account, src_container, src_name)
597 path, node = self._lookup_object(src_account, src_container, src_name)
598 # TODO: Will do another fetch of the properties in duplicate version...
599 props = self._get_version(node, src_version) # Check to see if source exists.
600 src_version_id = props[self.SERIAL]
601 hash = props[self.HASH]
602 size = props[self.SIZE]
604 is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
605 pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions, src_node=node, is_copy=is_copy)
606 self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
607 return dest_version_id
610 def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
611 """Copy an object's data and metadata."""
613 logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
614 dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
615 self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
616 return dest_version_id
619 def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
620 """Move an object's data and metadata."""
622 logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
623 if user != src_account:
624 raise NotAllowedError
625 dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
626 if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
627 self._delete_object(user, src_account, src_container, src_name)
628 self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
629 return dest_version_id
631 def _delete_object(self, user, account, container, name, until=None):
633 raise NotAllowedError
635 if until is not None:
636 path = '/'.join((account, container, name))
637 node = self.node.node_lookup(path)
640 hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
641 hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
643 self.store.map_delete(h)
644 self.node.node_purge(node, until, CLUSTER_DELETED)
646 props = self._get_version(node)
648 self.permissions.access_clear(path)
649 self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
652 path, node = self._lookup_object(account, container, name)
653 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
654 self._apply_versioning(account, container, src_version_id)
655 self.permissions.access_clear(path)
658 def delete_object(self, user, account, container, name, until=None):
659 """Delete/purge an object."""
661 logger.debug("delete_object: %s %s %s %s", account, container, name, until)
662 self._delete_object(user, account, container, name, until)
665 def list_versions(self, user, account, container, name):
666 """Return a list of all (version, version_timestamp) tuples for an object."""
668 logger.debug("list_versions: %s %s %s", account, container, name)
669 self._can_read(user, account, container, name)
670 path, node = self._lookup_object(account, container, name)
671 versions = self.node.node_get_versions(node)
672 return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
675 def get_uuid(self, user, uuid):
676 """Return the (account, container, name) for the UUID given."""
678 logger.debug("get_uuid: %s", uuid)
679 info = self.node.latest_uuid(uuid)
683 account, container, name = path.split('/', 2)
684 self._can_read(user, account, container, name)
685 return (account, container, name)
688 def get_public(self, user, public):
689 """Return the (account, container, name) for the public id given."""
691 logger.debug("get_public: %s", public)
692 if public is None or public < ULTIMATE_ANSWER:
694 path = self.permissions.public_path(public - ULTIMATE_ANSWER)
697 account, container, name = path.split('/', 2)
698 self._can_read(user, account, container, name)
699 return (account, container, name)
701 @backend_method(autocommit=0)
702 def get_block(self, hash):
703 """Return a block's data."""
705 logger.debug("get_block: %s", hash)
706 block = self.store.block_get(binascii.unhexlify(hash))
708 raise NameError('Block does not exist')
711 @backend_method(autocommit=0)
712 def put_block(self, data):
713 """Store a block and return the hash."""
715 logger.debug("put_block: %s", len(data))
716 return binascii.hexlify(self.store.block_put(data))
718 @backend_method(autocommit=0)
719 def update_block(self, hash, data, offset=0):
720 """Update a known block and return the hash."""
722 logger.debug("update_block: %s %s %s", hash, len(data), offset)
723 if offset == 0 and len(data) == self.block_size:
724 return self.put_block(data)
725 h = self.store.block_update(binascii.unhexlify(hash), offset, data)
726 return binascii.hexlify(h)
730 def _generate_uuid(self):
731 return str(uuidlib.uuid4())
733 def _put_object_node(self, path, parent, name):
734 path = '/'.join((path, name))
735 node = self.node.node_lookup(path)
737 node = self.node.node_create(parent, path)
740 def _put_path(self, user, parent, path):
741 node = self.node.node_create(parent, path)
742 self.node.version_create(node, None, 0, None, user, self._generate_uuid(), CLUSTER_NORMAL)
745 def _lookup_account(self, account, create=True):
746 node = self.node.node_lookup(account)
747 if node is None and create:
748 node = self._put_path(account, self.ROOTNODE, account) # User is account.
751 def _lookup_container(self, account, container):
752 path = '/'.join((account, container))
753 node = self.node.node_lookup(path)
755 raise NameError('Container does not exist')
758 def _lookup_object(self, account, container, name):
759 path = '/'.join((account, container, name))
760 node = self.node.node_lookup(path)
762 raise NameError('Object does not exist')
765 def _get_properties(self, node, until=None):
766 """Return properties until the timestamp given."""
768 before = until if until is not None else inf
769 props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
770 if props is None and until is not None:
771 props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
773 raise NameError('Path does not exist')
776 def _get_statistics(self, node, until=None):
777 """Return count, sum of size and latest timestamp of everything under node."""
780 stats = self.node.statistics_get(node, CLUSTER_NORMAL)
782 stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
787 def _get_version(self, node, version=None):
789 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
791 raise NameError('Object does not exist')
794 version = int(version)
796 raise IndexError('Version does not exist')
797 props = self.node.version_get_properties(version)
798 if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
799 raise IndexError('Version does not exist')
802 def _put_version_duplicate(self, user, node, src_node=None, size=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
803 """Create a new version of the node."""
805 props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
806 if props is not None:
807 src_version_id = props[self.SERIAL]
808 src_hash = props[self.HASH]
809 src_size = props[self.SIZE]
811 src_version_id = None
815 hash = src_hash # This way hash can be set to None.
817 uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
820 pre_version_id = src_version_id
822 pre_version_id = None
823 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
824 if props is not None:
825 pre_version_id = props[self.SERIAL]
826 if pre_version_id is not None:
827 self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
829 dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, uuid, cluster)
830 return pre_version_id, dest_version_id
832 def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
833 if src_version_id is not None:
834 self.node.attribute_copy(src_version_id, dest_version_id)
836 self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
837 self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
839 self.node.attribute_del(dest_version_id, domain)
840 self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
842 def _put_metadata(self, user, node, domain, meta, replace=False):
843 """Create a new version and store metadata."""
845 src_version_id, dest_version_id = self._put_version_duplicate(user, node)
846 self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
847 return src_version_id, dest_version_id
849 def _list_limits(self, listing, marker, limit):
853 start = listing.index(marker) + 1
856 if not limit or limit > 10000:
860 def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[]):
861 cont_prefix = path + '/'
862 prefix = cont_prefix + prefix
863 start = cont_prefix + marker if marker else None
864 before = until if until is not None else inf
865 filterq = keys if domain else []
868 objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq)
869 objects.extend([(p, None) for p in prefixes] if virtual else [])
870 objects.sort(key=lambda x: x[0])
871 objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
873 start, limit = self._list_limits([x[0] for x in objects], marker, limit)
874 return objects[start:start + limit]
878 def _check_policy(self, policy):
879 for k in policy.keys():
881 policy[k] = self.default_policy.get(k)
882 for k, v in policy.iteritems():
884 q = int(v) # May raise ValueError.
887 elif k == 'versioning':
888 if v not in ['auto', 'none']:
893 def _put_policy(self, node, policy, replace):
895 for k, v in self.default_policy.iteritems():
898 self.node.policy_set(node, policy)
900 def _get_policy(self, node):
901 policy = self.default_policy.copy()
902 policy.update(self.node.policy_get(node))
905 def _apply_versioning(self, account, container, version_id):
906 if version_id is None:
908 path, node = self._lookup_container(account, container)
909 versioning = self._get_policy(node)['versioning']
910 if versioning != 'auto':
911 hash = self.node.version_remove(version_id)
912 self.store.map_delete(hash)
913 self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
915 # Access control functions.
917 def _check_groups(self, groups):
918 # raise ValueError('Bad characters in groups')
921 def _check_permissions(self, path, permissions):
922 # raise ValueError('Bad characters in permissions')
924 # Check for existing permissions.
925 paths = self.permissions.access_list(path)
927 ae = AttributeError()
931 def _can_read(self, user, account, container, name):
934 path = '/'.join((account, container, name))
935 if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
936 raise NotAllowedError
938 def _can_write(self, user, account, container, name):
941 path = '/'.join((account, container, name))
942 if not self.permissions.access_check(path, self.WRITE, user):
943 raise NotAllowedError
945 def _allowed_accounts(self, user):
947 for path in self.permissions.access_list_paths(user):
948 allow.add(path.split('/', 1)[0])
951 def _allowed_containers(self, user, account):
953 for path in self.permissions.access_list_paths(user, account):
954 allow.add(path.split('/', 2)[1])