1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
37 import uuid as uuidlib
42 from commissioning.clients.quotaholder import QuotaholderHTTP
44 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
45 AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
47 # Stripped-down version of the HashMap class found in tools.
52 def __init__(self, blocksize, blockhash):
53 super(HashMap, self).__init__()
54 self.blocksize = blocksize
55 self.blockhash = blockhash
57 def _hash_raw(self, v):
58 h = hashlib.new(self.blockhash)
64 return self._hash_raw('')
66 return self.__getitem__(0)
72 h += [('\x00' * len(h[0]))] * (s - len(h))
74 h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
# Fallback module names and settings, used when the backend is
# constructed without explicit values for them.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'

DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022

# Queue defaults are kept for reference only; they are disabled.
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
#DEFAULT_QUEUE_EXCHANGE = 'pithos'

QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Identifiers of the version clusters passed to the node layer.
CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED = 0, 1, 2
98 logger = logging.getLogger(__name__)
101 def backend_method(func=None, autocommit=1):
104 return backend_method(func, autocommit)
110 def fn(self, *args, **kw):
111 self.wrapper.execute()
113 self.serials = serials
116 ret = func(self, *args, **kw)
117 for m in self.messages:
120 self.quotaholder.accept_commission(
122 clientkey = 'pithos',
124 self.wrapper.commit()
127 self.quotaholder.reject_commission(
129 clientkey = 'pithos',
131 self.wrapper.rollback()
136 class ModularBackend(BaseBackend):
137 """A modular backend.
139 Uses modules for SQL functions and storage.
142 def __init__(self, db_module=None, db_connection=None,
143 block_module=None, block_path=None, block_umask=None,
144 queue_module=None, queue_hosts=None,
145 queue_exchange=None, quotaholder_url=None):
146 db_module = db_module or DEFAULT_DB_MODULE
147 db_connection = db_connection or DEFAULT_DB_CONNECTION
148 block_module = block_module or DEFAULT_BLOCK_MODULE
149 block_path = block_path or DEFAULT_BLOCK_PATH
150 block_umask = block_umask or DEFAULT_BLOCK_UMASK
151 #queue_module = queue_module or DEFAULT_QUEUE_MODULE
152 #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
153 #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
155 self.hash_algorithm = 'sha256'
156 self.block_size = 4 * 1024 * 1024 # 4MB
158 self.default_policy = {'quota': DEFAULT_QUOTA,
159 'versioning': DEFAULT_VERSIONING}
163 return sys.modules[m]
165 self.db_module = load_module(db_module)
166 self.wrapper = self.db_module.DBWrapper(db_connection)
167 params = {'wrapper': self.wrapper}
168 self.permissions = self.db_module.Permissions(**params)
169 self.config = self.db_module.Config(**params)
170 self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
171 for x in ['READ', 'WRITE']:
172 setattr(self, x, getattr(self.db_module, x))
173 self.node = self.db_module.Node(**params)
174 for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
175 setattr(self, x, getattr(self.db_module, x))
177 self.block_module = load_module(block_module)
178 params = {'path': block_path,
179 'block_size': self.block_size,
180 'hash_algorithm': self.hash_algorithm,
181 'umask': block_umask}
182 self.store = self.block_module.Store(**params)
184 if queue_module and queue_hosts:
185 self.queue_module = load_module(queue_module)
186 params = {'hosts': queue_hosts,
187 'exchange': queue_exchange,
188 'client_id': QUEUE_CLIENT_ID}
189 self.queue = self.queue_module.Queue(**params)
192 def send(self, *args):
198 self.queue = NoQueue()
200 self.quotaholder_url = quotaholder_url
201 self.quotaholder = QuotaholderHTTP(quotaholder_url)
def list_accounts(self, user, marker=None, limit=10000):
    """List the accounts that `user` is allowed to access.

    The complete list of allowed accounts is computed first and then
    windowed: `_list_limits` turns `marker` and `limit` into a start
    offset and an item count, and only that slice is returned.
    """

    logger.debug("list_accounts: %s %s %s", user, marker, limit)
    accounts = self._allowed_accounts(user)
    first, count = self._list_limits(accounts, marker, limit)
    last = first + count
    return accounts[first:last]
218 def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
219 """Return a dictionary with the account metadata for the domain."""
222 "get_account_meta: %s %s %s %s", user, account, domain, until)
223 path, node = self._lookup_account(account, user == account)
225 if until or node is None or account not in self._allowed_accounts(user):
226 raise NotAllowedError
228 props = self._get_properties(node, until)
229 mtime = props[self.MTIME]
233 count, bytes, tstamp = self._get_statistics(node, until)
234 tstamp = max(tstamp, mtime)
238 modified = self._get_statistics(
239 node)[2] # Overall last modification.
240 modified = max(modified, mtime)
243 meta = {'name': account}
246 if props is not None and include_user_defined:
248 dict(self.node.attribute_get(props[self.SERIAL], domain)))
249 if until is not None:
250 meta.update({'until_timestamp': tstamp})
251 meta.update({'name': account, 'count': count, 'bytes': bytes})
252 meta.update({'modified': modified})
256 def update_account_meta(self, user, account, domain, meta, replace=False):
257 """Update the metadata associated with the account for the domain."""
259 logger.debug("update_account_meta: %s %s %s %s %s", user,
260 account, domain, meta, replace)
262 raise NotAllowedError
263 path, node = self._lookup_account(account, True)
264 self._put_metadata(user, node, domain, meta, replace)
267 def get_account_groups(self, user, account):
268 """Return a dictionary with the user groups defined for this account."""
270 logger.debug("get_account_groups: %s %s", user, account)
272 if account not in self._allowed_accounts(user):
273 raise NotAllowedError
275 self._lookup_account(account, True)
276 return self.permissions.group_dict(account)
279 def update_account_groups(self, user, account, groups, replace=False):
280 """Update the groups associated with the account."""
282 logger.debug("update_account_groups: %s %s %s %s", user,
283 account, groups, replace)
285 raise NotAllowedError
286 self._lookup_account(account, True)
287 self._check_groups(groups)
289 self.permissions.group_destroy(account)
290 for k, v in groups.iteritems():
291 if not replace: # If not already deleted.
292 self.permissions.group_delete(account, k)
294 self.permissions.group_addmany(account, k, v)
297 def get_account_policy(self, user, account):
298 """Return a dictionary with the account policy."""
300 logger.debug("get_account_policy: %s %s", user, account)
302 if account not in self._allowed_accounts(user):
303 raise NotAllowedError
305 path, node = self._lookup_account(account, True)
306 return self._get_policy(node)
309 def update_account_policy(self, user, account, policy, replace=False):
310 """Update the policy associated with the account."""
312 logger.debug("update_account_policy: %s %s %s %s", user,
313 account, policy, replace)
315 raise NotAllowedError
316 path, node = self._lookup_account(account, True)
317 self._check_policy(policy)
318 self._put_policy(node, policy, replace)
321 def put_account(self, user, account, policy={}):
322 """Create a new account with the given name."""
324 logger.debug("put_account: %s %s %s", user, account, policy)
326 raise NotAllowedError
327 node = self.node.node_lookup(account)
329 raise AccountExists('Account already exists')
331 self._check_policy(policy)
332 node = self._put_path(user, self.ROOTNODE, account)
333 self._put_policy(node, policy, True)
336 def delete_account(self, user, account):
337 """Delete the account with the given name."""
339 logger.debug("delete_account: %s %s", user, account)
341 raise NotAllowedError
342 node = self.node.node_lookup(account)
345 if not self.node.node_remove(node):
346 raise AccountNotEmpty('Account is not empty')
347 self.permissions.group_destroy(account)
350 def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
351 """Return a list of containers existing under an account."""
353 logger.debug("list_containers: %s %s %s %s %s %s %s", user,
354 account, marker, limit, shared, until, public)
356 if until or account not in self._allowed_accounts(user):
357 raise NotAllowedError
358 allowed = self._allowed_containers(user, account)
359 start, limit = self._list_limits(allowed, marker, limit)
360 return allowed[start:start + limit]
364 allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
366 allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
367 allowed = sorted(allowed)
368 start, limit = self._list_limits(allowed, marker, limit)
369 return allowed[start:start + limit]
370 node = self.node.node_lookup(account)
371 containers = [x[0] for x in self._list_object_properties(
372 node, account, '', '/', marker, limit, False, None, [], until)]
373 start, limit = self._list_limits(
374 [x[0] for x in containers], marker, limit)
375 return containers[start:start + limit]
378 def list_container_meta(self, user, account, container, domain, until=None):
379 """Return a list with all the container's object meta keys for the domain."""
381 logger.debug("list_container_meta: %s %s %s %s %s", user,
382 account, container, domain, until)
386 raise NotAllowedError
387 allowed = self.permissions.access_list_paths(
388 user, '/'.join((account, container)))
390 raise NotAllowedError
391 path, node = self._lookup_container(account, container)
392 before = until if until is not None else inf
393 allowed = self._get_formatted_paths(allowed)
394 return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
397 def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
398 """Return a dictionary with the container metadata for the domain."""
400 logger.debug("get_container_meta: %s %s %s %s %s", user,
401 account, container, domain, until)
403 if until or container not in self._allowed_containers(user, account):
404 raise NotAllowedError
405 path, node = self._lookup_container(account, container)
406 props = self._get_properties(node, until)
407 mtime = props[self.MTIME]
408 count, bytes, tstamp = self._get_statistics(node, until)
409 tstamp = max(tstamp, mtime)
413 modified = self._get_statistics(
414 node)[2] # Overall last modification.
415 modified = max(modified, mtime)
418 meta = {'name': container}
421 if include_user_defined:
423 dict(self.node.attribute_get(props[self.SERIAL], domain)))
424 if until is not None:
425 meta.update({'until_timestamp': tstamp})
426 meta.update({'name': container, 'count': count, 'bytes': bytes})
427 meta.update({'modified': modified})
431 def update_container_meta(self, user, account, container, domain, meta, replace=False):
432 """Update the metadata associated with the container for the domain."""
434 logger.debug("update_container_meta: %s %s %s %s %s %s",
435 user, account, container, domain, meta, replace)
437 raise NotAllowedError
438 path, node = self._lookup_container(account, container)
439 src_version_id, dest_version_id = self._put_metadata(
440 user, node, domain, meta, replace)
441 if src_version_id is not None:
442 versioning = self._get_policy(node)['versioning']
443 if versioning != 'auto':
444 self.node.version_remove(src_version_id)
447 def get_container_policy(self, user, account, container):
448 """Return a dictionary with the container policy."""
451 "get_container_policy: %s %s %s", user, account, container)
453 if container not in self._allowed_containers(user, account):
454 raise NotAllowedError
456 path, node = self._lookup_container(account, container)
457 return self._get_policy(node)
460 def update_container_policy(self, user, account, container, policy, replace=False):
461 """Update the policy associated with the container."""
463 logger.debug("update_container_policy: %s %s %s %s %s",
464 user, account, container, policy, replace)
466 raise NotAllowedError
467 path, node = self._lookup_container(account, container)
468 self._check_policy(policy)
469 self._put_policy(node, policy, replace)
472 def put_container(self, user, account, container, policy={}):
473 """Create a new container with the given name."""
476 "put_container: %s %s %s %s", user, account, container, policy)
478 raise NotAllowedError
480 path, node = self._lookup_container(account, container)
484 raise ContainerExists('Container already exists')
486 self._check_policy(policy)
487 path = '/'.join((account, container))
488 node = self._put_path(
489 user, self._lookup_account(account, True)[1], path)
490 self._put_policy(node, policy, True)
493 def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
494 """Delete/purge the container with the given name."""
496 logger.debug("delete_container: %s %s %s %s %s %s", user,
497 account, container, until, prefix, delimiter)
499 raise NotAllowedError
500 path, node = self._lookup_container(account, container)
502 if until is not None:
503 hashes, size, serials = self.node.node_purge_children(
504 node, until, CLUSTER_HISTORY)
506 self.store.map_delete(h)
507 self.node.node_purge_children(node, until, CLUSTER_DELETED)
508 self._report_size_change(user, account, -size,
509 {'action':'container purge', 'path': path,
510 'versions': serials})
514 if self._get_statistics(node)[0] > 0:
515 raise ContainerNotEmpty('Container is not empty')
516 hashes, size, serials = self.node.node_purge_children(
517 node, inf, CLUSTER_HISTORY)
519 self.store.map_delete(h)
520 self.node.node_purge_children(node, inf, CLUSTER_DELETED)
521 self.node.node_remove(node)
522 self._report_size_change(user, account, -size,
523 {'action': 'container delete',
525 'versions': serials})
527 # remove only contents
528 src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
531 path = '/'.join((account, container, t[0]))
533 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
534 del_size = self._apply_versioning(
535 account, container, src_version_id)
537 self._report_size_change(user, account, -del_size,
538 {'action': 'object delete',
539 'path': path, 'versions': [dest_version_id]})
540 self._report_object_change(
541 user, account, path, details={'action': 'object delete'})
543 self.permissions.access_clear_bulk(paths)
545 def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
546 if user != account and until:
547 raise NotAllowedError
548 if shared and public:
550 shared = self._list_object_permissions(
551 user, account, container, prefix, shared=True, public=False)
554 path, node = self._lookup_container(account, container)
555 shared = self._get_formatted_paths(shared)
556 objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
559 objects |= set(self._list_public_object_properties(
560 user, account, container, prefix, all_props))
561 objects = list(objects)
563 objects.sort(key=lambda x: x[0])
564 start, limit = self._list_limits(
565 [x[0] for x in objects], marker, limit)
566 return objects[start:start + limit]
568 objects = self._list_public_object_properties(
569 user, account, container, prefix, all_props)
570 start, limit = self._list_limits(
571 [x[0] for x in objects], marker, limit)
572 return objects[start:start + limit]
574 allowed = self._list_object_permissions(
575 user, account, container, prefix, shared, public)
576 if shared and not allowed:
578 path, node = self._lookup_container(account, container)
579 allowed = self._get_formatted_paths(allowed)
580 objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
581 start, limit = self._list_limits(
582 [x[0] for x in objects], marker, limit)
583 return objects[start:start + limit]
585 def _list_public_object_properties(self, user, account, container, prefix, all_props):
586 public = self._list_object_permissions(
587 user, account, container, prefix, shared=False, public=True)
588 paths, nodes = self._lookup_objects(public)
589 path = '/'.join((account, container))
590 cont_prefix = path + '/'
591 paths = [x[len(cont_prefix):] for x in paths]
592 props = self.node.version_lookup_bulk(nodes, all_props=all_props)
593 objects = [(path,) + props for path, props in zip(paths, props)]
596 def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
599 marker = objects[-1] if objects else None
601 l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
603 if not l or len(l) < limit:
607 def _list_object_permissions(self, user, account, container, prefix, shared, public):
609 path = '/'.join((account, container, prefix)).rstrip('/')
611 allowed = self.permissions.access_list_paths(user, path)
613 raise NotAllowedError
617 allowed.update(self.permissions.access_list_shared(path))
620 [x[0] for x in self.permissions.public_list(path)])
621 allowed = sorted(allowed)
def list_objects(self, user, account, container, prefix='', delimiter=None,
                 marker=None, limit=10000, virtual=True, domain=None,
                 keys=None, shared=False, until=None, size_range=None,
                 public=False):
    """Return a list of object (name, version_id) tuples existing under a container.

    All arguments are forwarded to `_list_objects`, with `all_props`
    fixed to False.

    `keys` defaults to None and is replaced with a fresh empty list on
    every call, so that no list object is shared between invocations
    (the previous `keys=[]` default handed the same list to every call).
    """

    if keys is None:
        keys = []
    logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s",
                 user, account, container, prefix, delimiter, marker, limit,
                 virtual, domain, keys, shared, until, size_range, public)
    return self._list_objects(user, account, container, prefix, delimiter,
                              marker, limit, virtual, domain, keys, shared,
                              until, size_range, False, public)
634 def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
635 """Return a list of object metadata dicts existing under a container."""
637 logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
638 props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
642 objects.append({'subdir': p[0]})
644 objects.append({'name': p[0],
645 'bytes': p[self.SIZE + 1],
646 'type': p[self.TYPE + 1],
647 'hash': p[self.HASH + 1],
648 'version': p[self.SERIAL + 1],
649 'version_timestamp': p[self.MTIME + 1],
650 'modified': p[self.MTIME + 1] if until is None else None,
651 'modified_by': p[self.MUSER + 1],
652 'uuid': p[self.UUID + 1],
653 'checksum': p[self.CHECKSUM + 1]})
def list_object_permissions(self, user, account, container, prefix=''):
    """Return a list of paths that enforce permissions under a container.

    Delegates to `_list_object_permissions`, asking for shared paths
    only (public paths are not included).
    """

    logger.debug("list_object_permissions: %s %s %s %s", user,
                 account, container, prefix)
    return self._list_object_permissions(
        user, account, container, prefix, shared=True, public=False)
665 def list_object_public(self, user, account, container, prefix=''):
666 """Return a dict mapping paths to public ids for objects that are public under a container."""
668 logger.debug("list_object_public: %s %s %s %s", user,
669 account, container, prefix)
671 for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
672 public[path] = p + ULTIMATE_ANSWER
676 def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
677 """Return a dictionary with the object metadata for the domain."""
679 logger.debug("get_object_meta: %s %s %s %s %s %s", user,
680 account, container, name, domain, version)
681 self._can_read(user, account, container, name)
682 path, node = self._lookup_object(account, container, name)
683 props = self._get_version(node, version)
685 modified = props[self.MTIME]
688 modified = self._get_version(
689 node)[self.MTIME] # Overall last modification.
690 except NameError: # Object may be deleted.
691 del_props = self.node.version_lookup(
692 node, inf, CLUSTER_DELETED)
693 if del_props is None:
694 raise ItemNotExists('Object does not exist')
695 modified = del_props[self.MTIME]
698 if include_user_defined:
700 dict(self.node.attribute_get(props[self.SERIAL], domain)))
701 meta.update({'name': name,
702 'bytes': props[self.SIZE],
703 'type': props[self.TYPE],
704 'hash': props[self.HASH],
705 'version': props[self.SERIAL],
706 'version_timestamp': props[self.MTIME],
707 'modified': modified,
708 'modified_by': props[self.MUSER],
709 'uuid': props[self.UUID],
710 'checksum': props[self.CHECKSUM]})
def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
    """Update the object's metadata for the domain and return the new version.

    The caller must pass the write check for the object. The metadata
    are stored through `_put_metadata`, which yields the previous and
    the new version ids; the previous one is then handed to
    `_apply_versioning`.
    """

    logger.debug("update_object_meta: %s %s %s %s %s %s %s",
                 user, account, container, name, domain, meta, replace)
    self._can_write(user, account, container, name)
    _path, node = self._lookup_object(account, container, name)
    version_ids = self._put_metadata(user, node, domain, meta, replace)
    old_version_id, new_version_id = version_ids
    self._apply_versioning(account, container, old_version_id)
    return new_version_id
727 def get_object_permissions(self, user, account, container, name):
728 """Return the action allowed on the object, the path
729 from which the object gets its permissions from,
730 along with a dictionary containing the permissions."""
732 logger.debug("get_object_permissions: %s %s %s %s", user,
733 account, container, name)
735 permissions_path = self._get_permissions_path(account, container, name)
737 if self.permissions.access_check(permissions_path, self.WRITE, user):
739 elif self.permissions.access_check(permissions_path, self.READ, user):
742 raise NotAllowedError
743 self._lookup_object(account, container, name)
744 return (allowed, permissions_path, self.permissions.access_get(permissions_path))
747 def update_object_permissions(self, user, account, container, name, permissions):
748 """Update the permissions associated with the object."""
750 logger.debug("update_object_permissions: %s %s %s %s %s",
751 user, account, container, name, permissions)
753 raise NotAllowedError
754 path = self._lookup_object(account, container, name)[0]
755 self._check_permissions(path, permissions)
756 self.permissions.access_set(path, permissions)
757 self._report_sharing_change(user, account, path, {'members':
758 self.permissions.access_members(path)})
761 def get_object_public(self, user, account, container, name):
762 """Return the public id of the object if applicable."""
765 "get_object_public: %s %s %s %s", user, account, container, name)
766 self._can_read(user, account, container, name)
767 path = self._lookup_object(account, container, name)[0]
768 p = self.permissions.public_get(path)
774 def update_object_public(self, user, account, container, name, public):
775 """Update the public status of the object."""
777 logger.debug("update_object_public: %s %s %s %s %s", user,
778 account, container, name, public)
779 self._can_write(user, account, container, name)
780 path = self._lookup_object(account, container, name)[0]
782 self.permissions.public_unset(path)
784 self.permissions.public_set(path)
def get_object_hashmap(self, user, account, container, name, version=None):
    """Return the object's size together with its list of partial hashes.

    The caller must pass the read check for the object. The stored
    hash of the requested version is hex-decoded and used to fetch the
    map from the store; every entry of that map is hex-encoded again
    before being returned.
    """

    logger.debug("get_object_hashmap: %s %s %s %s %s", user,
                 account, container, name, version)
    self._can_read(user, account, container, name)
    _path, node = self._lookup_object(account, container, name)
    props = self._get_version(node, version)
    map_key = binascii.unhexlify(props[self.HASH])
    raw_hashes = self.store.map_get(map_key)
    size = props[self.SIZE]
    hex_hashes = []
    for raw in raw_hashes:
        hex_hashes.append(binascii.hexlify(raw))
    return size, hex_hashes
798 def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
799 if permissions is not None and user != account:
800 raise NotAllowedError
801 self._can_write(user, account, container, name)
802 if permissions is not None:
803 path = '/'.join((account, container, name))
804 self._check_permissions(path, permissions)
806 account_path, account_node = self._lookup_account(account, True)
807 container_path, container_node = self._lookup_container(
809 path, node = self._put_object_node(
810 container_path, container_node, name)
811 pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
814 if src_version_id is None:
815 src_version_id = pre_version_id
816 self._put_metadata_duplicate(
817 src_version_id, dest_version_id, domain, meta, replace_meta)
820 del_size = self._apply_versioning(account, container, pre_version_id)
821 size_delta = size - del_size
823 account_quota = long(self._get_policy(account_node)['quota'])
824 container_quota = long(self._get_policy(container_node)['quota'])
825 if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
826 (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
827 # This must be executed in a transaction, so the version is never created if it fails.
829 self._report_size_change(user, account, size_delta,
830 {'action': 'object update', 'path': path,
831 'versions': [dest_version_id]})
833 if permissions is not None:
834 self.permissions.access_set(path, permissions)
835 self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
837 self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
838 return dest_version_id
841 def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
842 """Create/update an object with the specified size and partial hashes."""
844 logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
845 account, container, name, size, type, hashmap, checksum)
846 if size == 0: # No such thing as an empty hashmap.
847 hashmap = [self.put_block('')]
848 map = HashMap(self.block_size, self.hash_algorithm)
849 map.extend([binascii.unhexlify(x) for x in hashmap])
850 missing = self.store.block_search(map)
853 ie.data = [binascii.hexlify(x) for x in missing]
857 dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
858 self.store.map_put(hash, map)
859 return dest_version_id
862 def update_object_checksum(self, user, account, container, name, version, checksum):
863 """Update an object's checksum."""
865 logger.debug("update_object_checksum: %s %s %s %s %s %s",
866 user, account, container, name, version, checksum)
867 # Update objects with greater version and same hashmap and size (fix metadata updates).
868 self._can_write(user, account, container, name)
869 path, node = self._lookup_object(account, container, name)
870 props = self._get_version(node, version)
871 versions = self.node.node_get_versions(node)
873 if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
874 self.node.version_put_property(
875 x[self.SERIAL], 'checksum', checksum)
877 def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
878 dest_version_ids = []
879 self._can_read(user, src_account, src_container, src_name)
880 path, node = self._lookup_object(src_account, src_container, src_name)
881 # TODO: Will do another fetch of the properties in duplicate version...
882 props = self._get_version(
883 node, src_version) # Check to see if source exists.
884 src_version_id = props[self.SERIAL]
885 hash = props[self.HASH]
886 size = props[self.SIZE]
887 is_copy = not is_move and (src_account, src_container, src_name) != (
888 dest_account, dest_container, dest_name) # New uuid.
889 dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
890 if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
891 self._delete_object(user, src_account, src_container, src_name)
894 prefix = src_name + \
895 delimiter if not src_name.endswith(delimiter) else src_name
896 src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
897 src_names.sort(key=lambda x: x[2]) # order by nodes
898 paths = [elem[0] for elem in src_names]
899 nodes = [elem[2] for elem in src_names]
900 # TODO: Will do another fetch of the properties in duplicate version...
901 props = self._get_versions(nodes) # Check to see if source exists.
903 for prop, path, node in zip(props, paths, nodes):
904 src_version_id = prop[self.SERIAL]
905 hash = prop[self.HASH]
906 vtype = prop[self.TYPE]
907 size = prop[self.SIZE]
908 dest_prefix = dest_name + delimiter if not dest_name.endswith(
909 delimiter) else dest_name
910 vdest_name = path.replace(prefix, dest_prefix, 1)
911 dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
912 if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
913 self._delete_object(user, src_account, src_container, path)
914 return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
    """Copy an object's data and metadata.

    Logs the call, then delegates to ``_copy_object`` and returns its
    result unchanged. The positional ``False`` fills the slot where
    ``move_object`` passes ``True``, i.e. this is the non-move variant.
    """

    logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
    return self._copy_object(
        user, src_account, src_container, src_name,
        dest_account, dest_container, dest_name,
        type, domain, meta, replace_meta, permissions,
        src_version, False, delimiter)
def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
    """Move an object's data and metadata.

    Only the owner of the source account may move; any other user gets
    NotAllowedError. The work is delegated to ``_copy_object`` with no
    source version (``None``) and the move flag set (``True``).
    """

    logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
    if not user == src_account:
        raise NotAllowedError
    return self._copy_object(
        user, src_account, src_container, src_name,
        dest_account, dest_container, dest_name,
        type, domain, meta, replace_meta, permissions,
        None, True, delimiter)
def _delete_object(self, user, account, container, name, until=None, delimiter=None):
    """Purge an object's versions up to `until`, or mark it as deleted.

    Only the account owner may delete; any other user gets
    NotAllowedError.

    With `until` set, versions are purged from CLUSTER_NORMAL and
    CLUSTER_HISTORY (their stored maps are removed), then from
    CLUSTER_DELETED, and the freed size is reported.

    Without `until`, a new zero-size version is written to
    CLUSTER_DELETED and the container's versioning policy is applied to
    the version it replaces. When `delimiter` is given, every object
    listed under the `name` + delimiter prefix is marked the same way.
    """
    # NOTE(review): the source this block was recovered from had lost
    # several statement lines (owner check, missing-node early return,
    # accumulators, try/except, `if del_size`, `if delimiter`, the loop
    # header). They are reconstructed from the surrounding code -- please
    # confirm against version control.
    if user != account:
        raise NotAllowedError

    if until is not None:
        path = '/'.join((account, container, name))
        node = self.node.node_lookup(path)
        if node is None:
            return
        hashes = []
        size = 0
        serials = []
        for cluster in (CLUSTER_NORMAL, CLUSTER_HISTORY):
            h, s, v = self.node.node_purge(node, until, cluster)
            hashes += h
            size += s
            serials += v
        for h in hashes:
            self.store.map_delete(h)
        self.node.node_purge(node, until, CLUSTER_DELETED)
        try:
            self._get_version(node)
        except ItemNotExists:
            # No current version is left: drop the path's permissions.
            self.permissions.access_clear(path)
        self._report_size_change(user, account, -size,
                                 {'action': 'object purge', 'path': path,
                                  'versions': serials})
        return

    path, node = self._lookup_object(account, container, name)
    src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
    del_size = self._apply_versioning(account, container, src_version_id)
    if del_size:
        self._report_size_change(user, account, -del_size,
                                 {'action': 'object delete', 'path': path,
                                  'versions': [dest_version_id]})
    self._report_object_change(
        user, account, path, details={'action': 'object delete'})
    self.permissions.access_clear(path)

    if delimiter:
        prefix = name + delimiter if not name.endswith(delimiter) else name
        src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
        paths = []
        for t in src_names:
            path = '/'.join((account, container, t[0]))
            node = t[2]
            src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
            del_size = self._apply_versioning(
                account, container, src_version_id)
            if del_size:
                self._report_size_change(user, account, -del_size,
                                         {'action': 'object delete',
                                          'path': path,
                                          'versions': [dest_version_id]})
            self._report_object_change(
                user, account, path, details={'action': 'object delete'})
            paths.append(path)
        # Clear permissions for all removed paths in one call.
        self.permissions.access_clear_bulk(paths)
def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
    """Delete/purge an object.

    Logs the request and hands it to ``_delete_object``. `prefix` is only
    logged; it is not forwarded.
    """

    logger.debug("delete_object: %s %s %s %s %s %s %s",
                 user, account, container, name, until, prefix, delimiter)
    self._delete_object(
        user, account, container, name, until, delimiter)
def list_versions(self, user, account, container, name):
    """Return a list of all (version, version_timestamp) tuples for an object.

    Requires read access to the object. Versions whose cluster is
    CLUSTER_DELETED are left out. Each entry is a two-item list
    ``[serial, mtime]``.
    """

    logger.debug(
        "list_versions: %s %s %s %s", user, account, container, name)
    self._can_read(user, account, container, name)
    path, node = self._lookup_object(account, container, name)
    versions = self.node.node_get_versions(node)
    return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
def get_uuid(self, user, uuid):
    """Return the (account, container, name) for the UUID given.

    Raises when the UUID is unknown, and NotAllowedError (via
    ``_can_read``) when `user` may not read the object.
    """

    logger.debug("get_uuid: %s %s", user, uuid)
    info = self.node.latest_uuid(uuid)
    # NOTE(review): the guard and the unpacking below were missing from
    # the recovered source and are reconstructed. Confirm the exception
    # type and that latest_uuid() yields a (path, serial) pair.
    if info is None:
        raise NameError
    path, serial = info
    account, container, name = path.split('/', 2)
    self._can_read(user, account, container, name)
    return (account, container, name)
def get_public(self, user, public):
    """Return the (account, container, name) for the public id given.

    Public ids are offset by ULTIMATE_ANSWER; ids that are None or below
    that offset, and ids that resolve to no path, are rejected.
    """

    logger.debug("get_public: %s %s", user, public)
    # NOTE(review): both guard bodies were missing from the recovered
    # source; the exception type is reconstructed -- please confirm.
    if public is None or public < ULTIMATE_ANSWER:
        raise NameError
    path = self.permissions.public_path(public - ULTIMATE_ANSWER)
    if path is None:
        raise NameError
    account, container, name = path.split('/', 2)
    self._can_read(user, account, container, name)
    return (account, container, name)
@backend_method(autocommit=0)
def get_block(self, hash):
    """Return a block's data.

    `hash` is the hex-encoded block hash. Raises ItemNotExists when the
    store returns nothing for it.
    """

    logger.debug("get_block: %s", hash)
    block = self.store.block_get(binascii.unhexlify(hash))
    if not block:
        raise ItemNotExists('Block does not exist')
    return block
@backend_method(autocommit=0)
def put_block(self, data):
    """Store a block and return the hash.

    The hash returned by the store is hex-encoded before being returned.
    """

    logger.debug("put_block: %s", len(data))
    raw_hash = self.store.block_put(data)
    return binascii.hexlify(raw_hash)
@backend_method(autocommit=0)
def update_block(self, hash, data, offset=0):
    """Update a known block and return the hash.

    When `data` covers a whole block from offset 0 it is stored as a new
    block; otherwise the existing block named by the hex `hash` is
    patched at `offset`.
    """

    logger.debug("update_block: %s %s %s", hash, len(data), offset)
    replaces_whole_block = offset == 0 and len(data) == self.block_size
    if replaces_whole_block:
        return self.put_block(data)
    raw_hash = self.store.block_update(
        binascii.unhexlify(hash), offset, data)
    return binascii.hexlify(raw_hash)
1072 def _generate_uuid(self):
1073 return str(uuidlib.uuid4())
1075 def _put_object_node(self, path, parent, name):
1076 path = '/'.join((path, name))
1077 node = self.node.node_lookup(path)
1079 node = self.node.node_create(parent, path)
def _put_path(self, user, parent, path):
    """Create a node for `path` under `parent` with an initial version.

    The initial version is created in CLUSTER_NORMAL with no hash, size
    0, empty type and checksum, no source version and a fresh UUID.
    Returns the new node.
    """
    node = self.node.node_create(parent, path)
    self.node.version_create(node, None, 0, '', None, user,
                             self._generate_uuid(), '', CLUSTER_NORMAL)
    return node
1088 def _lookup_account(self, account, create=True):
1089 node = self.node.node_lookup(account)
1090 if node is None and create:
1091 node = self._put_path(
1092 account, self.ROOTNODE, account) # User is account.
1093 return account, node
1095 def _lookup_container(self, account, container):
1096 path = '/'.join((account, container))
1097 node = self.node.node_lookup(path)
1099 raise ItemNotExists('Container does not exist')
1102 def _lookup_object(self, account, container, name):
1103 path = '/'.join((account, container, name))
1104 node = self.node.node_lookup(path)
1106 raise ItemNotExists('Object does not exist')
1109 def _lookup_objects(self, paths):
1110 nodes = self.node.node_lookup_bulk(paths)
def _get_properties(self, node, until=None):
    """Return properties until the timestamp given.

    Looks in CLUSTER_NORMAL first. Only when `until` is given does it
    fall back to CLUSTER_HISTORY. Raises ItemNotExists when nothing is
    found.
    """

    before = until if until is not None else inf
    props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
    if props is None and until is not None:
        props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
    if props is None:
        raise ItemNotExists('Path does not exist')
    return props
def _get_statistics(self, node, until=None):
    """Return count, sum of size and latest timestamp of everything under node.

    Without `until` the CLUSTER_NORMAL statistics are used; with it, the
    latest statistics up to `until` excluding CLUSTER_DELETED.
    """
    # NOTE(review): the branch structure and the empty fallback were
    # missing from the recovered source and are reconstructed -- confirm.
    if until is None:
        stats = self.node.statistics_get(node, CLUSTER_NORMAL)
    else:
        stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
    if stats is None:
        stats = (0, 0, 0)
    return stats
def _get_version(self, node, version=None):
    """Return the properties of a node's version.

    With `version` None the current CLUSTER_NORMAL version is returned,
    or ItemNotExists is raised when there is none. Otherwise `version`
    is converted to int and looked up directly; a non-numeric value, an
    unknown version, or one in CLUSTER_DELETED raises VersionNotExists.
    """
    if version is None:
        props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
        if props is None:
            raise ItemNotExists('Object does not exist')
    else:
        try:
            version = int(version)
        except ValueError:
            raise VersionNotExists('Version does not exist')
        props = self.node.version_get_properties(version)
        if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
            raise VersionNotExists('Version does not exist')
    return props
def _get_versions(self, nodes):
    """Bulk-fetch CLUSTER_NORMAL version properties for `nodes`.

    Uses ``inf`` as the upper time bound, as the single-node lookups do.
    """
    bulk_lookup = self.node.version_lookup_bulk
    return bulk_lookup(nodes, inf, CLUSTER_NORMAL)
def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
    """Create a new version of the node.

    The new version is derived from the current CLUSTER_NORMAL version
    of `src_node` (or of `node` when `src_node` is None). Arguments left
    as None inherit the source's values. The UUID is kept unless this is
    a copy or there is no source version. The version being replaced on
    `node`, if any, is moved to CLUSTER_HISTORY.

    Returns ``(pre_version_id, dest_version_id)``: the replaced version
    (or None) and the newly created one.
    """

    props = self.node.version_lookup(
        node if src_node is None else src_node, inf, CLUSTER_NORMAL)
    if props is not None:
        src_version_id = props[self.SERIAL]
        src_hash = props[self.HASH]
        src_size = props[self.SIZE]
        src_type = props[self.TYPE]
        src_checksum = props[self.CHECKSUM]
    else:
        # NOTE(review): these fallbacks (and the size/type inheritance
        # below) were missing from the recovered source; the values are
        # reconstructed -- confirm against version control.
        src_version_id = None
        src_hash = None
        src_size = 0
        src_type = ''
        src_checksum = ''
    if size is None:  # Set metadata.
        hash = src_hash  # This way hash can be set to None (account or container).
        size = src_size
    if type is None:
        type = src_type
    if checksum is None:
        checksum = src_checksum
    if is_copy or src_version_id is None:
        uuid = self._generate_uuid()
    else:
        uuid = props[self.UUID]

    if src_node is None:
        # Source and destination are the same node.
        pre_version_id = src_version_id
    else:
        pre_version_id = None
        props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
        if props is not None:
            pre_version_id = props[self.SERIAL]
    if pre_version_id is not None:
        self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)

    dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
    return pre_version_id, dest_version_id
1193 def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1194 if src_version_id is not None:
1195 self.node.attribute_copy(src_version_id, dest_version_id)
1197 self.node.attribute_del(dest_version_id, domain, (
1198 k for k, v in meta.iteritems() if v == ''))
1199 self.node.attribute_set(dest_version_id, domain, (
1200 (k, v) for k, v in meta.iteritems() if v != ''))
1202 self.node.attribute_del(dest_version_id, domain)
1203 self.node.attribute_set(dest_version_id, domain, ((
1204 k, v) for k, v in meta.iteritems()))
1206 def _put_metadata(self, user, node, domain, meta, replace=False):
1207 """Create a new version and store metadata."""
1209 src_version_id, dest_version_id = self._put_version_duplicate(
1211 self._put_metadata_duplicate(
1212 src_version_id, dest_version_id, domain, meta, replace)
1213 return src_version_id, dest_version_id
1215 def _list_limits(self, listing, marker, limit):
1219 start = listing.index(marker) + 1
1222 if not limit or limit > 10000:
def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
    """List the objects under a container path with their properties.

    `prefix` and `marker` are given relative to the container and are
    qualified with 'path/' before querying. The key filter is applied
    only when a `domain` is given. With `virtual`, common prefixes are
    added as ``(prefix, None)`` entries. Results are sorted by name and
    returned with the container prefix stripped from each name.
    """
    cont_prefix = path + '/'
    prefix = cont_prefix + prefix
    start = cont_prefix + marker if marker else None
    before = until if until is not None else inf
    filterq = keys if domain else []
    # NOTE(review): this binding was missing from the recovered source;
    # size_range is the only size-related input in scope -- confirm.
    sizeq = size_range

    objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
    objects.extend([(p, None) for p in prefixes] if virtual else [])
    objects.sort(key=lambda x: x[0])
    objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
    return objects
1240 # Reporting functions.
1242 def _report_size_change(self, user, account, size, details={}):
1243 account_node = self._lookup_account(account, True)[1]
1244 total = self._get_statistics(account_node)[1]
1245 details.update({'user': user, 'total': total})
1247 "_report_size_change: %s %s %s %s", user, account, size, details)
1248 self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
1249 account, QUEUE_INSTANCE_ID, 'diskspace',
1250 float(size), details))
1252 serial = self.quotaholder.issue_commission(
1256 clientkey = 'pithos',
1258 provisions = (('pithos+', 'pithos+ : diskspace', size),)
1260 self.serials.append(serial)
def _report_object_change(self, user, account, path, details=None):
    """Queue an 'object' change message for `path`.

    `details` is copied before the acting user is added, so neither the
    caller's dict nor a shared default is modified between calls.
    """
    payload = dict(details or {})
    payload.update({'user': user})
    logger.debug("_report_object_change: %s %s %s %s", user,
                 account, path, payload)
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                          account, QUEUE_INSTANCE_ID, 'object', path, payload))
def _report_sharing_change(self, user, account, path, details=None):
    """Queue a 'sharing' change message for `path`.

    `details` is copied before the acting user is added, so neither the
    caller's dict nor a shared default is modified between calls. The
    debug line is written before the user is added.
    """
    payload = dict(details or {})
    # The log label previously read "_report_permissions_change", which
    # is not this function's name.
    logger.debug("_report_sharing_change: %s %s %s %s",
                 user, account, path, payload)
    payload.update({'user': user})
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                          account, QUEUE_INSTANCE_ID, 'sharing', path, payload))
1278 def _check_policy(self, policy):
1279 for k in policy.keys():
1281 policy[k] = self.default_policy.get(k)
1282 for k, v in policy.iteritems():
1284 q = int(v) # May raise ValueError.
1287 elif k == 'versioning':
1288 if v not in ['auto', 'none']:
1293 def _put_policy(self, node, policy, replace):
1295 for k, v in self.default_policy.iteritems():
1298 self.node.policy_set(node, policy)
1300 def _get_policy(self, node):
1301 policy = self.default_policy.copy()
1302 policy.update(self.node.policy_get(node))
1305 def _apply_versioning(self, account, container, version_id):
1306 """Delete the provided version if such is the policy.
1307 Return size of object removed.
1310 if version_id is None:
1312 path, node = self._lookup_container(account, container)
1313 versioning = self._get_policy(node)['versioning']
1314 if versioning != 'auto':
1315 hash, size = self.node.version_remove(version_id)
1316 self.store.map_delete(hash)
1320 # Access control functions.
1322 def _check_groups(self, groups):
1323 # raise ValueError('Bad characters in groups')
1326 def _check_permissions(self, path, permissions):
1327 # raise ValueError('Bad characters in permissions')
def _get_formatted_paths(self, paths):
    """Turn paths into ``(path, match_type)`` pairs.

    Paths with no node or no CLUSTER_NORMAL version are skipped. Each
    remaining path yields a MATCH_EXACT entry. Paths whose content type
    is 'application/directory' or 'application/folder' also yield a
    MATCH_PREFIX entry for 'path/'.
    """
    # NOTE(review): the accumulator, loop header and return were missing
    # from the recovered source. The version lookup is nested under the
    # node check so `props` is never read unbound -- confirm the nesting.
    formatted = []
    for p in paths:
        node = self.node.node_lookup(p)
        if node is not None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            if props is not None:
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
                formatted.append((p, self.MATCH_EXACT))
    return formatted
1342 def _get_permissions_path(self, account, container, name):
1343 path = '/'.join((account, container, name))
1344 permission_paths = self.permissions.access_inherit(path)
1345 permission_paths.sort()
1346 permission_paths.reverse()
1347 for p in permission_paths:
1351 if p.count('/') < 2:
1353 node = self.node.node_lookup(p)
1354 if node is not None:
1355 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1356 if props is not None:
1357 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1361 def _can_read(self, user, account, container, name):
1364 path = '/'.join((account, container, name))
1365 if self.permissions.public_get(path) is not None:
1367 path = self._get_permissions_path(account, container, name)
1369 raise NotAllowedError
1370 if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1371 raise NotAllowedError
1373 def _can_write(self, user, account, container, name):
1376 path = '/'.join((account, container, name))
1377 path = self._get_permissions_path(account, container, name)
1379 raise NotAllowedError
1380 if not self.permissions.access_check(path, self.WRITE, user):
1381 raise NotAllowedError
1383 def _allowed_accounts(self, user):
1385 for path in self.permissions.access_list_paths(user):
1386 allow.add(path.split('/', 1)[0])
1387 return sorted(allow)
1389 def _allowed_containers(self, user, account):
1391 for path in self.permissions.access_list_paths(user, account):
1392 allow.add(path.split('/', 2)[1])
1393 return sorted(allow)