1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
37 import uuid as uuidlib
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43 AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
45 # Stripped-down version of the HashMap class found in tools.
def __init__(self, blocksize, blockhash):
    """Create an empty hash map and remember its hashing parameters.

    blocksize -- stored on the instance as ``self.blocksize``.
    blockhash -- stored as ``self.blockhash``; ``_hash_raw`` passes it
                 to ``hashlib.new`` as the algorithm name.
    """
    super(HashMap, self).__init__()
    self.blocksize, self.blockhash = blocksize, blockhash
55 def _hash_raw(self, v):
56 h = hashlib.new(self.blockhash)
62 return self._hash_raw('')
64 return self.__getitem__(0)
70 h += [('\x00' * len(h[0]))] * (s - len(h))
72 h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
# Fallback modules and settings, used by ModularBackend.__init__ for any
# argument the caller leaves unset.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
# There is no default queue; these two are intentionally left disabled.
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'

QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
# Passed to the queue module's Queue as its 'client_id'.
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Cluster identifiers handed to the node/version calls of the db module.
CLUSTER_NORMAL = 0
CLUSTER_HISTORY = 1
CLUSTER_DELETED = 2
95 logger = logging.getLogger(__name__)
98 def backend_method(func=None, autocommit=1):
101 return backend_method(func, autocommit)
107 def fn(self, *args, **kw):
108 self.wrapper.execute()
111 ret = func(self, *args, **kw)
112 for m in self.messages:
114 self.wrapper.commit()
117 self.wrapper.rollback()
122 class ModularBackend(BaseBackend):
123 """A modular backend.
125 Uses modules for SQL functions and storage.
def __init__(self, db_module=None, db_connection=None,
             block_module=None, block_path=None, block_umask=None,
             queue_module=None, queue_connection=None):
    """Import the configured modules and build the backend's components.

    Arguments left unset fall back to the DEFAULT_* module constants.
    queue_module and queue_connection have no default: unless both are
    given, self.queue is a stand-in whose methods do nothing.

    Sets self.wrapper, self.permissions, self.config,
    self.quotaholder_sync, self.node, self.store and self.queue, and
    copies the db module's constants (READ, WRITE, ROOTNODE, ...) onto
    the instance so the rest of the class can use them as self.<NAME>.
    """
    db_module = db_module or DEFAULT_DB_MODULE
    db_connection = db_connection or DEFAULT_DB_CONNECTION
    block_module = block_module or DEFAULT_BLOCK_MODULE
    block_path = block_path or DEFAULT_BLOCK_PATH
    # Test against None rather than truthiness: 0 is a valid umask and
    # must not be replaced by the default.
    if block_umask is None:
        block_umask = DEFAULT_BLOCK_UMASK
    #queue_module = queue_module or DEFAULT_QUEUE_MODULE
    #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION

    self.hash_algorithm = 'sha256'
    self.block_size = 4 * 1024 * 1024  # 4MB

    self.default_policy = {'quota': DEFAULT_QUOTA,
                           'versioning': DEFAULT_VERSIONING}

    def load_module(m):
        # __import__ on a dotted name returns the top-level package, so
        # fetch the actual submodule from sys.modules.
        __import__(m)
        return sys.modules[m]

    self.db_module = load_module(db_module)
    self.wrapper = self.db_module.DBWrapper(db_connection)
    params = {'wrapper': self.wrapper}
    self.permissions = self.db_module.Permissions(**params)
    # Was "seld.db_module", which raised NameError on every construction.
    self.config = self.db_module.Config(**params)
    # NOTE(review): this was also assigned to self.config, discarding the
    # Config instance above. The attribute name used here is the
    # reviewer's choice -- confirm against the code that consumes it.
    self.quotaholder_sync = self.db_module.QuotaholderSync(**params)
    for name in ['READ', 'WRITE']:
        setattr(self, name, getattr(self.db_module, name))
    self.node = self.db_module.Node(**params)
    for name in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME',
                 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX',
                 'MATCH_EXACT']:
        setattr(self, name, getattr(self.db_module, name))

    self.block_module = load_module(block_module)
    params = {'path': block_path,
              'block_size': self.block_size,
              'hash_algorithm': self.hash_algorithm,
              'umask': block_umask}
    self.store = self.block_module.Store(**params)

    if queue_module and queue_connection:
        self.queue_module = load_module(queue_module)
        params = {'exchange': queue_connection,
                  'client_id': QUEUE_CLIENT_ID}
        self.queue = self.queue_module.Queue(**params)
    else:
        # Null object: lets callers use self.queue without checking
        # whether a queue was configured.
        class NoQueue:
            def send(self, *args):
                pass

            def close(self):
                pass

        self.queue = NoQueue()
def list_accounts(self, user, marker=None, limit=10000):
    """List one page of the accounts that ``user`` may access.

    The full listing comes from ``_allowed_accounts``; ``_list_limits``
    turns ``marker`` and ``limit`` into the start offset and page size
    that are used to slice it.
    """
    logger.debug("list_accounts: %s %s %s", user, marker, limit)
    accessible = self._allowed_accounts(user)
    window = self._list_limits(accessible, marker, limit)
    first, count = window
    last = first + count
    return accessible[first:last]
197 def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
198 """Return a dictionary with the account metadata for the domain."""
201 "get_account_meta: %s %s %s %s", user, account, domain, until)
202 path, node = self._lookup_account(account, user == account)
204 if until or node is None or account not in self._allowed_accounts(user):
205 raise NotAllowedError
207 props = self._get_properties(node, until)
208 mtime = props[self.MTIME]
212 count, bytes, tstamp = self._get_statistics(node, until)
213 tstamp = max(tstamp, mtime)
217 modified = self._get_statistics(
218 node)[2] # Overall last modification.
219 modified = max(modified, mtime)
222 meta = {'name': account}
225 if props is not None and include_user_defined:
227 dict(self.node.attribute_get(props[self.SERIAL], domain)))
228 if until is not None:
229 meta.update({'until_timestamp': tstamp})
230 meta.update({'name': account, 'count': count, 'bytes': bytes})
231 meta.update({'modified': modified})
235 def update_account_meta(self, user, account, domain, meta, replace=False):
236 """Update the metadata associated with the account for the domain."""
238 logger.debug("update_account_meta: %s %s %s %s %s", user,
239 account, domain, meta, replace)
241 raise NotAllowedError
242 path, node = self._lookup_account(account, True)
243 self._put_metadata(user, node, domain, meta, replace)
246 def get_account_groups(self, user, account):
247 """Return a dictionary with the user groups defined for this account."""
249 logger.debug("get_account_groups: %s %s", user, account)
251 if account not in self._allowed_accounts(user):
252 raise NotAllowedError
254 self._lookup_account(account, True)
255 return self.permissions.group_dict(account)
258 def update_account_groups(self, user, account, groups, replace=False):
259 """Update the groups associated with the account."""
261 logger.debug("update_account_groups: %s %s %s %s", user,
262 account, groups, replace)
264 raise NotAllowedError
265 self._lookup_account(account, True)
266 self._check_groups(groups)
268 self.permissions.group_destroy(account)
269 for k, v in groups.iteritems():
270 if not replace: # If not already deleted.
271 self.permissions.group_delete(account, k)
273 self.permissions.group_addmany(account, k, v)
276 def get_account_policy(self, user, account):
277 """Return a dictionary with the account policy."""
279 logger.debug("get_account_policy: %s %s", user, account)
281 if account not in self._allowed_accounts(user):
282 raise NotAllowedError
284 path, node = self._lookup_account(account, True)
285 return self._get_policy(node)
288 def update_account_policy(self, user, account, policy, replace=False):
289 """Update the policy associated with the account."""
291 logger.debug("update_account_policy: %s %s %s %s", user,
292 account, policy, replace)
294 raise NotAllowedError
295 path, node = self._lookup_account(account, True)
296 self._check_policy(policy)
297 self._put_policy(node, policy, replace)
300 def put_account(self, user, account, policy={}):
301 """Create a new account with the given name."""
303 logger.debug("put_account: %s %s %s", user, account, policy)
305 raise NotAllowedError
306 node = self.node.node_lookup(account)
308 raise AccountExists('Account already exists')
310 self._check_policy(policy)
311 node = self._put_path(user, self.ROOTNODE, account)
312 self._put_policy(node, policy, True)
315 def delete_account(self, user, account):
316 """Delete the account with the given name."""
318 logger.debug("delete_account: %s %s", user, account)
320 raise NotAllowedError
321 node = self.node.node_lookup(account)
324 if not self.node.node_remove(node):
325 raise AccountNotEmpty('Account is not empty')
326 self.permissions.group_destroy(account)
329 def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
330 """Return a list of containers existing under an account."""
332 logger.debug("list_containers: %s %s %s %s %s %s %s", user,
333 account, marker, limit, shared, until, public)
335 if until or account not in self._allowed_accounts(user):
336 raise NotAllowedError
337 allowed = self._allowed_containers(user, account)
338 start, limit = self._list_limits(allowed, marker, limit)
339 return allowed[start:start + limit]
343 allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
345 allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
346 allowed = sorted(allowed)
347 start, limit = self._list_limits(allowed, marker, limit)
348 return allowed[start:start + limit]
349 node = self.node.node_lookup(account)
350 containers = [x[0] for x in self._list_object_properties(
351 node, account, '', '/', marker, limit, False, None, [], until)]
352 start, limit = self._list_limits(
353 [x[0] for x in containers], marker, limit)
354 return containers[start:start + limit]
357 def list_container_meta(self, user, account, container, domain, until=None):
358 """Return a list with all the container's object meta keys for the domain."""
360 logger.debug("list_container_meta: %s %s %s %s %s", user,
361 account, container, domain, until)
365 raise NotAllowedError
366 allowed = self.permissions.access_list_paths(
367 user, '/'.join((account, container)))
369 raise NotAllowedError
370 path, node = self._lookup_container(account, container)
371 before = until if until is not None else inf
372 allowed = self._get_formatted_paths(allowed)
373 return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
376 def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
377 """Return a dictionary with the container metadata for the domain."""
379 logger.debug("get_container_meta: %s %s %s %s %s", user,
380 account, container, domain, until)
382 if until or container not in self._allowed_containers(user, account):
383 raise NotAllowedError
384 path, node = self._lookup_container(account, container)
385 props = self._get_properties(node, until)
386 mtime = props[self.MTIME]
387 count, bytes, tstamp = self._get_statistics(node, until)
388 tstamp = max(tstamp, mtime)
392 modified = self._get_statistics(
393 node)[2] # Overall last modification.
394 modified = max(modified, mtime)
397 meta = {'name': container}
400 if include_user_defined:
402 dict(self.node.attribute_get(props[self.SERIAL], domain)))
403 if until is not None:
404 meta.update({'until_timestamp': tstamp})
405 meta.update({'name': container, 'count': count, 'bytes': bytes})
406 meta.update({'modified': modified})
410 def update_container_meta(self, user, account, container, domain, meta, replace=False):
411 """Update the metadata associated with the container for the domain."""
413 logger.debug("update_container_meta: %s %s %s %s %s %s",
414 user, account, container, domain, meta, replace)
416 raise NotAllowedError
417 path, node = self._lookup_container(account, container)
418 src_version_id, dest_version_id = self._put_metadata(
419 user, node, domain, meta, replace)
420 if src_version_id is not None:
421 versioning = self._get_policy(node)['versioning']
422 if versioning != 'auto':
423 self.node.version_remove(src_version_id)
426 def get_container_policy(self, user, account, container):
427 """Return a dictionary with the container policy."""
430 "get_container_policy: %s %s %s", user, account, container)
432 if container not in self._allowed_containers(user, account):
433 raise NotAllowedError
435 path, node = self._lookup_container(account, container)
436 return self._get_policy(node)
439 def update_container_policy(self, user, account, container, policy, replace=False):
440 """Update the policy associated with the container."""
442 logger.debug("update_container_policy: %s %s %s %s %s",
443 user, account, container, policy, replace)
445 raise NotAllowedError
446 path, node = self._lookup_container(account, container)
447 self._check_policy(policy)
448 self._put_policy(node, policy, replace)
451 def put_container(self, user, account, container, policy={}):
452 """Create a new container with the given name."""
455 "put_container: %s %s %s %s", user, account, container, policy)
457 raise NotAllowedError
459 path, node = self._lookup_container(account, container)
463 raise ContainerExists('Container already exists')
465 self._check_policy(policy)
466 path = '/'.join((account, container))
467 node = self._put_path(
468 user, self._lookup_account(account, True)[1], path)
469 self._put_policy(node, policy, True)
472 def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
473 """Delete/purge the container with the given name."""
475 logger.debug("delete_container: %s %s %s %s %s %s", user,
476 account, container, until, prefix, delimiter)
478 raise NotAllowedError
479 path, node = self._lookup_container(account, container)
481 if until is not None:
482 hashes, size = self.node.node_purge_children(
483 node, until, CLUSTER_HISTORY)
485 self.store.map_delete(h)
486 self.node.node_purge_children(node, until, CLUSTER_DELETED)
487 self._report_size_change(user, account, -size, {'action':
488 'container purge', 'path': path})
492 if self._get_statistics(node)[0] > 0:
493 raise ContainerNotEmpty('Container is not empty')
494 hashes, size = self.node.node_purge_children(
495 node, inf, CLUSTER_HISTORY)
497 self.store.map_delete(h)
498 self.node.node_purge_children(node, inf, CLUSTER_DELETED)
499 self.node.node_remove(node)
500 self._report_size_change(user, account, -size, {'action':
501 'container delete', 'path': path})
503 # remove only contents
504 src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
507 path = '/'.join((account, container, t[0]))
509 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
510 del_size = self._apply_versioning(
511 account, container, src_version_id)
513 self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
514 self._report_object_change(
515 user, account, path, details={'action': 'object delete'})
517 self.permissions.access_clear_bulk(paths)
519 def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
520 if user != account and until:
521 raise NotAllowedError
522 if shared and public:
524 shared = self._list_object_permissions(
525 user, account, container, prefix, shared=True, public=False)
528 path, node = self._lookup_container(account, container)
529 shared = self._get_formatted_paths(shared)
530 objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
533 objects |= set(self._list_public_object_properties(
534 user, account, container, prefix, all_props))
535 objects = list(objects)
537 objects.sort(key=lambda x: x[0])
538 start, limit = self._list_limits(
539 [x[0] for x in objects], marker, limit)
540 return objects[start:start + limit]
542 objects = self._list_public_object_properties(
543 user, account, container, prefix, all_props)
544 start, limit = self._list_limits(
545 [x[0] for x in objects], marker, limit)
546 return objects[start:start + limit]
548 allowed = self._list_object_permissions(
549 user, account, container, prefix, shared, public)
550 if shared and not allowed:
552 path, node = self._lookup_container(account, container)
553 allowed = self._get_formatted_paths(allowed)
554 objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
555 start, limit = self._list_limits(
556 [x[0] for x in objects], marker, limit)
557 return objects[start:start + limit]
559 def _list_public_object_properties(self, user, account, container, prefix, all_props):
560 public = self._list_object_permissions(
561 user, account, container, prefix, shared=False, public=True)
562 paths, nodes = self._lookup_objects(public)
563 path = '/'.join((account, container))
564 cont_prefix = path + '/'
565 paths = [x[len(cont_prefix):] for x in paths]
566 props = self.node.version_lookup_bulk(nodes, all_props=all_props)
567 objects = [(path,) + props for path, props in zip(paths, props)]
570 def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
573 marker = objects[-1] if objects else None
575 l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
577 if not l or len(l) < limit:
581 def _list_object_permissions(self, user, account, container, prefix, shared, public):
583 path = '/'.join((account, container, prefix)).rstrip('/')
585 allowed = self.permissions.access_list_paths(user, path)
587 raise NotAllowedError
591 allowed.update(self.permissions.access_list_shared(path))
594 [x[0] for x in self.permissions.public_list(path)])
595 allowed = sorted(allowed)
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
    """Return a list of object (name, version_id) tuples existing under a container.

    Thin wrapper: logs the request and delegates to ``_list_objects``
    with ``all_props`` switched off.
    """
    # Everything up to size_range is forwarded unchanged, in this order.
    query = (user, account, container, prefix, delimiter, marker, limit,
             virtual, domain, keys, shared, until, size_range)
    logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", *(query + (public,)))
    all_props = False
    return self._list_objects(*(query + (all_props, public)))
608 def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
609 """Return a list of object metadata dicts existing under a container."""
611 logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
612 props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
616 objects.append({'subdir': p[0]})
618 objects.append({'name': p[0],
619 'bytes': p[self.SIZE + 1],
620 'type': p[self.TYPE + 1],
621 'hash': p[self.HASH + 1],
622 'version': p[self.SERIAL + 1],
623 'version_timestamp': p[self.MTIME + 1],
624 'modified': p[self.MTIME + 1] if until is None else None,
625 'modified_by': p[self.MUSER + 1],
626 'uuid': p[self.UUID + 1],
627 'checksum': p[self.CHECKSUM + 1]})
def list_object_permissions(self, user, account, container, prefix=''):
    """Return a list of paths that enforce permissions under a container.

    Delegates to ``_list_object_permissions`` asking for shared paths
    only (shared=True, public=False).
    """
    logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
    shared, public = True, False
    return self._list_object_permissions(
        user, account, container, prefix, shared, public)
639 def list_object_public(self, user, account, container, prefix=''):
640 """Return a dict mapping paths to public ids for objects that are public under a container."""
642 logger.debug("list_object_public: %s %s %s %s", user,
643 account, container, prefix)
645 for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
646 public[path] = p + ULTIMATE_ANSWER
650 def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
651 """Return a dictionary with the object metadata for the domain."""
653 logger.debug("get_object_meta: %s %s %s %s %s %s", user,
654 account, container, name, domain, version)
655 self._can_read(user, account, container, name)
656 path, node = self._lookup_object(account, container, name)
657 props = self._get_version(node, version)
659 modified = props[self.MTIME]
662 modified = self._get_version(
663 node)[self.MTIME] # Overall last modification.
664 except NameError: # Object may be deleted.
665 del_props = self.node.version_lookup(
666 node, inf, CLUSTER_DELETED)
667 if del_props is None:
668 raise ItemNotExists('Object does not exist')
669 modified = del_props[self.MTIME]
672 if include_user_defined:
674 dict(self.node.attribute_get(props[self.SERIAL], domain)))
675 meta.update({'name': name,
676 'bytes': props[self.SIZE],
677 'type': props[self.TYPE],
678 'hash': props[self.HASH],
679 'version': props[self.SERIAL],
680 'version_timestamp': props[self.MTIME],
681 'modified': modified,
682 'modified_by': props[self.MUSER],
683 'uuid': props[self.UUID],
684 'checksum': props[self.CHECKSUM]})
def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
    """Update the metadata associated with the object for the domain and return the new version.

    Checks write access, looks the object up, stores the metadata via
    ``_put_metadata`` and then runs ``_apply_versioning`` on the version
    that was superseded.
    """
    logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
    self._can_write(user, account, container, name)
    _path, obj_node = self._lookup_object(account, container, name)
    versions = self._put_metadata(user, obj_node, domain, meta, replace)
    old_version, new_version = versions
    self._apply_versioning(account, container, old_version)
    return new_version
701 def get_object_permissions(self, user, account, container, name):
702 """Return the action allowed on the object, the path
703 from which the object gets its permissions from,
704 along with a dictionary containing the permissions."""
706 logger.debug("get_object_permissions: %s %s %s %s", user,
707 account, container, name)
709 permissions_path = self._get_permissions_path(account, container, name)
711 if self.permissions.access_check(permissions_path, self.WRITE, user):
713 elif self.permissions.access_check(permissions_path, self.READ, user):
716 raise NotAllowedError
717 self._lookup_object(account, container, name)
718 return (allowed, permissions_path, self.permissions.access_get(permissions_path))
721 def update_object_permissions(self, user, account, container, name, permissions):
722 """Update the permissions associated with the object."""
724 logger.debug("update_object_permissions: %s %s %s %s %s",
725 user, account, container, name, permissions)
727 raise NotAllowedError
728 path = self._lookup_object(account, container, name)[0]
729 self._check_permissions(path, permissions)
730 self.permissions.access_set(path, permissions)
731 self._report_sharing_change(user, account, path, {'members':
732 self.permissions.access_members(path)})
735 def get_object_public(self, user, account, container, name):
736 """Return the public id of the object if applicable."""
739 "get_object_public: %s %s %s %s", user, account, container, name)
740 self._can_read(user, account, container, name)
741 path = self._lookup_object(account, container, name)[0]
742 p = self.permissions.public_get(path)
748 def update_object_public(self, user, account, container, name, public):
749 """Update the public status of the object."""
751 logger.debug("update_object_public: %s %s %s %s %s", user,
752 account, container, name, public)
753 self._can_write(user, account, container, name)
754 path = self._lookup_object(account, container, name)[0]
756 self.permissions.public_unset(path)
758 self.permissions.public_set(path)
def get_object_hashmap(self, user, account, container, name, version=None):
    """Return the object's size and a list with partial hashes.

    The version's stored hash is hex text; it is decoded to look up the
    block map in the store, and every entry of that map is hex-encoded
    again for the caller.
    """
    logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
    self._can_read(user, account, container, name)
    _path, obj_node = self._lookup_object(account, container, name)
    version_props = self._get_version(obj_node, version)
    map_key = binascii.unhexlify(version_props[self.HASH])
    block_hashes = self.store.map_get(map_key)
    size = version_props[self.SIZE]
    hex_hashes = []
    for block_hash in block_hashes:
        hex_hashes.append(binascii.hexlify(block_hash))
    return size, hex_hashes
772 def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
773 if permissions is not None and user != account:
774 raise NotAllowedError
775 self._can_write(user, account, container, name)
776 if permissions is not None:
777 path = '/'.join((account, container, name))
778 self._check_permissions(path, permissions)
780 account_path, account_node = self._lookup_account(account, True)
781 container_path, container_node = self._lookup_container(
783 path, node = self._put_object_node(
784 container_path, container_node, name)
785 pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
788 if src_version_id is None:
789 src_version_id = pre_version_id
790 self._put_metadata_duplicate(
791 src_version_id, dest_version_id, domain, meta, replace_meta)
794 del_size = self._apply_versioning(account, container, pre_version_id)
795 size_delta = size - del_size
797 account_quota = long(self._get_policy(account_node)['quota'])
798 container_quota = long(self._get_policy(container_node)['quota'])
799 if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
800 (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
801 # This must be executed in a transaction, so the version is never created if it fails.
803 self._report_size_change(user, account, size_delta, {
804 'action': 'object update', 'path': path})
806 if permissions is not None:
807 self.permissions.access_set(path, permissions)
808 self._report_sharing_change(user, account, path, {'members': self.permissions.access_members(path)})
810 self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
811 return dest_version_id
814 def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
815 """Create/update an object with the specified size and partial hashes."""
817 logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user,
818 account, container, name, size, type, hashmap, checksum)
819 if size == 0: # No such thing as an empty hashmap.
820 hashmap = [self.put_block('')]
821 map = HashMap(self.block_size, self.hash_algorithm)
822 map.extend([binascii.unhexlify(x) for x in hashmap])
823 missing = self.store.block_search(map)
826 ie.data = [binascii.hexlify(x) for x in missing]
830 dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
831 self.store.map_put(hash, map)
832 return dest_version_id
835 def update_object_checksum(self, user, account, container, name, version, checksum):
836 """Update an object's checksum."""
838 logger.debug("update_object_checksum: %s %s %s %s %s %s",
839 user, account, container, name, version, checksum)
840 # Update objects with greater version and same hashmap and size (fix metadata updates).
841 self._can_write(user, account, container, name)
842 path, node = self._lookup_object(account, container, name)
843 props = self._get_version(node, version)
844 versions = self.node.node_get_versions(node)
846 if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
847 self.node.version_put_property(
848 x[self.SERIAL], 'checksum', checksum)
850 def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
851 dest_version_ids = []
852 self._can_read(user, src_account, src_container, src_name)
853 path, node = self._lookup_object(src_account, src_container, src_name)
854 # TODO: Will do another fetch of the properties in duplicate version...
855 props = self._get_version(
856 node, src_version) # Check to see if source exists.
857 src_version_id = props[self.SERIAL]
858 hash = props[self.HASH]
859 size = props[self.SIZE]
860 is_copy = not is_move and (src_account, src_container, src_name) != (
861 dest_account, dest_container, dest_name) # New uuid.
862 dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
863 if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
864 self._delete_object(user, src_account, src_container, src_name)
867 prefix = src_name + \
868 delimiter if not src_name.endswith(delimiter) else src_name
869 src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
870 src_names.sort(key=lambda x: x[2]) # order by nodes
871 paths = [elem[0] for elem in src_names]
872 nodes = [elem[2] for elem in src_names]
873 # TODO: Will do another fetch of the properties in duplicate version...
874 props = self._get_versions(nodes) # Check to see if source exists.
876 for prop, path, node in zip(props, paths, nodes):
877 src_version_id = prop[self.SERIAL]
878 hash = prop[self.HASH]
879 vtype = prop[self.TYPE]
880 size = prop[self.SIZE]
881 dest_prefix = dest_name + delimiter if not dest_name.endswith(
882 delimiter) else dest_name
883 vdest_name = path.replace(prefix, dest_prefix, 1)
884 dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
885 if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
886 self._delete_object(user, src_account, src_container, path)
887 return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
    """Copy an object's data and metadata.

    Logs the request and hands everything to ``_copy_object`` with
    ``is_move`` set to False; whatever that returns is returned here.
    """
    # Arguments shared by the log line and the delegated call.
    shared_args = (user, src_account, src_container, src_name,
                   dest_account, dest_container, dest_name, type, domain,
                   meta, replace_meta, permissions)
    logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", *(shared_args + (src_version, delimiter)))
    is_move = False
    return self._copy_object(*(shared_args + (src_version, is_move, delimiter)))
def move_object(self, user, src_account, src_container, src_name,
                dest_account, dest_container, dest_name, type, domain,
                meta={}, replace_meta=False, permissions=None,
                delimiter=None):
    """Move an object's data and metadata to a destination path.

    Only the owner of the source account may move an object; any other
    user gets ``NotAllowedError``.  The work is delegated to
    ``_copy_object`` with no source version and the move flag set.
    """
    log_args = (user, src_account, src_container, src_name, dest_account,
                dest_container, dest_name, type, domain, meta, replace_meta,
                permissions, delimiter)
    logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", *log_args)
    if not (user == src_account):
        raise NotAllowedError
    src_version = None
    is_move = True
    return self._copy_object(
        user, src_account, src_container, src_name,
        dest_account, dest_container, dest_name,
        type, domain, meta, replace_meta, permissions,
        src_version, is_move, delimiter)
# Delete an object, or purge its versions up to `until` when a timestamp
# is given.  With `delimiter` set, the visible code also lists the objects
# under the `name` + delimiter prefix and marks each of them as deleted.
# NOTE(review): the numbering embedded in this listing has gaps (908,
# 914-917, 919-920, 922-924, 927, 929, 933-934, 938, 944-945, 948-949,
# 951, 955, 959), so guards, loop headers and returns are not visible
# here -- confirm the control flow against the complete file.
907 def _delete_object(self, user, account, container, name, until=None, delimiter=None):
909 raise NotAllowedError
# Purge path: taken when a timestamp is supplied.
911 if until is not None:
912 path = '/'.join((account, container, name))
913 node = self.node.node_lookup(path)
# node_purge is called once per cluster (NORMAL, HISTORY, DELETED); the
# hash value(s) it returns are passed to the store for removal.
918 h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
921 h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
925 self.store.map_delete(h)
926 self.node.node_purge(node, until, CLUSTER_DELETED)
928 props = self._get_version(node)
930 self.permissions.access_clear(path)
931 self._report_size_change(user, account, -size, {
932 'action': 'object purge', 'path': path})
# Plain delete: a zero-size version is written into CLUSTER_DELETED and
# _apply_versioning reports the size freed for the previous version.
935 path, node = self._lookup_object(account, container, name)
936 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
937 del_size = self._apply_versioning(account, container, src_version_id)
939 self._report_size_change(user, account, -del_size, {
940 'action': 'object delete', 'path': path})
941 self._report_object_change(
942 user, account, path, details={'action': 'object delete'})
943 self.permissions.access_clear(path)
# Delimiter handling: the same delete steps are repeated for the objects
# listed under the prefix (presumably inside a loop over src_names whose
# header is not shown -- TODO confirm).
946 prefix = name + delimiter if not name.endswith(delimiter) else name
947 src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
950 path = '/'.join((account, container, t[0]))
952 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
953 del_size = self._apply_versioning(
954 account, container, src_version_id)
956 self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
957 self._report_object_change(
958 user, account, path, details={'action': 'object delete'})
960 self.permissions.access_clear_bulk(paths)
def delete_object(self, user, account, container, name, until=None,
                  prefix='', delimiter=None):
    """Delete/purge an object.

    Logs the request and forwards it to ``_delete_object``.  ``prefix``
    is accepted and logged but is not passed on.
    """
    logger.debug(
        "delete_object: %s %s %s %s %s %s %s",
        user, account, container, name, until, prefix, delimiter)
    self._delete_object(
        user, account, container, name, until, delimiter)
# List the versions of an object as [serial, mtime] pairs, leaving out any
# version whose cluster is CLUSTER_DELETED.  Read access is checked first.
# NOTE(review): embedded line 974 is absent from this listing, so the call
# that the format-string line below belongs to is not visible -- confirm.
971 def list_versions(self, user, account, container, name):
972 """Return a list of all (version, version_timestamp) tuples for an object."""
975 "list_versions: %s %s %s %s", user, account, container, name)
976 self._can_read(user, account, container, name)
977 path, node = self._lookup_object(account, container, name)
978 versions = self.node.node_get_versions(node)
979 return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
# Resolve a UUID to its (account, container, name) triple after checking
# that `user` may read the object.
# NOTE(review): embedded lines 987-989 are absent from this listing; `path`
# is presumably derived from `info` there, along with handling of a
# missing UUID -- confirm against the complete file.
982 def get_uuid(self, user, uuid):
983 """Return the (account, container, name) for the UUID given."""
985 logger.debug("get_uuid: %s %s", user, uuid)
986 info = self.node.latest_uuid(uuid)
# Split into at most three parts so that object names may contain '/'.
990 account, container, name = path.split('/', 2)
991 self._can_read(user, account, container, name)
992 return (account, container, name)
# Resolve a public id to its (account, container, name) triple after
# checking that `user` may read the object.
# NOTE(review): embedded lines 1000 and 1002-1003 are absent from this
# listing, so what happens for an invalid id or an unknown path is not
# visible -- confirm against the complete file.
995 def get_public(self, user, public):
996 """Return the (account, container, name) for the public id given."""
998 logger.debug("get_public: %s %s", user, public)
999 if public is None or public < ULTIMATE_ANSWER:
# Public ids carry an ULTIMATE_ANSWER offset that is removed before lookup.
1001 path = self.permissions.public_path(public - ULTIMATE_ANSWER)
# Split into at most three parts so that object names may contain '/'.
1004 account, container, name = path.split('/', 2)
1005 self._can_read(user, account, container, name)
1006 return (account, container, name)
# Fetch a block from the store by its hex-encoded hash; the hash is
# decoded with binascii.unhexlify before the store is queried.
# NOTE(review): embedded lines 1014 and 1016 are absent from this listing,
# so the condition guarding the ItemNotExists raise and the return
# statement are not visible -- confirm against the complete file.
1008 @backend_method(autocommit=0)
1009 def get_block(self, hash):
1010 """Return a block's data."""
1012 logger.debug("get_block: %s", hash)
1013 block = self.store.block_get(binascii.unhexlify(hash))
1015 raise ItemNotExists('Block does not exist')
@backend_method(autocommit=0)
def put_block(self, data):
    """Store a block and return its hash, hex-encoded.

    Only the length of ``data`` is logged, not its contents.
    """
    logger.debug("put_block: %s", len(data))
    raw_hash = self.store.block_put(data)
    return binascii.hexlify(raw_hash)
@backend_method(autocommit=0)
def update_block(self, hash, data, offset=0):
    """Update a known block and return the resulting hash, hex-encoded.

    When ``data`` starts at offset 0 and is exactly ``self.block_size``
    long it replaces the block entirely, so it is stored through
    ``put_block``.  Otherwise the existing block, identified by its
    hex-encoded ``hash``, is patched in the store at ``offset``.
    """
    logger.debug("update_block: %s %s %s", hash, len(data), offset)
    replaces_whole_block = offset == 0 and len(data) == self.block_size
    if replaces_whole_block:
        return self.put_block(data)
    raw_hash = binascii.unhexlify(hash)
    updated_hash = self.store.block_update(raw_hash, offset, data)
    return binascii.hexlify(updated_hash)
1037 def _generate_uuid(self):
1038 return str(uuidlib.uuid4())
# Look up the node for `path`/`name` and create it under `parent`.
# NOTE(review): embedded lines 1043 and 1045 are absent from this listing;
# node_create is presumably guarded by a check on the lookup result and the
# function presumably returns the path and node -- confirm.
1040 def _put_object_node(self, path, parent, name):
1041 path = '/'.join((path, name))
1042 node = self.node.node_lookup(path)
1044 node = self.node.node_create(parent, path)
# Create a node for `path` under `parent` and give it an initial version in
# CLUSTER_NORMAL, attributed to `user` and carrying a new UUID.
# NOTE(review): no return statement is visible in this listing (embedded
# line 1051 is absent), yet _lookup_account uses this method's result as
# the node -- confirm against the complete file.
1047 def _put_path(self, user, parent, path):
1048 node = self.node.node_create(parent, path)
1049 self.node.version_create(node, None, 0, '', None, user,
1050 self._generate_uuid(), '', CLUSTER_NORMAL)
1053 def _lookup_account(self, account, create=True):
1054 node = self.node.node_lookup(account)
1055 if node is None and create:
1056 node = self._put_path(
1057 account, self.ROOTNODE, account) # User is account.
1058 return account, node
# Look up the node for the container path 'account/container'; a missing
# container is reported with ItemNotExists.
# NOTE(review): embedded lines 1063 and 1065 are absent from this listing,
# so the condition guarding the raise and the return value are not
# visible (callers unpack the result as `path, node`) -- confirm.
1060 def _lookup_container(self, account, container):
1061 path = '/'.join((account, container))
1062 node = self.node.node_lookup(path)
1064 raise ItemNotExists('Container does not exist')
# Look up the node for the object path 'account/container/name'; a missing
# object is reported with ItemNotExists.
# NOTE(review): embedded lines 1070 and 1072 are absent from this listing,
# so the condition guarding the raise and the return value are not
# visible (callers unpack the result as `path, node`) -- confirm.
1067 def _lookup_object(self, account, container, name):
1068 path = '/'.join((account, container, name))
1069 node = self.node.node_lookup(path)
1071 raise ItemNotExists('Object does not exist')
# Look up the nodes for several paths in a single node_lookup_bulk call.
# NOTE(review): no return statement is visible in this listing (embedded
# line 1076 is absent); presumably `nodes` is returned -- confirm.
1074 def _lookup_objects(self, paths):
1075 nodes = self.node.node_lookup_bulk(paths)
# Look up the version properties of `node` as of `until` (no upper bound
# when `until` is None).  CLUSTER_NORMAL is searched first; CLUSTER_HISTORY
# is consulted only when a timestamp was given and nothing was found.
# NOTE(review): embedded lines 1085 and 1087 are absent from this listing,
# so the condition guarding the raise and the return are not visible.
1078 def _get_properties(self, node, until=None):
1079 """Return properties until the timestamp given."""
1081 before = until if until is not None else inf
1082 props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1083 if props is None and until is not None:
1084 props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
1086 raise ItemNotExists('Path does not exist')
# Fetch aggregate statistics for everything under `node`, either current
# (statistics_get on CLUSTER_NORMAL) or as of `until` (statistics_latest).
# NOTE(review): embedded lines 1092, 1094 and 1096-1098 are absent from
# this listing, so the branch that selects between the two calls and the
# return value are not visible -- confirm against the complete file.
1089 def _get_statistics(self, node, until=None):
1090 """Return count, sum of size and latest timestamp of everything under node."""
1093 stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1095 stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
# Fetch the properties of a node's latest CLUSTER_NORMAL version, or of an
# explicitly requested version.  A requested version that is missing or
# sits in CLUSTER_DELETED is reported with VersionNotExists.
# NOTE(review): embedded lines 1101, 1103, 1105-1106, 1108 and 1113 are
# absent from this listing, so the branch on `version`, the handling around
# int(version) and the return are not visible -- confirm.
1100 def _get_version(self, node, version=None):
1102 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1104 raise ItemNotExists('Object does not exist')
1107 version = int(version)
1109 raise VersionNotExists('Version does not exist')
1110 props = self.node.version_get_properties(version)
1111 if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1112 raise VersionNotExists('Version does not exist')
def _get_versions(self, nodes):
    """Bulk-fetch the latest CLUSTER_NORMAL version for each of ``nodes``.

    No upper time bound is applied (``inf`` is passed as the limit).
    """
    latest = self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
    return latest
# Create a new version of `node`, taking defaults from the latest
# CLUSTER_NORMAL version of `src_node` (or of `node` itself when no
# src_node is given).  Returns (pre_version_id, dest_version_id).
# NOTE(review): embedded lines 1129, 1131-1134, 1137-1139 and 1147 are
# absent from this listing, so the else-branches and some of the default
# handling are not visible -- confirm against the complete file.
1118 def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1119 """Create a new version of the node."""
1121 props = self.node.version_lookup(
1122 node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1123 if props is not None:
1124 src_version_id = props[self.SERIAL]
1125 src_hash = props[self.HASH]
1126 src_size = props[self.SIZE]
1127 src_type = props[self.TYPE]
1128 src_checksum = props[self.CHECKSUM]
1130 src_version_id = None
1135 if size is None: # Set metadata.
1136 hash = src_hash # This way hash can be set to None (account or container).
1140 if checksum is None:
1141 checksum = src_checksum
# A new UUID is generated for copies and for nodes with no source version;
# otherwise the source version's UUID is carried over.
1142 uuid = self._generate_uuid(
1143 ) if (is_copy or src_version_id is None) else props[self.UUID]
# pre_version_id is the version of `node` that is about to be superseded;
# when one exists it is moved to CLUSTER_HISTORY before the new version
# is created.
1145 if src_node is None:
1146 pre_version_id = src_version_id
1148 pre_version_id = None
1149 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1150 if props is not None:
1151 pre_version_id = props[self.SERIAL]
1152 if pre_version_id is not None:
1153 self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1155 dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1156 return pre_version_id, dest_version_id
# Copy attributes from the source version to the destination version (when
# a source exists), then apply `meta` within `domain`.  Two ways of
# applying it are visible: one deletes keys whose value is '' and sets the
# rest; the other deletes the whole domain and then sets every key.
# NOTE(review): embedded lines 1161 and 1166 are absent from this listing;
# presumably they select between the two on `replace` -- confirm.
1158 def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1159 if src_version_id is not None:
1160 self.node.attribute_copy(src_version_id, dest_version_id)
1162 self.node.attribute_del(dest_version_id, domain, (
1163 k for k, v in meta.iteritems() if v == ''))
1164 self.node.attribute_set(dest_version_id, domain, (
1165 (k, v) for k, v in meta.iteritems() if v != ''))
1167 self.node.attribute_del(dest_version_id, domain)
1168 self.node.attribute_set(dest_version_id, domain, ((
1169 k, v) for k, v in meta.iteritems()))
# Create a new version via _put_version_duplicate and store `meta` on it
# through _put_metadata_duplicate.  Returns (src_version_id,
# dest_version_id).
# NOTE(review): embedded line 1175 is absent from this listing, so the
# arguments passed to _put_version_duplicate are not visible -- confirm.
1171 def _put_metadata(self, user, node, domain, meta, replace=False):
1172 """Create a new version and store metadata."""
1174 src_version_id, dest_version_id = self._put_version_duplicate(
1176 self._put_metadata_duplicate(
1177 src_version_id, dest_version_id, domain, meta, replace)
1178 return src_version_id, dest_version_id
# Work out listing bounds from `marker` and `limit`: the visible lines
# start just after the marker's position in `listing` and treat a missing
# limit, or one above 10000, as a special case.
# NOTE(review): embedded lines 1181-1183, 1185-1186 and 1188-1189 are
# absent from this listing, so the handling of an absent marker, the
# value applied in the limit branch and the return are not visible.
1180 def _list_limits(self, listing, marker, limit):
1184 start = listing.index(marker) + 1
1187 if not limit or limit > 10000:
# List object entries under the container `path`, filtered by `prefix`.
# NOTE(review): embedded lines 1197-1198 and 1203 are absent from this
# listing, so the definition of `sizeq` (presumably derived from
# `size_range`) and the return statement are not visible -- confirm.
1191 def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
# Prefix and marker are made container-qualified before querying.
1192 cont_prefix = path + '/'
1193 prefix = cont_prefix + prefix
1194 start = cont_prefix + marker if marker else None
1195 before = until if until is not None else inf
# Key filters are only applied when a metadata domain is given.
1196 filterq = keys if domain else []
1199 objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
# With `virtual` set, the returned prefixes are added as (prefix, None)
# entries; the list is then sorted by path and the container prefix is
# stripped from each path.
1200 objects.extend([(p, None) for p in prefixes] if virtual else [])
1201 objects.sort(key=lambda x: x[0])
1202 objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1205 # Reporting functions.
# Queue a 'resource.diskspace' message for a size change on `account`;
# `details` is extended with the acting user and a total read from the
# account's statistics (element [1] of _get_statistics).
# NOTE(review): `details` defaults to a single shared dict that is updated
# in place below, so calls relying on the default share state and the
# caller's dict is modified -- consider a None default.
# NOTE(review): embedded lines 1211 and 1214-1218 are absent from this
# listing, so the logging call opener and the rest of the queued tuple are
# not visible -- confirm against the complete file.
1207 def _report_size_change(self, user, account, size, details={}):
1208 account_node = self._lookup_account(account, True)[1]
1209 total = self._get_statistics(account_node)[1]
1210 details.update({'user': user, 'total': total})
1212 "_report_size_change: %s %s %s %s", user, account, size, details)
1213 self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
def _report_object_change(self, user, account, path, details=None):
    """Queue an 'object' change message for ``path``.

    ``details`` is an optional dict of extra message fields; the acting
    ``user`` is added to it under the 'user' key.  A dict supplied by the
    caller is updated in place, as before.  When it is omitted a fresh
    dict is created per call, so queued messages no longer share (and
    overwrite) one default dict.
    """
    if details is None:
        details = {}
    details.update({'user': user})
    logger.debug("_report_object_change: %s %s %s %s", user,
                 account, path, details)
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % (
        'object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
def _report_sharing_change(self, user, account, path, details=None):
    """Queue a 'sharing' change message for ``path``.

    ``details`` is an optional dict of extra message fields.  It is
    logged as received and then the acting ``user`` is added under the
    'user' key.  A dict supplied by the caller is updated in place, as
    before.  When it is omitted a fresh dict is created per call, so
    queued messages no longer share (and overwrite) one default dict.
    """
    if details is None:
        details = {}
    logger.debug("_report_permissions_change: %s %s %s %s",
                 user, account, path, details)
    details.update({'user': user})
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account,
                          QUEUE_INSTANCE_ID, 'sharing', path, details))
# Validate a policy dict in place: some values are replaced with the
# backend's default for that key, a numeric value is parsed with int(),
# and 'versioning' is checked against 'auto' / 'none'.
# NOTE(review): embedded lines 1238, 1241, 1243-1244 and 1247-1249 are
# absent from this listing, so the conditions selecting each branch and the
# errors raised on invalid values are not visible -- confirm.
1236 def _check_policy(self, policy):
1237 for k in policy.keys():
1239 policy[k] = self.default_policy.get(k)
1240 for k, v in policy.iteritems():
1242 q = int(v) # May raise ValueError.
1245 elif k == 'versioning':
1246 if v not in ['auto', 'none']:
# Store `policy` on `node` through node.policy_set; the backend's default
# policy entries are iterated beforehand.
# NOTE(review): embedded lines 1252 and 1254-1255 are absent from this
# listing, so how `replace` and the defaults affect `policy` is not
# visible -- confirm against the complete file.
1251 def _put_policy(self, node, policy, replace):
1253 for k, v in self.default_policy.iteritems():
1256 self.node.policy_set(node, policy)
# Build the effective policy for `node`: a copy of the backend defaults
# overlaid with the values stored for the node.
# NOTE(review): no return statement is visible in this listing (embedded
# line 1261 is absent); _apply_versioning indexes the result, so `policy`
# is presumably returned -- confirm.
1258 def _get_policy(self, node):
1259 policy = self.default_policy.copy()
1260 policy.update(self.node.policy_get(node))
# Apply the container's versioning policy to `version_id`: when the policy
# is anything other than 'auto', the version is removed from the node
# store and its hash is passed to store.map_delete.
# NOTE(review): embedded lines 1266-1267, 1269 and 1275-1276 are absent
# from this listing, so the docstring terminator and every return
# statement are not visible -- confirm against the complete file.
1263 def _apply_versioning(self, account, container, version_id):
1264 """Delete the provided version if such is the policy.
1265 Return size of object removed.
1268 if version_id is None:
1270 path, node = self._lookup_container(account, container)
1271 versioning = self._get_policy(node)['versioning']
1272 if versioning != 'auto':
1273 hash, size = self.node.version_remove(version_id)
1274 self.store.map_delete(hash)
1278 # Access control functions.
# Validation hook for group definitions.  The only visible body line is a
# disabled ValueError, so no check appears to be performed.
# NOTE(review): embedded line 1282 is absent from this listing; the actual
# body statement is not visible -- confirm.
1280 def _check_groups(self, groups):
1281 # raise ValueError('Bad characters in groups')
# Validation hook for permissions on `path`.  The only visible body line
# is a disabled ValueError, so no check appears to be performed.
# NOTE(review): embedded line 1286 is absent from this listing; the actual
# body statement is not visible -- confirm.
1284 def _check_permissions(self, path, permissions):
1285 # raise ValueError('Bad characters in permissions')
# Build a list of (path, match-type) pairs.  Every visible append for a
# path adds it with MATCH_EXACT; a path whose latest version has a
# directory/folder content type is also added, with a trailing '/', as
# MATCH_PREFIX.
# NOTE(review): embedded lines 1289-1290, 1292 and 1299 are absent from
# this listing, so the initialisation of `formatted`, the loop header and
# the return are not visible -- confirm against the complete file.
1288 def _get_formatted_paths(self, paths):
1291 node = self.node.node_lookup(p)
1293 if node is not None:
1294 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1295 if props is not None:
# Only the media type before any ';' parameters is compared.
1296 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1297 formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1298 formatted.append((p, self.MATCH_EXACT))
# Find the path whose permissions govern 'account/container/name' by
# walking the paths from permissions.access_inherit in reverse-sorted
# order.
# NOTE(review): embedded lines 1307-1309, 1311 and 1317-1318 are absent
# from this listing, so what is returned for an exact match, for a
# directory-typed ancestor, or when nothing applies is not visible.
1301 def _get_permissions_path(self, account, container, name):
1302 path = '/'.join((account, container, name))
1303 permission_paths = self.permissions.access_inherit(path)
1304 permission_paths.sort()
1305 permission_paths.reverse()
1306 for p in permission_paths:
# Paths with fewer than two '/' (account or container level) are treated
# separately from object paths.
1310 if p.count('/') < 2:
1312 node = self.node.node_lookup(p)
1313 if node is not None:
1314 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1315 if props is not None:
# Only the media type before any ';' parameters is compared.
1316 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
# Check read access to an object: NotAllowedError is raised unless `user`
# holds READ or WRITE access on the governing permissions path.  Whether
# the object is public (public_get) is checked before that.
# NOTE(review): embedded lines 1321-1322, 1325 and 1327 are absent from
# this listing, so any owner shortcut, the outcome for public objects and
# the condition guarding the first raise are not visible -- confirm.
1320 def _can_read(self, user, account, container, name):
1323 path = '/'.join((account, container, name))
1324 if self.permissions.public_get(path) is not None:
1326 path = self._get_permissions_path(account, container, name)
1328 raise NotAllowedError
1329 if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1330 raise NotAllowedError
# Check write access to an object: NotAllowedError is raised unless `user`
# holds WRITE access on the governing permissions path.
# NOTE(review): embedded lines 1333-1334 and 1337 are absent from this
# listing, so any owner shortcut and the condition guarding the first
# raise are not visible -- confirm against the complete file.
1332 def _can_write(self, user, account, container, name):
# This joined path is immediately replaced by the permissions path below.
1335 path = '/'.join((account, container, name))
1336 path = self._get_permissions_path(account, container, name)
1338 raise NotAllowedError
1339 if not self.permissions.access_check(path, self.WRITE, user):
1340 raise NotAllowedError
# Return the sorted account names (first path component) of every path the
# permissions store lists for `user`.
# NOTE(review): embedded line 1343 is absent from this listing; `allow` is
# presumably initialised there as an empty set -- confirm.
1342 def _allowed_accounts(self, user):
1344 for path in self.permissions.access_list_paths(user):
1345 allow.add(path.split('/', 1)[0])
1346 return sorted(allow)
# Return the sorted container names (second path component) of every path
# the permissions store lists for `user` within `account`.
# NOTE(review): embedded line 1349 is absent from this listing; `allow` is
# presumably initialised there as an empty set -- confirm.
1348 def _allowed_containers(self, user, account):
1350 for path in self.permissions.access_list_paths(user, account):
1351 allow.add(path.split('/', 2)[1])
1352 return sorted(allow)