1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following conditions are met:
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following disclaimer.
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
37 import uuid as uuidlib
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
43 AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
45 # Stripped-down version of the HashMap class found in tools.
def __init__(self, blocksize, blockhash):
    """Initialise an empty hash map.

    blocksize -- stored unchanged on the instance as ``blocksize``.
    blockhash -- stored unchanged on the instance as ``blockhash``; it is
                 the algorithm name later handed to ``hashlib.new``.
    """
    super(HashMap, self).__init__()
    self.blocksize, self.blockhash = blocksize, blockhash
# _hash_raw: starts a hashlib object for the algorithm named by self.blockhash.
# NOTE(review): the remainder of the body is absent from this listing (presumably it feeds `v`
# into `h` and returns the digest) - confirm against the full file.
53 def _hash_raw(self, v):
54 h = hashlib.new(self.blockhash)
# NOTE(review): fragment of a HashMap method whose `def` line and several statements are missing
# from this listing; the comments below describe only what the surviving lines show.
# Hash of the empty string (presumably the empty-map case - confirm).
60 return self._hash_raw('')
# Returns the first stored element (presumably the single-entry case - confirm).
62 return self.__getitem__(0)
# Pads the working list `h` up to length `s` with all-zero entries as long as h[0].
68 h += [('\x00' * len(h[0]))] * (s - len(h))
# Replaces `h` with the hashes of its adjacent pairs, halving its length.
70 h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
73 # Default modules and settings.
# ModularBackend.__init__ falls back to these when the matching argument is falsy.
74 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
75 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
76 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
77 DEFAULT_BLOCK_PATH = 'data/'
78 DEFAULT_BLOCK_UMASK = 0o022
# Queue defaults are commented out; __init__ only builds a queue when both queue arguments are given.
79 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
80 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
82 QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
# Passed as `client_id` when the queue object is constructed in __init__.
83 QUEUE_CLIENT_ID = 'pithos'
84 QUEUE_INSTANCE_ID = '1'
# Version cluster identifiers: 0, 1 and 2 respectively.
86 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
# Module-level logger named after this module.
93 logger = logging.getLogger(__name__)
# backend_method: decorator that runs the wrapped method between self.wrapper.execute() and
# self.wrapper.commit(), with self.wrapper.rollback() on the failure path.
# NOTE(review): several lines of this definition (branch headers, try/except, returns) are missing
# from this listing, so the exact control flow and the role of `autocommit` cannot be confirmed here.
96 def backend_method(func=None, autocommit=1):
# Re-enters backend_method with the same arguments (presumably from a nested decorator
# function used when `func` is not supplied - confirm).
99 return backend_method(func, autocommit)
104 def fn(self, *args, **kw):
105 self.wrapper.execute()
108 ret = func(self, *args, **kw)
# Iterates the messages accumulated on the backend; the loop body is not visible here.
109 for m in self.messages:
111 self.wrapper.commit()
114 self.wrapper.rollback()
119 class ModularBackend(BaseBackend):
120 """A modular backend.
122 Uses modules for SQL functions and storage.
# __init__: wires the backend together from pluggable modules.
#   db_module / db_connection       -- module name and connection string for the database layer.
#   block_module / block_path / block_umask -- module name and settings for the block store.
#   queue_module / queue_connection -- optional; a queue is only created when both are truthy.
# NOTE(review): the nested module loader's `def` line, the fallback queue class header and a few
# other short lines are missing from this listing.
125 def __init__(self, db_module=None, db_connection=None,
126 block_module=None, block_path=None, block_umask=None,
127 queue_module=None, queue_connection=None):
# Falsy arguments are replaced by the module-level defaults.
128 db_module = db_module or DEFAULT_DB_MODULE
129 db_connection = db_connection or DEFAULT_DB_CONNECTION
130 block_module = block_module or DEFAULT_BLOCK_MODULE
131 block_path = block_path or DEFAULT_BLOCK_PATH
# NOTE(review): because of `or`, an explicit umask of 0 is also replaced by the default - confirm intended.
132 block_umask = block_umask or DEFAULT_BLOCK_UMASK
133 #queue_module = queue_module or DEFAULT_QUEUE_MODULE
134 #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
136 self.hash_algorithm = 'sha256'
137 self.block_size = 4 * 1024 * 1024 # 4MB
139 self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
# Tail of the nested loader: hands back the module object registered under name `m`.
143 return sys.modules[m]
# Database layer: one DBWrapper shared by the Permissions and Node objects.
145 self.db_module = load_module(db_module)
146 self.wrapper = self.db_module.DBWrapper(db_connection)
147 params = {'wrapper': self.wrapper}
148 self.permissions = self.db_module.Permissions(**params)
# Re-export the db module's constants as attributes of the backend instance.
149 for x in ['READ', 'WRITE']:
150 setattr(self, x, getattr(self.db_module, x))
151 self.node = self.db_module.Node(**params)
152 for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
153 setattr(self, x, getattr(self.db_module, x))
# Block store, configured with the backend's block size and hash algorithm.
155 self.block_module = load_module(block_module)
156 params = {'path': block_path,
157 'block_size': self.block_size,
158 'hash_algorithm': self.hash_algorithm,
159 'umask': block_umask}
160 self.store = self.block_module.Store(**params)
# Optional message queue.
162 if queue_module and queue_connection:
163 self.queue_module = load_module(queue_module)
164 params = {'exchange': queue_connection,
165 'client_id': QUEUE_CLIENT_ID}
166 self.queue = self.queue_module.Queue(**params)
# Fallback queue object (NoQueue) exposing send(); its class header and method body are not visible here.
169 def send(self, *args):
175 self.queue = NoQueue()
def list_accounts(self, user, marker=None, limit=10000):
    """Return one page of the accounts that `user` is allowed to access.

    The page boundaries come from `_list_limits`, which is given the full
    list of allowed accounts together with `marker` and `limit`.
    """
    logger.debug("list_accounts: %s %s %s", user, marker, limit)
    accounts = self._allowed_accounts(user)
    first, count = self._list_limits(accounts, marker, limit)
    last = first + count
    return accounts[first:last]
# get_account_meta: builds a metadata dict for `account` (name, count, bytes, modified, and
# optionally the user-defined attributes of `domain`).
# NOTE(review): several short lines (branch headers, try/except, the final return) are missing
# from this listing; comments describe only the visible statements.
191 def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
192 """Return a dictionary with the account metadata for the domain."""
194 logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
# The second argument is True only when the user is asking about their own account.
195 path, node = self._lookup_account(account, user == account)
# Access check (presumably applied to non-owners only - the enclosing condition is not visible).
197 if until or node is None or account not in self._allowed_accounts(user):
198 raise NotAllowedError
200 props = self._get_properties(node, until)
201 mtime = props[self.MTIME]
205 count, bytes, tstamp = self._get_statistics(node, until)
206 tstamp = max(tstamp, mtime)
210 modified = self._get_statistics(node)[2] # Overall last modification.
211 modified = max(modified, mtime)
214 meta = {'name': account}
# User-defined attributes are read from the version identified by props[self.SERIAL].
217 if props is not None and include_user_defined:
218 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
219 if until is not None:
220 meta.update({'until_timestamp': tstamp})
221 meta.update({'name': account, 'count': count, 'bytes': bytes})
222 meta.update({'modified': modified})
# update_account_meta: stores `meta` for `domain` on the account node via _put_metadata.
# NOTE(review): the guard line for the `raise NotAllowedError` below is missing from this listing
# (presumably an ownership check) - confirm against the full file.
226 def update_account_meta(self, user, account, domain, meta, replace=False):
227 """Update the metadata associated with the account for the domain."""
229 logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
231 raise NotAllowedError
232 path, node = self._lookup_account(account, True)
233 self._put_metadata(user, node, domain, meta, replace)
def get_account_groups(self, user, account):
    """Return the group dictionary kept by the permissions layer for `account`.

    Raises NotAllowedError when `account` is not among the accounts that
    `user` is allowed to access.
    """
    logger.debug("get_account_groups: %s %s", user, account)
    accessible = self._allowed_accounts(user)
    if account in accessible:
        self._lookup_account(account, True)
        return self.permissions.group_dict(account)
    raise NotAllowedError
# update_account_groups: validates `groups` and writes them through the permissions layer.
# NOTE(review): the guard for the `raise NotAllowedError` and the conditions around
# group_destroy / group_addmany are missing from this listing (presumably an ownership check,
# `if replace:` and a non-empty check on `v`) - confirm against the full file.
248 def update_account_groups(self, user, account, groups, replace=False):
249 """Update the groups associated with the account."""
251 logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
253 raise NotAllowedError
254 self._lookup_account(account, True)
255 self._check_groups(groups)
257 self.permissions.group_destroy(account)
258 for k, v in groups.iteritems():
259 if not replace: # If not already deleted.
260 self.permissions.group_delete(account, k)
262 self.permissions.group_addmany(account, k, v)
def get_account_policy(self, user, account):
    """Return the policy of the node that represents `account`.

    Raises NotAllowedError when `account` is not among the accounts that
    `user` is allowed to access.
    """
    logger.debug("get_account_policy: %s %s", user, account)
    accessible = self._allowed_accounts(user)
    if account in accessible:
        _, account_node = self._lookup_account(account, True)
        return self._get_policy(account_node)
    raise NotAllowedError
# update_account_policy: validates `policy` and stores it on the account node.
# NOTE(review): the guard line for the `raise NotAllowedError` below is missing from this listing
# (presumably an ownership check) - confirm against the full file.
277 def update_account_policy(self, user, account, policy, replace=False):
278 """Update the policy associated with the account."""
280 logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
282 raise NotAllowedError
283 path, node = self._lookup_account(account, True)
284 self._check_policy(policy)
285 self._put_policy(node, policy, replace)
# put_account: creates the account node under ROOTNODE and stores its policy.
# NOTE(review): the guard lines for both raises are missing from this listing (presumably an
# ownership check and a "node already exists" check) - confirm against the full file.
# NOTE(review): `policy={}` is a shared mutable default; it is not modified in the visible lines.
288 def put_account(self, user, account, policy={}):
289 """Create a new account with the given name."""
291 logger.debug("put_account: %s %s %s", user, account, policy)
293 raise NotAllowedError
294 node = self.node.node_lookup(account)
296 raise AccountExists('Account already exists')
298 self._check_policy(policy)
299 node = self._put_path(user, self.ROOTNODE, account)
# The final True is passed as _put_policy's third argument (named `replace` at other call sites).
300 self._put_policy(node, policy, True)
# delete_account: removes the account node and destroys the account's groups.
# NOTE(review): the guard for the `raise NotAllowedError` and the handling of a failed
# node_lookup are missing from this listing - confirm against the full file.
303 def delete_account(self, user, account):
304 """Delete the account with the given name."""
306 logger.debug("delete_account: %s %s", user, account)
308 raise NotAllowedError
309 node = self.node.node_lookup(account)
# A falsy result from node_remove is reported as "account not empty".
312 if not self.node.node_remove(node):
313 raise AccountNotEmpty('Account is not empty')
314 self.permissions.group_destroy(account)
# list_containers: returns one page of container names under `account`.
# Three paths are visible: containers the user is allowed to see, shared/public containers,
# and a full listing through _list_object_properties.
# NOTE(review): the branch headers separating these paths are missing from this listing
# (presumably `user != account`, `shared or public`, `shared`, `public`) - confirm.
317 def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
318 """Return a list of containers existing under an account."""
320 logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
322 if until or account not in self._allowed_accounts(user):
323 raise NotAllowedError
324 allowed = self._allowed_containers(user, account)
325 start, limit = self._list_limits(allowed, marker, limit)
326 return allowed[start:start + limit]
# The container name is the second '/'-separated component of each shared path.
330 allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
# public_list yields tuples; the path is element 0.
332 allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
333 allowed = sorted(allowed)
334 start, limit = self._list_limits(allowed, marker, limit)
335 return allowed[start:start + limit]
336 node = self.node.node_lookup(account)
337 containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
# NOTE(review): `containers` already holds x[0] of every row, so the second `x[0]` below indexes
# into each entry again - looks unintended; confirm what _list_limits expects here.
338 start, limit = self._list_limits([x[0] for x in containers], marker, limit)
339 return containers[start:start + limit]
# list_container_meta: returns the attribute keys of `domain` found under the container,
# restricted to the paths in `allowed`.
# NOTE(review): the guard lines for both raises and the initial value of `allowed` are missing
# from this listing; `inf` is defined outside the visible lines.
342 def list_container_meta(self, user, account, container, domain, until=None):
343 """Return a list with all the container's object meta keys for the domain."""
345 logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
349 raise NotAllowedError
350 allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
352 raise NotAllowedError
353 path, node = self._lookup_container(account, container)
# Without `until`, the lookup is unbounded in time.
354 before = until if until is not None else inf
355 allowed = self._get_formatted_paths(allowed)
356 return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
# get_container_meta: builds a metadata dict for the container (name, count, bytes, modified,
# and optionally the user-defined attributes of `domain`).
# NOTE(review): several short lines (branch headers and the final return) are missing from this
# listing; comments describe only the visible statements.
359 def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
360 """Return a dictionary with the container metadata for the domain."""
362 logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
# Access check (presumably applied to non-owners only - the enclosing condition is not visible).
364 if until or container not in self._allowed_containers(user, account):
365 raise NotAllowedError
366 path, node = self._lookup_container(account, container)
367 props = self._get_properties(node, until)
368 mtime = props[self.MTIME]
369 count, bytes, tstamp = self._get_statistics(node, until)
370 tstamp = max(tstamp, mtime)
374 modified = self._get_statistics(node)[2] # Overall last modification.
375 modified = max(modified, mtime)
378 meta = {'name': container}
381 if include_user_defined:
382 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
383 if until is not None:
384 meta.update({'until_timestamp': tstamp})
385 meta.update({'name': container, 'count': count, 'bytes': bytes})
386 meta.update({'modified': modified})
# update_container_meta: stores `meta` for `domain` on the container node; when the container's
# versioning policy is not 'auto', the previous version is removed afterwards.
# NOTE(review): the guard line for the `raise NotAllowedError` below is missing from this listing
# (presumably an ownership check) - confirm against the full file.
390 def update_container_meta(self, user, account, container, domain, meta, replace=False):
391 """Update the metadata associated with the container for the domain."""
393 logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
395 raise NotAllowedError
396 path, node = self._lookup_container(account, container)
397 src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
398 if src_version_id is not None:
399 versioning = self._get_policy(node)['versioning']
400 if versioning != 'auto':
401 self.node.version_remove(src_version_id)
def get_container_policy(self, user, account, container):
    """Return the policy of the node that represents the container.

    Raises NotAllowedError when `container` is not among the containers of
    `account` that `user` is allowed to access.
    """
    logger.debug("get_container_policy: %s %s %s", user, account, container)
    visible = self._allowed_containers(user, account)
    if container in visible:
        _, container_node = self._lookup_container(account, container)
        return self._get_policy(container_node)
    raise NotAllowedError
# update_container_policy: validates `policy` and stores it on the container node.
# NOTE(review): the guard line for the `raise NotAllowedError` below is missing from this listing
# (presumably an ownership check) - confirm against the full file.
416 def update_container_policy(self, user, account, container, policy, replace=False):
417 """Update the policy associated with the container."""
419 logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
421 raise NotAllowedError
422 path, node = self._lookup_container(account, container)
423 self._check_policy(policy)
424 self._put_policy(node, policy, replace)
# put_container: creates the container node under the account node and stores its policy.
# NOTE(review): the guard for `raise NotAllowedError` and the try/except/else that decides when
# ContainerExists is raised are missing from this listing - confirm against the full file.
# NOTE(review): `policy={}` is a shared mutable default; it is not modified in the visible lines.
427 def put_container(self, user, account, container, policy={}):
428 """Create a new container with the given name."""
430 logger.debug("put_container: %s %s %s %s", user, account, container, policy)
432 raise NotAllowedError
434 path, node = self._lookup_container(account, container)
438 raise ContainerExists('Container already exists')
440 self._check_policy(policy)
441 path = '/'.join((account, container))
# The parent is the account node, looked up with True as the second argument.
442 node = self._put_path(user, self._lookup_account(account, True)[1], path)
443 self._put_policy(node, policy, True)
# delete_container: with `until`, purges the container's history up to that time; otherwise
# deletes the container, which must hold no objects.
# NOTE(review): the guard for `raise NotAllowedError`, the `for h in hashes` loop headers and the
# early return after the purge branch are missing from this listing - confirm against the full file.
446 def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
447 """Delete/purge the container with the given name."""
449 logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
451 raise NotAllowedError
452 path, node = self._lookup_container(account, container)
# Purge path: drop history versions up to `until`, delete their block maps, then drop deleted versions.
454 if until is not None:
455 hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
457 self.store.map_delete(h)
458 self.node.node_purge_children(node, until, CLUSTER_DELETED)
459 self._report_size_change(user, account, -size, {'action': 'container purge'})
# Delete path: refuse while the container still counts objects.
462 if self._get_statistics(node)[0] > 0:
463 raise ContainerNotEmpty('Container is not empty')
464 hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
466 self.store.map_delete(h)
467 self.node.node_purge_children(node, inf, CLUSTER_DELETED)
468 self.node.node_remove(node)
469 self._report_size_change(user, account, -size, {'action': 'container delete'})
# _list_objects: returns one page of object property tuples for a container.
# Three paths are visible: shared and public combined, public only, and the general case
# restricted to the paths returned by _list_object_permissions.
# NOTE(review): several branch headers and short statements (initialisation of `objects`,
# `elif`/`else` lines, an early return) are missing from this listing - confirm against the full file.
471 def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
# Only the account owner may list as of a past time.
472 if user != account and until:
473 raise NotAllowedError
474 if shared and public:
476 shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False)
479 path, node = self._lookup_container(account, container)
480 shared = self._get_formatted_paths(shared)
481 objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props)
484 objects.extend(self._list_public_object_properties(user, account, container, prefix, all_props))
# Merge the two sources by name before paginating.
486 objects.sort(key=lambda x: x[0])
487 start, limit = self._list_limits([x[0] for x in objects], marker, limit)
488 return objects[start:start + limit]
490 objects = self._list_public_object_properties(user, account, container, prefix, all_props)
491 start, limit = self._list_limits([x[0] for x in objects], marker, limit)
492 return objects[start:start + limit]
494 allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
495 if shared and not allowed:
497 path, node = self._lookup_container(account, container)
498 allowed = self._get_formatted_paths(allowed)
499 objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
500 start, limit = self._list_limits([x[0] for x in objects], marker, limit)
501 return objects[start:start + limit]
# _list_public_object_properties: builds (name,) + properties tuples for the public objects
# under a container, with names made relative to the container path.
# NOTE(review): the final return statement is missing from this listing.
503 def _list_public_object_properties(self, user, account, container, prefix, all_props):
504 public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True)
505 paths, nodes = self._lookup_objects(public)
506 path = '/'.join((account, container))
507 cont_prefix = path + '/'
# Strip the "<account>/<container>/" prefix from every path.
508 paths = [x[len(cont_prefix):] for x in paths]
509 props = self.node.version_lookup_bulk(nodes, all_props=all_props)
# NOTE(review): the comprehension reuses the names `path` and `props` as loop variables.
510 objects = [(path,) + props for path, props in zip(paths, props)]
# _list_objects_no_limit: collects the complete listing by calling _list_objects repeatedly.
# NOTE(review): the loop header, the initialisation of `objects` and `limit`, the accumulation
# step and the return are missing from this listing - confirm against the full file.
513 def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
# The next page starts after the last entry gathered so far.
# NOTE(review): this passes the whole last entry as the marker; elsewhere entries are tuples
# compared by x[0] - confirm the marker type _list_objects expects.
516 marker = objects[-1] if objects else None
518 l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
# An empty or short page means there is nothing more to fetch.
520 if not l or len(l) < limit:
# _list_object_permissions: collects the paths under account/container/prefix that the user may
# access, or that are shared or public, and sorts them.
# NOTE(review): the branch headers, the initialisation of `allowed` and the return statements
# are missing from this listing - confirm against the full file.
524 def _list_object_permissions(self, user, account, container, prefix, shared, public):
# Trailing slashes are removed, so an empty prefix yields "<account>/<container>".
526 path = '/'.join((account, container, prefix)).rstrip('/')
528 allowed = self.permissions.access_list_paths(user, path)
530 raise NotAllowedError
534 allowed.update(self.permissions.access_list_shared(path))
# public_list yields tuples; the path is element 0.
536 allowed.update([x[0] for x in self.permissions.public_list(path)])
537 allowed = sorted(allowed)
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
    """Return the objects found under a container.

    Thin wrapper over `_list_objects`: all listing arguments are forwarded
    unchanged and `all_props` is fixed to False.
    """
    logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
    listing = self._list_objects(
        user, account, container, prefix, delimiter, marker, limit,
        virtual, domain, keys, shared, until, size_range,
        all_props=False, public=public)
    return listing
# list_object_meta: lists a container with all properties and converts each row to a dict.
# NOTE(review): the loop header, the branch that separates 'subdir' rows from object rows
# (presumably on the row length) and the return are missing from this listing - confirm.
550 def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
551 """Return a list of object metadata dicts existing under a container."""
553 logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
# Same call as list_objects, but with all_props set to True.
554 props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
558 objects.append({'subdir': p[0]})
# Each row carries the name at index 0, so property indices are shifted by one.
560 objects.append({'name': p[0],
561 'bytes': p[self.SIZE + 1],
562 'type': p[self.TYPE + 1],
563 'hash': p[self.HASH + 1],
564 'version': p[self.SERIAL + 1],
565 'version_timestamp': p[self.MTIME + 1],
566 'modified': p[self.MTIME + 1] if until is None else None,
567 'modified_by': p[self.MUSER + 1],
568 'uuid': p[self.UUID + 1],
569 'checksum': p[self.CHECKSUM + 1]})
def list_object_permissions(self, user, account, container, prefix=''):
    """Return the paths under a container that carry permissions.

    Delegates to `_list_object_permissions` with shared paths included and
    public paths excluded.
    """
    logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
    permission_paths = self._list_object_permissions(
        user, account, container, prefix, shared=True, public=False)
    return permission_paths
# list_object_public: maps each public path under account/container/prefix to its public id.
# NOTE(review): the initialisation of `public` and the return are missing from this listing;
# ULTIMATE_ANSWER is defined outside the visible lines.
580 def list_object_public(self, user, account, container, prefix=''):
581 """Return a dict mapping paths to public ids for objects that are public under a container."""
583 logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
# NOTE(review): unlike _list_object_permissions, the joined path is not rstrip('/')-ed here,
# so an empty prefix leaves a trailing slash - confirm this is intended.
585 for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
# The exposed id is the stored value offset by ULTIMATE_ANSWER.
586 public[path] = p + ULTIMATE_ANSWER
# get_object_meta: builds a metadata dict for one object version (name, bytes, type, hash,
# version, timestamps, modifier, uuid, checksum, and optionally user-defined attributes).
# NOTE(review): the branch on `version`, the `try:` line, the initialisation of `meta` and the
# return are missing from this listing - confirm against the full file.
590 def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
591 """Return a dictionary with the object metadata for the domain."""
593 logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
594 self._can_read(user, account, container, name)
595 path, node = self._lookup_object(account, container, name)
596 props = self._get_version(node, version)
598 modified = props[self.MTIME]
601 modified = self._get_version(node)[self.MTIME] # Overall last modification.
# NOTE(review): this handler catches NameError, while the code below signals a missing object
# with ItemNotExists; if _get_version raises ItemNotExists too, this fallback never runs - confirm.
602 except NameError: # Object may be deleted.
603 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
604 if del_props is None:
605 raise ItemNotExists('Object does not exist')
606 modified = del_props[self.MTIME]
609 if include_user_defined:
610 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
611 meta.update({'name': name,
612 'bytes': props[self.SIZE],
613 'type': props[self.TYPE],
614 'hash': props[self.HASH],
615 'version': props[self.SERIAL],
616 'version_timestamp': props[self.MTIME],
617 'modified': modified,
618 'modified_by': props[self.MUSER],
619 'uuid': props[self.UUID],
620 'checksum': props[self.CHECKSUM]})
def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
    """Store new metadata for an object in `domain` and return the new version id.

    Write access is checked first; after the metadata is stored, the
    container's versioning handling is applied to the previous version.
    """
    logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
    self._can_write(user, account, container, name)
    _, object_node = self._lookup_object(account, container, name)
    version_pair = self._put_metadata(user, object_node, domain, meta, replace)
    previous_id, new_id = version_pair
    self._apply_versioning(account, container, previous_id)
    return new_id
# get_object_permissions: determines the action the user may perform and returns it together
# with the path the permissions come from and the permissions stored on that path.
# NOTE(review): the lines assigning `allowed` and the branch headers around the access checks
# are missing from this listing - confirm against the full file.
635 def get_object_permissions(self, user, account, container, name):
636 """Return the action allowed on the object, the path
637 from which the object gets its permissions from,
638 along with a dictionary containing the permissions."""
640 logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
642 permissions_path = self._get_permissions_path(account, container, name)
# WRITE is checked before READ.
644 if self.permissions.access_check(permissions_path, self.WRITE, user):
646 elif self.permissions.access_check(permissions_path, self.READ, user):
649 raise NotAllowedError
# Called for its side effect only; the result is discarded.
650 self._lookup_object(account, container, name)
651 return (allowed, permissions_path, self.permissions.access_get(permissions_path))
# update_object_permissions: validates and stores the permissions of an object, then reports
# the resulting member list as a sharing change.
# NOTE(review): the guard line for the `raise NotAllowedError` below is missing from this listing
# (presumably an ownership check) - confirm against the full file.
654 def update_object_permissions(self, user, account, container, name, permissions):
655 """Update the permissions associated with the object."""
657 logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
659 raise NotAllowedError
660 path = self._lookup_object(account, container, name)[0]
661 self._check_permissions(path, permissions)
662 self.permissions.access_set(path, permissions)
663 self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
# get_object_public: looks up the public entry stored for the object's path.
# NOTE(review): the lines that turn `p` into the return value are missing from this listing.
666 def get_object_public(self, user, account, container, name):
667 """Return the public id of the object if applicable."""
669 logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
670 self._can_read(user, account, container, name)
671 path = self._lookup_object(account, container, name)[0]
672 p = self.permissions.public_get(path)
# update_object_public: sets or clears the public flag on the object's path.
# NOTE(review): the `if`/`else` lines choosing between public_unset and public_set are missing
# from this listing (presumably keyed on `public`) - confirm against the full file.
678 def update_object_public(self, user, account, container, name, public):
679 """Update the public status of the object."""
681 logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
682 self._can_write(user, account, container, name)
683 path = self._lookup_object(account, container, name)[0]
685 self.permissions.public_unset(path)
687 self.permissions.public_set(path)
def get_object_hashmap(self, user, account, container, name, version=None):
    """Return the size of an object version and its block hashes in hex form.

    Read access is checked first. The version's stored hash is decoded
    from hex and used to fetch the block map from the store.
    """
    logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
    self._can_read(user, account, container, name)
    _, object_node = self._lookup_object(account, container, name)
    version_props = self._get_version(object_node, version)
    map_key = binascii.unhexlify(version_props[self.HASH])
    block_hashes = self.store.map_get(map_key)
    object_size = version_props[self.SIZE]
    hex_hashes = []
    for block_hash in block_hashes:
        hex_hashes.append(binascii.hexlify(block_hash))
    return object_size, hex_hashes
# _update_object_hash: creates a new version of an object with the given size/type/hash,
# carries metadata over, enforces account and container quotas, optionally sets permissions,
# and reports the size, sharing and object changes. Returns the new version id.
# NOTE(review): the statement executed when a quota is exceeded is missing from this listing
# (presumably `raise QuotaError`) - confirm against the full file.
700 def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
# Only the account owner may set permissions while writing.
701 if permissions is not None and user != account:
702 raise NotAllowedError
703 self._can_write(user, account, container, name)
704 if permissions is not None:
705 path = '/'.join((account, container, name))
706 self._check_permissions(path, permissions)
708 account_path, account_node = self._lookup_account(account, True)
709 container_path, container_node = self._lookup_container(account, container)
710 path, node = self._put_object_node(container_path, container_node, name)
711 pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
# Without an explicit source version, metadata is carried over from the version just superseded.
714 if src_version_id is None:
715 src_version_id = pre_version_id
716 self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
# size_delta is the new size minus whatever _apply_versioning reports for the previous version.
719 del_size = self._apply_versioning(account, container, pre_version_id)
720 size_delta = size - del_size
# A quota of 0 or less disables the corresponding check.
722 account_quota = long(self._get_policy(account_node)['quota'])
723 container_quota = long(self._get_policy(container_node)['quota'])
724 if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
725 (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
726 # This must be executed in a transaction, so the version is never created if it fails.
728 self._report_size_change(user, account, size_delta, {'action': 'object update'})
730 if permissions is not None:
731 self.permissions.access_set(path, permissions)
732 self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
734 self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
735 return dest_version_id
# update_object_hashmap: creates or updates an object from a list of hex block hashes and
# stores the resulting block map. Returns the new version id.
# NOTE(review): the handling of missing blocks (`ie` creation and raise) and the computation of
# `hash` are missing from this listing - confirm against the full file.
# NOTE(review): `meta={}` is a shared mutable default; it is not modified in the visible lines.
738 def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
739 """Create/update an object with the specified size and partial hashes."""
741 logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
742 if size == 0: # No such thing as an empty hashmap.
743 hashmap = [self.put_block('')]
744 map = HashMap(self.block_size, self.hash_algorithm)
745 map.extend([binascii.unhexlify(x) for x in hashmap])
# Blocks referenced by the map but absent from the store.
746 missing = self.store.block_search(map)
# The missing block hashes are attached in hex form as `ie.data`.
749 ie.data = [binascii.hexlify(x) for x in missing]
753 dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
# The map is stored only after the version has been created.
754 self.store.map_put(hash, map)
755 return dest_version_id
# update_object_checksum: writes `checksum` onto every version of the object whose serial is at
# least `version` and whose hash and size match those of `version`.
# NOTE(review): the loop header over `versions` is missing from this listing.
758 def update_object_checksum(self, user, account, container, name, version, checksum):
759 """Update an object's checksum."""
761 logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
762 # Update objects with greater version and same hashmap and size (fix metadata updates).
763 self._can_write(user, account, container, name)
764 path, node = self._lookup_object(account, container, name)
765 props = self._get_version(node, version)
766 versions = self.node.node_get_versions(node)
768 if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
769 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
    """Copy or move an object, and optionally everything below it, to a new path.

    The source object is read (read access is checked), a new destination
    version is created through `_update_object_hash`, and for a move to a
    different path the source is deleted afterwards.

    When `delimiter` is given, every object listed under
    `src_name + delimiter` is handled the same way, with the source prefix
    replaced by `dest_name + delimiter` in the destination name.

    Returns the single destination version id when exactly one version was
    created, otherwise the list of all created version ids.

    Fixes relative to the previous revision:
      * each object below the prefix is written with its own size; the
        top-level object's size was previously reused for all of them, which
        skews the size and quota accounting in `_update_object_hash`;
      * the prefix expansion only runs when a delimiter was supplied, so the
        default `delimiter=None` no longer reaches `src_name + delimiter`.
    """
    dest_version_ids = []
    src_location = (src_account, src_container, src_name)
    dest_location = (dest_account, dest_container, dest_name)
    relocated = src_location != dest_location

    self._can_read(user, src_account, src_container, src_name)
    path, node = self._lookup_object(src_account, src_container, src_name)
    # TODO: Will do another fetch of the properties in duplicate version...
    props = self._get_version(node, src_version)  # Check to see if source exists.
    src_version_id = props[self.SERIAL]
    hash = props[self.HASH]
    size = props[self.SIZE]
    is_copy = not is_move and relocated  # New uuid.
    dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
    if is_move and relocated:
        self._delete_object(user, src_account, src_container, src_name)

    if delimiter:
        prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
        # Loop-invariant, so computed once up front.
        dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
        src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
        paths = [elem[0] for elem in src_names]
        nodes = [elem[2] for elem in src_names]
        # TODO: Will do another fetch of the properties in duplicate version...
        props = self._get_versions(nodes)  # Check to see if source exists.

        for prop, path, node in zip(props, paths, nodes):
            src_version_id = prop[self.SERIAL]
            hash = prop[self.HASH]
            vtype = prop[self.TYPE]
            # Use this object's own size, not the top-level object's.
            vsize = prop[self.SIZE]
            vdest_name = path.replace(prefix, dest_prefix, 1)
            dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, vsize, vtype, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
            if is_move and relocated:
                self._delete_object(user, src_account, src_container, path)
    return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
    """Copy an object's data and metadata to a destination path.

    Delegates to `_copy_object` with `is_move` set to False and returns
    whatever it returns.
    """
    logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
    return self._copy_object(
        user, src_account, src_container, src_name,
        dest_account, dest_container, dest_name,
        type, domain, meta, replace_meta, permissions,
        src_version=src_version, is_move=False, delimiter=delimiter)
def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
    """Move an object's data and metadata to a destination path.

    Only the owner of the source account may move; anyone else gets
    NotAllowedError. Delegates to `_copy_object` with `is_move` set to True
    and no explicit source version, and returns whatever it returns.
    """
    logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
    if user == src_account:
        return self._copy_object(
            user, src_account, src_container, src_name,
            dest_account, dest_container, dest_name,
            type, domain, meta, replace_meta, permissions,
            src_version=None, is_move=True, delimiter=delimiter)
    raise NotAllowedError
# Delete an object, or purge its stored versions when `until` is given.
# NOTE(review): several statements of this method (guards, loops, early
# returns) are not visible in this listing; the comments below describe only
# the statements that are shown.
822 def _delete_object(self, user, account, container, name, until=None, delimiter=None):
824 raise NotAllowedError
# Purge path: look the node up directly by its full path.
826 if until is not None:
827 path = '/'.join((account, container, name))
828 node = self.node.node_lookup(path)
# node_purge is called once per cluster (NORMAL, HISTORY, DELETED); for the
# first two the result is unpacked as (h, s) -- presumably hashes and a size,
# confirm against node_purge.
833 h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
836 h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
# Drop the stored maps that belonged to the purged versions.
840 self.store.map_delete(h)
841 self.node.node_purge(node, until, CLUSTER_DELETED)
843 props = self._get_version(node)
845 self.permissions.access_clear(path)
846 self._report_size_change(user, account, -size, {'action': 'object purge'})
# Plain delete: record a new zero-size version in CLUSTER_DELETED, then let
# the container's versioning policy decide whether the previous version is
# removed (_apply_versioning is documented to return the size removed).
849 path, node = self._lookup_object(account, container, name)
850 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
851 del_size = self._apply_versioning(account, container, src_version_id)
853 self._report_size_change(user, account, -del_size, {'action': 'object delete'})
854 self._report_object_change(user, account, path, details={'action': 'object delete'})
855 self.permissions.access_clear(path)
# Delimiter handling: `name` is turned into a prefix ending with the
# delimiter and the objects listed under it get the same delete treatment.
858 prefix = name + delimiter if not name.endswith(delimiter) else name
859 src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
862 path = '/'.join((account, container, t[0]))
864 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
865 del_size = self._apply_versioning(account, container, src_version_id)
867 self._report_size_change(user, account, -del_size, {'action': 'object delete'})
868 self._report_object_change(user, account, path, details={'action': 'object delete'})
# Permissions of all affected paths are cleared in one bulk call.
870 self.permissions.access_clear_bulk(paths)
def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
    """Delete an object, or purge it when ``until`` is given.

    ``prefix`` is accepted and logged, but it is not forwarded to
    ``self._delete_object``.
    """
    target = (user, account, container, name)
    logger.debug("delete_object: %s %s %s %s %s %s %s",
                 *(target + (until, prefix, delimiter)))
    self._delete_object(*(target + (until, delimiter)))
def list_versions(self, user, account, container, name):
    """Return a list of [version, version_timestamp] pairs for an object.

    The caller must pass the ``self._can_read`` check.  Versions whose
    cluster is CLUSTER_DELETED are left out of the result.
    """
    logger.debug("list_versions: %s %s %s %s", user, account, container, name)
    self._can_read(user, account, container, name)
    _, node = self._lookup_object(account, container, name)
    listing = []
    for props in self.node.node_get_versions(node):
        if props[self.CLUSTER] != CLUSTER_DELETED:
            listing.append([props[self.SERIAL], props[self.MTIME]])
    return listing
# Resolve a UUID to the (account, container, name) triple of its object,
# after checking that `user` may read that object.
890 def get_uuid(self, user, uuid):
891 """Return the (account, container, name) for the UUID given."""
893 logger.debug("get_uuid: %s %s", user, uuid)
894 info = self.node.latest_uuid(uuid)
# NOTE(review): the statements that handle a missing `info` and derive `path`
# from it are not visible in this listing.
# maxsplit=2 keeps any '/' inside the object name intact.
898 account, container, name = path.split('/', 2)
899 self._can_read(user, account, container, name)
900 return (account, container, name)
# Resolve a public id to the (account, container, name) triple of its object,
# after checking that `user` may read that object.
# NOTE(review): the statement executed when `public` is None or below
# ULTIMATE_ANSWER, and any handling of a missing path, are not visible in this
# listing.
903 def get_public(self, user, public):
904 """Return the (account, container, name) for the public id given."""
906 logger.debug("get_public: %s %s", user, public)
907 if public is None or public < ULTIMATE_ANSWER:
# Public ids are offset by ULTIMATE_ANSWER; the offset is removed before the
# permissions layer is asked for the path.
909 path = self.permissions.public_path(public - ULTIMATE_ANSWER)
# maxsplit=2 keeps any '/' inside the object name intact.
912 account, container, name = path.split('/', 2)
913 self._can_read(user, account, container, name)
914 return (account, container, name)
# Fetch a stored block by its hex-encoded hash.
# NOTE(review): the condition guarding the raise below and the success-path
# return are not visible in this listing.
916 @backend_method(autocommit=0)
917 def get_block(self, hash):
918 """Return a block's data."""
920 logger.debug("get_block: %s", hash)
# `hash` arrives hex-encoded; the store is queried with the decoded bytes.
921 block = self.store.block_get(binascii.unhexlify(hash))
923 raise ItemNotExists('Block does not exist')
@backend_method(autocommit=0)
def put_block(self, data):
    """Store ``data`` as a block and return its hash, hex-encoded."""
    logger.debug("put_block: %s", len(data))
    raw_hash = self.store.block_put(data)
    return binascii.hexlify(raw_hash)
@backend_method(autocommit=0)
def update_block(self, hash, data, offset=0):
    """Update a known block and return the resulting hash, hex-encoded.

    ``hash`` is the hex-encoded hash of the block to update; ``data`` is
    written at ``offset``.
    """
    size = len(data)
    logger.debug("update_block: %s %s %s", hash, size, offset)
    if offset == 0 and size == self.block_size:
        # A full-size write from offset 0 leaves nothing of the old block,
        # so it is stored as a brand new block instead.
        return self.put_block(data)
    raw_hash = binascii.unhexlify(hash)
    updated = self.store.block_update(raw_hash, offset, data)
    return binascii.hexlify(updated)
945 def _generate_uuid(self):
946 return str(uuidlib.uuid4())
# Look up the node for object `name` under `path`; a node_create call under
# `parent` for the same joined path is also present.
# NOTE(review): the condition guarding node_create and the return statement
# are not visible in this listing -- confirm the node is only created when the
# lookup misses.
948 def _put_object_node(self, path, parent, name):
949 path = '/'.join((path, name))
950 node = self.node.node_lookup(path)
952 node = self.node.node_create(parent, path)
# Create a node for `path` under `parent` and give it an initial version.
# NOTE(review): the return statement is not visible in this listing;
# _lookup_account assigns this method's result to `node`.
955 def _put_path(self, user, parent, path):
956 node = self.node.node_create(parent, path)
# Positional arguments line up with the version_create call in
# _put_version_duplicate: hash=None, size=0, type='', no source version,
# a freshly generated UUID, checksum='' and cluster CLUSTER_NORMAL.
957 self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
# Look up the node for `account`; when it is missing and `create` is true the
# account path is created under self.ROOTNODE.
# NOTE(review): the return statement is not visible in this listing;
# _report_size_change indexes the result with [1], so a pair appears to be
# returned.
960 def _lookup_account(self, account, create=True):
961 node = self.node.node_lookup(account)
962 if node is None and create:
963 node = self._put_path(account, self.ROOTNODE, account) # User is account.
# Resolve 'account/container' to its node.
# Raises ItemNotExists('Container does not exist').
# NOTE(review): the guard for the raise (presumably a failed lookup) and the
# return statement are not visible in this listing; _apply_versioning unpacks
# the result as (path, node).
966 def _lookup_container(self, account, container):
967 path = '/'.join((account, container))
968 node = self.node.node_lookup(path)
970 raise ItemNotExists('Container does not exist')
# Resolve 'account/container/name' to its node.
# Raises ItemNotExists('Object does not exist').
# NOTE(review): the guard for the raise (presumably a failed lookup) and the
# return statement are not visible in this listing; callers such as
# list_versions unpack the result as (path, node).
973 def _lookup_object(self, account, container, name):
974 path = '/'.join((account, container, name))
975 node = self.node.node_lookup(path)
977 raise ItemNotExists('Object does not exist')
# Resolve several full paths to nodes with a single bulk lookup.
# NOTE(review): the return statement is not visible in this listing.
980 def _lookup_objects(self, paths):
981 nodes = self.node.node_lookup_bulk(paths)
# Raises ItemNotExists('Path does not exist').
# NOTE(review): the guard for the raise and the return statement are not
# visible in this listing.
984 def _get_properties(self, node, until=None):
985 """Return properties until the timestamp given."""
# Without `until` the lookup is bounded by `inf`.
987 before = until if until is not None else inf
988 props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
# CLUSTER_HISTORY is consulted only for time-bounded queries that found
# nothing in CLUSTER_NORMAL.
989 if props is None and until is not None:
990 props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
992 raise ItemNotExists('Path does not exist')
# Two statistics sources are visible: statistics_get for CLUSTER_NORMAL and
# statistics_latest bounded by `until`, called with CLUSTER_DELETED.
# NOTE(review): the branching on `until`, any handling of a missing result and
# the return statement are not visible in this listing.
995 def _get_statistics(self, node, until=None):
996 """Return count, sum of size and latest timestamp of everything under node."""
999 stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1001 stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
# Return the properties of a node's version: a CLUSTER_NORMAL lookup bounded
# by `inf`, or the properties of an explicit `version` id.
# Raises ItemNotExists('Object does not exist') and
# VersionNotExists('Version does not exist').
# NOTE(review): the branch on `version`, the guards around the first two
# raises and the return statement are not visible in this listing.
1006 def _get_version(self, node, version=None):
1008 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1010 raise ItemNotExists('Object does not exist')
# The requested version is coerced to int before it is looked up.
1013 version = int(version)
1015 raise VersionNotExists('Version does not exist')
1016 props = self.node.version_get_properties(version)
# A version that sits in CLUSTER_DELETED is reported as nonexistent.
1017 if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1018 raise VersionNotExists('Version does not exist')
def _get_versions(self, nodes):
    """Bulk-look up the CLUSTER_NORMAL versions of ``nodes``, bounded by ``inf``."""
    bulk_lookup = self.node.version_lookup_bulk
    return bulk_lookup(nodes, inf, CLUSTER_NORMAL)
# Returns (pre_version_id, dest_version_id): the version of `node` that was
# current before the call (None when there was none) and the new version.
# NOTE(review): several lines (`else:` branches and the defaulting of
# src_hash/src_size/src_type/src_checksum, size and type) are not visible in
# this listing.
1024 def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1025 """Create a new version of the node."""
# Source properties come from `src_node` when given, otherwise from `node`.
1027 props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1028 if props is not None:
1029 src_version_id = props[self.SERIAL]
1030 src_hash = props[self.HASH]
1031 src_size = props[self.SIZE]
1032 src_type = props[self.TYPE]
1033 src_checksum = props[self.CHECKSUM]
1035 src_version_id = None
1040 if size is None: # Set metadata.
1041 hash = src_hash # This way hash can be set to None (account or container).
1045 if checksum is None:
1046 checksum = src_checksum
# A copy, or a node without a source version, gets a new UUID; otherwise the
# source version's UUID is carried over.
1047 uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
# Work out which version of the destination node is being superseded: the
# source version itself when no separate src_node was given, otherwise the
# destination's own current CLUSTER_NORMAL version.
1049 if src_node is None:
1050 pre_version_id = src_version_id
1052 pre_version_id = None
1053 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1054 if props is not None:
1055 pre_version_id = props[self.SERIAL]
# The superseded version is moved to CLUSTER_HISTORY before the new one is
# created.
1056 if pre_version_id is not None:
1057 self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1059 dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1060 return pre_version_id, dest_version_id
# Copy the attributes of `src_version_id` (when there is one) onto
# `dest_version_id`, then apply `meta` within `domain`.
# NOTE(review): the branch on `replace` that selects between the two update
# styles below is not visible in this listing.
1062 def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1063 if src_version_id is not None:
1064 self.node.attribute_copy(src_version_id, dest_version_id)
# Style 1: keys whose value is '' are deleted, all other pairs are set.
1066 self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
1067 self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
# Style 2: attribute_del is called for the domain without a key list, then
# every pair in `meta` is set.
1069 self.node.attribute_del(dest_version_id, domain)
1070 self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
1072 def _put_metadata(self, user, node, domain, meta, replace=False):
1073 """Create a new version and store metadata."""
1075 src_version_id, dest_version_id = self._put_version_duplicate(user, node)
1076 self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
1077 return src_version_id, dest_version_id
# Compute paging bounds for `listing`: the start index is the position right
# after `marker`, and a missing limit or one above 10000 gets special
# handling.
# NOTE(review): the initial value of `start`, the handling of a marker that
# is absent from `listing`, the body of the limit check and the return
# statement are not visible in this listing.
1079 def _list_limits(self, listing, marker, limit):
1083 start = listing.index(marker) + 1
1086 if not limit or limit > 10000:
# List the latest versions of objects under container `path`, with names made
# relative to the container.
# NOTE(review): the definition of `sizeq` (used in the latest_version_list
# call) and the return statement are not visible in this listing.
1090 def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
# The node layer works on full paths, so the prefix and the marker are
# qualified with the container path.
1091 cont_prefix = path + '/'
1092 prefix = cont_prefix + prefix
1093 start = cont_prefix + marker if marker else None
1094 before = until if until is not None else inf
# Key filters are only applied when a metadata domain is given.
1095 filterq = keys if domain else []
1098 objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
# With `virtual` set, the prefixes are added as (prefix, None) entries.
1099 objects.extend([(p, None) for p in prefixes] if virtual else [])
1100 objects.sort(key=lambda x: x[0])
# Strip the container prefix from the first element of every entry.
1101 objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1104 # Reporting functions.
def _report_size_change(self, user, account, size, details={}):
    """Queue a 'diskspace' message describing a change in an account's usage.

    ``size`` is the change being reported and is sent as a float.  The
    message details are the caller's ``details`` plus the acting ``user``
    and the account's current ``total``, taken from element [1] of
    ``self._get_statistics`` (documented there as the sum of sizes).

    The ``details`` argument is never modified.
    """
    logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
    account_node = self._lookup_account(account, True)[1]
    total = self._get_statistics(account_node)[1]
    # Build a fresh dict: updating `details` in place would write into the
    # shared default argument (and into the caller's dict), so every queued
    # message relying on the default would alias one and the same object.
    payload = dict(details or {})
    payload.update({'user': user, 'total': total})
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), payload))
def _report_object_change(self, user, account, path, details={}):
    """Queue an 'object' message for a change to the object at ``path``.

    The message details are the caller's ``details`` plus the acting
    ``user``.  The ``details`` argument is never modified.
    """
    logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
    # Build a fresh dict: updating `details` in place would write into the
    # shared default argument (and into the caller's dict), so every queued
    # message relying on the default would alias one and the same object.
    payload = dict(details or {})
    payload.update({'user': user})
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, payload))
def _report_sharing_change(self, user, account, path, details={}):
    """Queue a 'sharing' message for a permissions change on ``path``.

    The message details are the caller's ``details`` plus the acting
    ``user``.  The ``details`` argument is never modified.
    """
    # The debug line used to say "_report_permissions_change", which did not
    # match this method's name.
    logger.debug("_report_sharing_change: %s %s %s %s", user, account, path, details)
    # Build a fresh dict: updating `details` in place would write into the
    # shared default argument (and into the caller's dict), so every queued
    # message relying on the default would alias one and the same object.
    payload = dict(details or {})
    payload.update({'user': user})
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, payload))
# Validate a policy dict; `policy` is modified in place.
# NOTE(review): the condition under which a value is replaced by its default,
# the key test preceding the int() conversion, and the statements raised or
# executed on invalid values are not visible in this listing.
1125 def _check_policy(self, policy):
# First pass: some entries are reset to the value from self.default_policy.
1126 for k in policy.keys():
1128 policy[k] = self.default_policy.get(k)
# Second pass: per-key validation.
1129 for k, v in policy.iteritems():
1131 q = int(v) # May raise ValueError.
# 'versioning' accepts only 'auto' or 'none'.
1134 elif k == 'versioning':
1135 if v not in ['auto', 'none']:
# Store `policy` on `node` through self.node.policy_set.
# NOTE(review): the use of `replace` and the body of the loop over
# self.default_policy are not visible in this listing.
1140 def _put_policy(self, node, policy, replace):
1142 for k, v in self.default_policy.iteritems():
1145 self.node.policy_set(node, policy)
# Build the effective policy of `node`: a copy of self.default_policy
# overlaid with the values stored for the node, so stored values win.
# NOTE(review): the return statement is not visible in this listing;
# _apply_versioning subscripts the result with 'versioning'.
1147 def _get_policy(self, node):
1148 policy = self.default_policy.copy()
1149 policy.update(self.node.policy_get(node))
# Looks up the container's policy and, when its 'versioning' value is not
# 'auto', removes `version_id` through self.node.version_remove and deletes
# the stored map for the returned hash.
# NOTE(review): the end of the docstring, the statement executed when
# `version_id` is None and the return statements are not visible in this
# listing.
1152 def _apply_versioning(self, account, container, version_id):
1153 """Delete the provided version if such is the policy.
1154 Return size of object removed.
1157 if version_id is None:
1159 path, node = self._lookup_container(account, container)
1160 versioning = self._get_policy(node)['versioning']
1161 if versioning != 'auto':
1162 hash, size = self.node.version_remove(version_id)
1163 self.store.map_delete(hash)
1167 # Access control functions.
# Validation hook for group definitions; the only visible body line is a
# commented-out ValueError raise.
# NOTE(review): the actual body statement is not visible in this listing.
1169 def _check_groups(self, groups):
1170 # raise ValueError('Bad characters in groups')
# Validation hook for permissions on `path`; the only visible body line is a
# commented-out ValueError raise.
# NOTE(review): the actual body statement is not visible in this listing.
1173 def _check_permissions(self, path, permissions):
1174 # raise ValueError('Bad characters in permissions')
# Turn paths into (path, match-mode) pairs collected in `formatted`.
# NOTE(review): the initialisation of `formatted`, the loop header over
# `paths` and the return statement are not visible in this listing.
1177 def _get_formatted_paths(self, paths):
1180 node = self.node.node_lookup(p)
1181 if node is not None:
1182 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1183 if props is not None:
# Parameters after ';' in the type are ignored. Directory and folder objects
# also get a prefix-match entry on the path with exactly one trailing '/'.
1184 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1185 formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1186 formatted.append((p, self.MATCH_EXACT))
# Examine the paths from which 'account/container/name' may inherit
# permissions, as reported by self.permissions.access_inherit.
# NOTE(review): the statements executed for each candidate (the returns) and
# the final fall-through result are not visible in this listing.
1189 def _get_permissions_path(self, account, container, name):
1190 path = '/'.join((account, container, name))
1191 permission_paths = self.permissions.access_inherit(path)
# Descending sort order: a longer path is visited before any shorter path
# that is a prefix of it.
1192 permission_paths.sort()
1193 permission_paths.reverse()
1194 for p in permission_paths:
# Fewer than two '/' means the candidate has at most two components
# (account or account/container).
1198 if p.count('/') < 2:
1200 node = self.node.node_lookup(p)
1201 if node is not None:
1202 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1203 if props is not None:
# Same directory/folder type test as in _get_formatted_paths.
1204 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
# Check that `user` may read the object; raises NotAllowedError otherwise.
# NOTE(review): the opening statements, the body of the public check and the
# guard before the first raise are not visible in this listing.
1208 def _can_read(self, user, account, container, name):
1211 path = '/'.join((account, container, name))
# A path with a public entry gets special handling (body not visible).
1212 if self.permissions.public_get(path) is not None:
1214 path = self._get_permissions_path(account, container, name)
1216 raise NotAllowedError
# Either READ or WRITE access on the resolved permissions path is enough to
# read.
1217 if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1218 raise NotAllowedError
# Check that `user` may write the object; raises NotAllowedError otherwise.
# NOTE(review): the opening statements and the guard before the first raise
# are not visible in this listing.
1220 def _can_write(self, user, account, container, name):
# NOTE(review): this first assignment to `path` is overwritten by the very
# next statement without being read -- it looks like a dead store.
1223 path = '/'.join((account, container, name))
1224 path = self._get_permissions_path(account, container, name)
1226 raise NotAllowedError
# Only WRITE access on the resolved permissions path allows writing.
1227 if not self.permissions.access_check(path, self.WRITE, user):
1228 raise NotAllowedError
# Return the sorted account names that appear as the first path component of
# the paths listed for `user` by self.permissions.access_list_paths.
# NOTE(review): the initialisation of `allow` is not visible in this listing;
# the use of .add() suggests it is a set.
1230 def _allowed_accounts(self, user):
1232 for path in self.permissions.access_list_paths(user):
1233 allow.add(path.split('/', 1)[0])
1234 return sorted(allow)
# Return the sorted container names that appear as the second path component
# of the paths listed for `user` within `account`.
# NOTE(review): the initialisation of `allow` is not visible in this listing;
# the use of .add() suggests it is a set.
1236 def _allowed_containers(self, user, account):
1238 for path in self.permissions.access_list_paths(user, account):
1239 allow.add(path.split('/', 2)[1])
1240 return sorted(allow)