1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
37 import uuid as uuidlib
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
44 # Stripped-down version of the HashMap class found in tools.
def __init__(self, blocksize, blockhash):
    """Create an empty hash map.

    blocksize is stored on the instance unchanged. blockhash is stored as
    well and is later handed to hashlib.new() by _hash_raw(), so it is
    expected to be a hashlib algorithm name.
    """
    super(HashMap, self).__init__()
    self.blocksize = blocksize
    self.blockhash = blockhash
52 def _hash_raw(self, v):
53 h = hashlib.new(self.blockhash)
59 return self._hash_raw('')
61 return self.__getitem__(0)
67 h += [('\x00' * len(h[0]))] * (s - len(h))
69 h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
# Default modules and settings.
# ModularBackend.__init__ falls back to these whenever the matching
# constructor argument is missing or falsy.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
# The queue has no active defaults; these fallbacks are disabled.
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'

QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
# Passed as 'client_id' when the queue object is created.
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Cluster identifiers, numbered 0, 1 and 2 in this order.
CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED = range(3)
92 logger = logging.getLogger(__name__)
95 def backend_method(func=None, autocommit=1):
98 return backend_method(func, autocommit)
103 def fn(self, *args, **kw):
104 self.wrapper.execute()
107 ret = func(self, *args, **kw)
108 for m in self.messages:
110 self.wrapper.commit()
113 self.wrapper.rollback()
118 class ModularBackend(BaseBackend):
119 """A modular backend.
121 Uses modules for SQL functions and storage.
124 def __init__(self, db_module=None, db_connection=None,
125 block_module=None, block_path=None, block_umask=None,
126 queue_module=None, queue_connection=None):
127 db_module = db_module or DEFAULT_DB_MODULE
128 db_connection = db_connection or DEFAULT_DB_CONNECTION
129 block_module = block_module or DEFAULT_BLOCK_MODULE
130 block_path = block_path or DEFAULT_BLOCK_PATH
131 block_umask = block_umask or DEFAULT_BLOCK_UMASK
132 #queue_module = queue_module or DEFAULT_QUEUE_MODULE
133 #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
135 self.hash_algorithm = 'sha256'
136 self.block_size = 4 * 1024 * 1024 # 4MB
138 self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
142 return sys.modules[m]
144 self.db_module = load_module(db_module)
145 self.wrapper = self.db_module.DBWrapper(db_connection)
146 params = {'wrapper': self.wrapper}
147 self.permissions = self.db_module.Permissions(**params)
148 for x in ['READ', 'WRITE']:
149 setattr(self, x, getattr(self.db_module, x))
150 self.node = self.db_module.Node(**params)
151 for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
152 setattr(self, x, getattr(self.db_module, x))
154 self.block_module = load_module(block_module)
155 params = {'path': block_path,
156 'block_size': self.block_size,
157 'hash_algorithm': self.hash_algorithm,
158 'umask': block_umask}
159 self.store = self.block_module.Store(**params)
161 if queue_module and queue_connection:
162 self.queue_module = load_module(queue_module)
163 params = {'exchange': queue_connection,
164 'client_id': QUEUE_CLIENT_ID}
165 self.queue = self.queue_module.Queue(**params)
168 def send(self, *args):
174 self.queue = NoQueue()
def list_accounts(self, user, marker=None, limit=10000):
    """Return one page of the accounts the user can access.

    The accessible accounts come from _allowed_accounts(); marker and
    limit are turned into a slice window by _list_limits().
    """

    logger.debug("list_accounts: %s %s %s", user, marker, limit)
    accounts = self._allowed_accounts(user)
    first, count = self._list_limits(accounts, marker, limit)
    last = first + count
    return accounts[first:last]
190 def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
191 """Return a dictionary with the account metadata for the domain."""
193 logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
194 path, node = self._lookup_account(account, user == account)
196 if until or node is None or account not in self._allowed_accounts(user):
197 raise NotAllowedError
199 props = self._get_properties(node, until)
200 mtime = props[self.MTIME]
204 count, bytes, tstamp = self._get_statistics(node, until)
205 tstamp = max(tstamp, mtime)
209 modified = self._get_statistics(node)[2] # Overall last modification.
210 modified = max(modified, mtime)
213 meta = {'name': account}
216 if props is not None and include_user_defined:
217 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
218 if until is not None:
219 meta.update({'until_timestamp': tstamp})
220 meta.update({'name': account, 'count': count, 'bytes': bytes})
221 meta.update({'modified': modified})
225 def update_account_meta(self, user, account, domain, meta, replace=False):
226 """Update the metadata associated with the account for the domain."""
228 logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
230 raise NotAllowedError
231 path, node = self._lookup_account(account, True)
232 self._put_metadata(user, node, domain, meta, replace)
235 def get_account_groups(self, user, account):
236 """Return a dictionary with the user groups defined for this account."""
238 logger.debug("get_account_groups: %s %s", user, account)
240 if account not in self._allowed_accounts(user):
241 raise NotAllowedError
243 self._lookup_account(account, True)
244 return self.permissions.group_dict(account)
247 def update_account_groups(self, user, account, groups, replace=False):
248 """Update the groups associated with the account."""
250 logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
252 raise NotAllowedError
253 self._lookup_account(account, True)
254 self._check_groups(groups)
256 self.permissions.group_destroy(account)
257 for k, v in groups.iteritems():
258 if not replace: # If not already deleted.
259 self.permissions.group_delete(account, k)
261 self.permissions.group_addmany(account, k, v)
264 def get_account_policy(self, user, account):
265 """Return a dictionary with the account policy."""
267 logger.debug("get_account_policy: %s %s", user, account)
269 if account not in self._allowed_accounts(user):
270 raise NotAllowedError
272 path, node = self._lookup_account(account, True)
273 return self._get_policy(node)
276 def update_account_policy(self, user, account, policy, replace=False):
277 """Update the policy associated with the account."""
279 logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
281 raise NotAllowedError
282 path, node = self._lookup_account(account, True)
283 self._check_policy(policy)
284 self._put_policy(node, policy, replace)
287 def put_account(self, user, account, policy={}):
288 """Create a new account with the given name."""
290 logger.debug("put_account: %s %s %s", user, account, policy)
292 raise NotAllowedError
293 node = self.node.node_lookup(account)
295 raise NameError('Account already exists')
297 self._check_policy(policy)
298 node = self._put_path(user, self.ROOTNODE, account)
299 self._put_policy(node, policy, True)
302 def delete_account(self, user, account):
303 """Delete the account with the given name."""
305 logger.debug("delete_account: %s %s", user, account)
307 raise NotAllowedError
308 node = self.node.node_lookup(account)
311 if not self.node.node_remove(node):
312 raise IndexError('Account is not empty')
313 self.permissions.group_destroy(account)
316 def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
317 """Return a list of containers existing under an account."""
319 logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
321 if until or account not in self._allowed_accounts(user):
322 raise NotAllowedError
323 allowed = self._allowed_containers(user, account)
324 start, limit = self._list_limits(allowed, marker, limit)
325 return allowed[start:start + limit]
329 allowed.extend([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
331 allowed.extend([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
332 allowed = list(set(allowed))
334 start, limit = self._list_limits(allowed, marker, limit)
335 return allowed[start:start + limit]
336 node = self.node.node_lookup(account)
337 return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
340 def list_container_meta(self, user, account, container, domain, until=None):
341 """Return a list with all the container's object meta keys for the domain."""
343 logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
347 raise NotAllowedError
348 allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
350 raise NotAllowedError
351 path, node = self._lookup_container(account, container)
352 before = until if until is not None else inf
353 allowed = self._get_formatted_paths(allowed)
354 return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
357 def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
358 """Return a dictionary with the container metadata for the domain."""
360 logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
362 if until or container not in self._allowed_containers(user, account):
363 raise NotAllowedError
364 path, node = self._lookup_container(account, container)
365 props = self._get_properties(node, until)
366 mtime = props[self.MTIME]
367 count, bytes, tstamp = self._get_statistics(node, until)
368 tstamp = max(tstamp, mtime)
372 modified = self._get_statistics(node)[2] # Overall last modification.
373 modified = max(modified, mtime)
376 meta = {'name': container}
379 if include_user_defined:
380 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
381 if until is not None:
382 meta.update({'until_timestamp': tstamp})
383 meta.update({'name': container, 'count': count, 'bytes': bytes})
384 meta.update({'modified': modified})
388 def update_container_meta(self, user, account, container, domain, meta, replace=False):
389 """Update the metadata associated with the container for the domain."""
391 logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
393 raise NotAllowedError
394 path, node = self._lookup_container(account, container)
395 src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
396 if src_version_id is not None:
397 versioning = self._get_policy(node)['versioning']
398 if versioning != 'auto':
399 self.node.version_remove(src_version_id)
402 def get_container_policy(self, user, account, container):
403 """Return a dictionary with the container policy."""
405 logger.debug("get_container_policy: %s %s %s", user, account, container)
407 if container not in self._allowed_containers(user, account):
408 raise NotAllowedError
410 path, node = self._lookup_container(account, container)
411 return self._get_policy(node)
414 def update_container_policy(self, user, account, container, policy, replace=False):
415 """Update the policy associated with the container."""
417 logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
419 raise NotAllowedError
420 path, node = self._lookup_container(account, container)
421 self._check_policy(policy)
422 self._put_policy(node, policy, replace)
425 def put_container(self, user, account, container, policy={}):
426 """Create a new container with the given name."""
428 logger.debug("put_container: %s %s %s %s", user, account, container, policy)
430 raise NotAllowedError
432 path, node = self._lookup_container(account, container)
436 raise NameError('Container already exists')
438 self._check_policy(policy)
439 path = '/'.join((account, container))
440 node = self._put_path(user, self._lookup_account(account, True)[1], path)
441 self._put_policy(node, policy, True)
444 def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
445 """Delete/purge the container with the given name."""
447 logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
449 raise NotAllowedError
450 path, node = self._lookup_container(account, container)
452 if until is not None:
453 hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
455 self.store.map_delete(h)
456 self.node.node_purge_children(node, until, CLUSTER_DELETED)
457 self._report_size_change(user, account, -size, {'action': 'container purge'})
460 if self._get_statistics(node)[0] > 0:
461 raise IndexError('Container is not empty')
462 hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
464 self.store.map_delete(h)
465 self.node.node_purge_children(node, inf, CLUSTER_DELETED)
466 self.node.node_remove(node)
467 self._report_size_change(user, account, -size, {'action': 'container delete'})
469 def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
470 if user != account and until:
471 raise NotAllowedError
472 allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
473 if (shared or public) and not allowed:
475 path, node = self._lookup_container(account, container)
476 allowed = self._get_formatted_paths(allowed)
477 return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
479 def _list_object_permissions(self, user, account, container, prefix, shared, public):
481 path = '/'.join((account, container, prefix)).rstrip('/')
483 allowed = self.permissions.access_list_paths(user, path)
485 raise NotAllowedError
489 allowed.extend(self.permissions.access_list_shared(path))
491 allowed.extend([x[0] for x in self.permissions.public_list(path)])
492 allowed = list(set(allowed))
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
    """Return a list of object (name, version_id) tuples existing under a container.

    Logs the request and delegates to _list_objects() with all_props
    switched off (list_object_meta() is the variant that switches it on).
    """

    logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
    all_props = False
    listing = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
    return listing
506 def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
507 """Return a list of object metadata dicts existing under a container."""
509 logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
510 props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
514 objects.append({'subdir': p[0]})
516 objects.append({'name': p[0],
517 'bytes': p[self.SIZE + 1],
518 'type': p[self.TYPE + 1],
519 'hash': p[self.HASH + 1],
520 'version': p[self.SERIAL + 1],
521 'version_timestamp': p[self.MTIME + 1],
522 'modified': p[self.MTIME + 1] if until is None else None,
523 'modified_by': p[self.MUSER + 1],
524 'uuid': p[self.UUID + 1],
525 'checksum': p[self.CHECKSUM + 1]})
def list_object_permissions(self, user, account, container, prefix=''):
    """Return a list of paths that enforce permissions under a container.

    Delegates to _list_object_permissions() with shared=True and
    public=False.
    """

    logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
    shared, public = True, False
    paths = self._list_object_permissions(user, account, container, prefix, shared, public)
    return paths
536 def list_object_public(self, user, account, container, prefix=''):
537 """Return a dict mapping paths to public ids for objects that are public under a container."""
539 logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
541 for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
542 public[path] = p + ULTIMATE_ANSWER
546 def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
547 """Return a dictionary with the object metadata for the domain."""
549 logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
550 self._can_read(user, account, container, name)
551 path, node = self._lookup_object(account, container, name)
552 props = self._get_version(node, version)
554 modified = props[self.MTIME]
557 modified = self._get_version(node)[self.MTIME] # Overall last modification.
558 except NameError: # Object may be deleted.
559 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
560 if del_props is None:
561 raise NameError('Object does not exist')
562 modified = del_props[self.MTIME]
565 if include_user_defined:
566 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
567 meta.update({'name': name,
568 'bytes': props[self.SIZE],
569 'type': props[self.TYPE],
570 'hash': props[self.HASH],
571 'version': props[self.SERIAL],
572 'version_timestamp': props[self.MTIME],
573 'modified': modified,
574 'modified_by': props[self.MUSER],
575 'uuid': props[self.UUID],
576 'checksum': props[self.CHECKSUM]})
def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
    """Update the object's metadata for the domain and return the new version.

    The caller must pass the _can_write() check. _put_metadata() yields
    the source and destination version ids; the source id is handed to
    _apply_versioning() and the destination id is returned.
    """

    logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
    self._can_write(user, account, container, name)
    node = self._lookup_object(account, container, name)[1]
    old_version_id, new_version_id = self._put_metadata(user, node, domain, meta, replace)
    self._apply_versioning(account, container, old_version_id)
    return new_version_id
591 def get_object_permissions(self, user, account, container, name):
592 """Return the action allowed on the object, the path
593 from which the object gets its permissions from,
594 along with a dictionary containing the permissions."""
596 logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
598 permissions_path = self._get_permissions_path(account, container, name)
600 if self.permissions.access_check(permissions_path, self.WRITE, user):
602 elif self.permissions.access_check(permissions_path, self.READ, user):
605 raise NotAllowedError
606 self._lookup_object(account, container, name)
607 return (allowed, permissions_path, self.permissions.access_get(permissions_path))
610 def update_object_permissions(self, user, account, container, name, permissions):
611 """Update the permissions associated with the object."""
613 logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
615 raise NotAllowedError
616 path = self._lookup_object(account, container, name)[0]
617 self._check_permissions(path, permissions)
618 self.permissions.access_set(path, permissions)
619 self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
622 def get_object_public(self, user, account, container, name):
623 """Return the public id of the object if applicable."""
625 logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
626 self._can_read(user, account, container, name)
627 path = self._lookup_object(account, container, name)[0]
628 p = self.permissions.public_get(path)
634 def update_object_public(self, user, account, container, name, public):
635 """Update the public status of the object."""
637 logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
638 self._can_write(user, account, container, name)
639 path = self._lookup_object(account, container, name)[0]
641 self.permissions.public_unset(path)
643 self.permissions.public_set(path)
def get_object_hashmap(self, user, account, container, name, version=None):
    """Return the object's size and a list with its partial hashes.

    The caller must pass the _can_read() check. The version's stored hash
    is un-hexlified to look the map up in the store, and every entry of
    that map is hexlified again for the returned list.
    """

    logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
    self._can_read(user, account, container, name)
    node = self._lookup_object(account, container, name)[1]
    version_props = self._get_version(node, version)
    raw_map_hash = binascii.unhexlify(version_props[self.HASH])
    stored_map = self.store.map_get(raw_map_hash)
    size = version_props[self.SIZE]
    hex_hashes = []
    for block_hash in stored_map:
        hex_hashes.append(binascii.hexlify(block_hash))
    return size, hex_hashes
656 def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
657 if permissions is not None and user != account:
658 raise NotAllowedError
659 self._can_write(user, account, container, name)
660 if permissions is not None:
661 path = '/'.join((account, container, name))
662 self._check_permissions(path, permissions)
664 account_path, account_node = self._lookup_account(account, True)
665 container_path, container_node = self._lookup_container(account, container)
666 path, node = self._put_object_node(container_path, container_node, name)
667 pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
670 if src_version_id is None:
671 src_version_id = pre_version_id
672 self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
675 del_size = self._apply_versioning(account, container, pre_version_id)
676 size_delta = size - del_size
678 account_quota = long(self._get_policy(account_node)['quota'])
679 container_quota = long(self._get_policy(container_node)['quota'])
680 if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
681 (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
682 # This must be executed in a transaction, so the version is never created if it fails.
684 self._report_size_change(user, account, size_delta, {'action': 'object update'})
686 if permissions is not None:
687 self.permissions.access_set(path, permissions)
688 self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
690 self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
691 return dest_version_id
694 def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
695 """Create/update an object with the specified size and partial hashes."""
697 logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
698 if size == 0: # No such thing as an empty hashmap.
699 hashmap = [self.put_block('')]
700 map = HashMap(self.block_size, self.hash_algorithm)
701 map.extend([binascii.unhexlify(x) for x in hashmap])
702 missing = self.store.block_search(map)
705 ie.data = [binascii.hexlify(x) for x in missing]
709 dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
710 self.store.map_put(hash, map)
711 return dest_version_id
714 def update_object_checksum(self, user, account, container, name, version, checksum):
715 """Update an object's checksum."""
717 logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
718 # Update objects with greater version and same hashmap and size (fix metadata updates).
719 self._can_write(user, account, container, name)
720 path, node = self._lookup_object(account, container, name)
721 props = self._get_version(node, version)
722 versions = self.node.node_get_versions(node)
724 if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
725 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
    """Create the destination object from a source object's version.

    The user must pass the _can_read() check on the source. The hash and
    size of the requested source version (the default one when
    src_version is None) are written to the destination through
    _update_object_hash(), whose return value (the destination version
    id) is returned.
    """
    source = (src_account, src_container, src_name)
    destination = (dest_account, dest_container, dest_name)

    self._can_read(user, *source)
    src_node = self._lookup_object(*source)[1]
    # TODO: Will do another fetch of the properties in duplicate version...
    # Fetching the version also checks that the source exists.
    src_props = self._get_version(src_node, src_version)
    src_version_id = src_props[self.SERIAL]
    src_hash = src_props[self.HASH]
    src_size = src_props[self.SIZE]

    # New uuid only for a real copy: not a move, and to a different path.
    if is_move:
        is_copy = False
    else:
        is_copy = source != destination
    return self._update_object_hash(user, dest_account, dest_container, dest_name, src_size, type, src_hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=src_node, src_version_id=src_version_id, is_copy=is_copy)
def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
    """Copy an object's data and metadata.

    Logs the request and delegates to _copy_object() with is_move
    switched off; returns whatever _copy_object() returns (the
    destination version id).
    """

    logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
    is_move = False
    return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, is_move)
def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
    """Move an object's data and metadata.

    Raises NotAllowedError unless the user is the source account. The
    object is first copied to the destination through _copy_object()
    (no explicit source version, is_move set); the source is then
    deleted, unless source and destination are the same path. Returns
    the destination version id.
    """

    logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
    if user != src_account:
        raise NotAllowedError
    source = (src_account, src_container, src_name)
    destination = (dest_account, dest_container, dest_name)
    new_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
    if source != destination:
        self._delete_object(user, *source)
    return new_version_id
760 def _delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
762 raise NotAllowedError
764 if until is not None:
765 path = '/'.join((account, container, name))
766 node = self.node.node_lookup(path)
771 h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
774 h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
778 self.store.map_delete(h)
779 self.node.node_purge(node, until, CLUSTER_DELETED)
781 props = self._get_version(node)
783 self.permissions.access_clear(path)
784 self._report_size_change(user, account, -size, {'action': 'object purge'})
787 path, node = self._lookup_object(account, container, name)
788 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
789 del_size = self._apply_versioning(account, container, src_version_id)
791 self._report_size_change(user, account, -del_size, {'action': 'object delete'})
792 self._report_object_change(user, account, path, details={'action': 'object delete'})
793 self.permissions.access_clear(path)
def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
    """Delete/purge an object.

    Logs the request and delegates to _delete_object() together with
    until. prefix and delimiter are accepted and logged, but they are
    not forwarded to _delete_object().
    """

    logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
    self._delete_object(user, account, container, name, until)
def list_versions(self, user, account, container, name):
    """Return a list of all [version, version_timestamp] pairs for an object.

    The caller must pass the _can_read() check. Versions whose cluster is
    CLUSTER_DELETED are left out of the result.
    """

    logger.debug("list_versions: %s %s %s %s", user, account, container, name)
    self._can_read(user, account, container, name)
    node = self._lookup_object(account, container, name)[1]
    listing = []
    for version_props in self.node.node_get_versions(node):
        if version_props[self.CLUSTER] != CLUSTER_DELETED:
            listing.append([version_props[self.SERIAL], version_props[self.MTIME]])
    return listing
813 def get_uuid(self, user, uuid):
814 """Return the (account, container, name) for the UUID given."""
816 logger.debug("get_uuid: %s %s", user, uuid)
817 info = self.node.latest_uuid(uuid)
821 account, container, name = path.split('/', 2)
822 self._can_read(user, account, container, name)
823 return (account, container, name)
826 def get_public(self, user, public):
827 """Return the (account, container, name) for the public id given."""
829 logger.debug("get_public: %s %s", user, public)
830 if public is None or public < ULTIMATE_ANSWER:
832 path = self.permissions.public_path(public - ULTIMATE_ANSWER)
835 account, container, name = path.split('/', 2)
836 self._can_read(user, account, container, name)
837 return (account, container, name)
839 @backend_method(autocommit=0)
840 def get_block(self, hash):
841 """Return a block's data."""
843 logger.debug("get_block: %s", hash)
844 block = self.store.block_get(binascii.unhexlify(hash))
846 raise NameError('Block does not exist')
@backend_method(autocommit=0)
def put_block(self, data):
    """Store a block and return the hash.

    The hash produced by the store is returned hexlified. Only the
    length of the data is logged, not the data itself.
    """

    logger.debug("put_block: %s", len(data))
    raw_hash = self.store.block_put(data)
    return binascii.hexlify(raw_hash)
@backend_method(autocommit=0)
def update_block(self, hash, data, offset=0):
    """Update a known block and return the hash.

    When data starts at offset 0 and is exactly block_size long it
    replaces the block entirely, so it is stored through put_block().
    Otherwise the store patches the block identified by the hexlified
    hash, and the hash it reports is returned hexlified.
    """

    logger.debug("update_block: %s %s %s", hash, len(data), offset)
    replaces_whole_block = offset == 0 and len(data) == self.block_size
    if replaces_whole_block:
        return self.put_block(data)
    raw_hash = binascii.unhexlify(hash)
    updated_hash = self.store.block_update(raw_hash, offset, data)
    return binascii.hexlify(updated_hash)
def _generate_uuid(self):
    """Return a freshly generated uuid4 as a string."""
    fresh = uuidlib.uuid4()
    return str(fresh)
# Look up the node for object `name` under `path`; a node can be created
# under `parent` via node_create.
# NOTE(review): the condition around node_create and the return statement are
# not visible in this listing; creation presumably happens only when the
# lookup finds nothing -- confirm against the full file.
871 def _put_object_node(self, path, parent, name):
# From here on `path` is the full object path ("<path>/<name>").
872 path = '/'.join((path, name))
873 node = self.node.node_lookup(path)
875 node = self.node.node_create(parent, path)
# Create a node for `path` under `parent`, together with its first version.
# NOTE(review): the return statement is not visible in this listing;
# _lookup_account assigns this method's result to its `node` variable.
878 def _put_path(self, user, parent, path):
879 node = self.node.node_create(parent, path)
# Initial version: no hash, size 0, empty type, no source version, a fresh
# UUID, empty checksum, stored in CLUSTER_NORMAL.
880 self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
# Find the node of `account`, creating it on demand when `create` is true and
# the lookup finds nothing.
# NOTE(review): the return statement is not visible in this listing;
# _report_size_change reads index [1] of the result as the account node.
883 def _lookup_account(self, account, create=True):
884 node = self.node.node_lookup(account)
885 if node is None and create:
886 node = self._put_path(account, self.ROOTNODE, account) # User is account.
# Find the node of `container` inside `account`.
# NOTE(review): the check guarding the `raise` and the return statement are
# not visible in this listing; _apply_versioning unpacks the result as
# `path, node`.
889 def _lookup_container(self, account, container):
890 path = '/'.join((account, container))
891 node = self.node.node_lookup(path)
893 raise NameError('Container does not exist')
# Find the node of object `name` inside `account`/`container`.
# NOTE(review): the check guarding the `raise` and the return statement are
# not visible in this listing; list_versions unpacks the result as
# `path, node`.
896 def _lookup_object(self, account, container, name):
897 path = '/'.join((account, container, name))
898 node = self.node.node_lookup(path)
900 raise NameError('Object does not exist')
# NOTE(review): the check guarding the `raise` and the return statement are
# not visible in this listing.
903 def _get_properties(self, node, until=None):
904 """Return properties until the timestamp given."""
# Without `until` there is no upper bound: the module-level `inf` is the cut-off.
906 before = until if until is not None else inf
907 props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
# Only time-bounded queries fall back to CLUSTER_HISTORY when CLUSTER_NORMAL
# has no matching version.
908 if props is None and until is not None:
909 props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
911 raise NameError('Path does not exist')
# NOTE(review): only two statements of this method are visible in this
# listing; the branching between them (presumably on `until`) and the return
# statement are not shown -- confirm against the full file.
914 def _get_statistics(self, node, until=None):
915 """Return count, sum of size and latest timestamp of everything under node."""
918 stats = self.node.statistics_get(node, CLUSTER_NORMAL)
920 stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
# Fetch the properties of one version of `node`.
# NOTE(review): the branch headers (presumably on `version is None`), the
# try/except around `int(version)` and the return statement are not visible
# in this listing.
925 def _get_version(self, node, version=None):
# Latest version in CLUSTER_NORMAL.
927 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
929 raise NameError('Object does not exist')
# An explicit version is converted to an integer before the lookup.
932 version = int(version)
934 raise IndexError('Version does not exist')
935 props = self.node.version_get_properties(version)
# A version stored in CLUSTER_DELETED is reported as non-existent.
936 if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
937 raise IndexError('Version does not exist')
# Create a new version of `node`. The starting values are read from the
# latest CLUSTER_NORMAL version of `src_node`, or of `node` itself when
# `src_node` is None. Returns (pre_version_id, dest_version_id).
# NOTE(review): several branch headers and assignments of this method are not
# visible in this listing; the comments below cover only the visible lines.
940 def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
941 """Create a new version of the node."""
943 props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
944 if props is not None:
945 src_version_id = props[self.SERIAL]
946 src_hash = props[self.HASH]
947 src_size = props[self.SIZE]
948 src_type = props[self.TYPE]
949 src_checksum = props[self.CHECKSUM]
951 src_version_id = None
956 if size is None: # Set metadata.
957 hash = src_hash # This way hash can be set to None (account or container).
962 checksum = src_checksum
# A copy, or a node without a source version, gets a new UUID; otherwise the
# source version's UUID is carried over.
963 uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
966 pre_version_id = src_version_id
968 pre_version_id = None
969 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
970 if props is not None:
971 pre_version_id = props[self.SERIAL]
# The version being superseded is moved to CLUSTER_HISTORY before the new
# one is created.
972 if pre_version_id is not None:
973 self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
# `mtime` is returned by version_create but not used by the visible code.
975 dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
976 return pre_version_id, dest_version_id
# Copy the attributes of the source version (if any) to the destination
# version, then apply `meta` within `domain`.
# NOTE(review): the statements selecting between the two del/set pairs below
# are not visible in this listing; presumably they test `replace` -- confirm.
978 def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
979 if src_version_id is not None:
980 self.node.attribute_copy(src_version_id, dest_version_id)
# First pair: keys whose value is the empty string are deleted, all other
# keys are set.
982 self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
983 self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
# Second pair: attribute_del is called without a key list (presumably
# clearing the whole domain), then every item of `meta` is set.
985 self.node.attribute_del(dest_version_id, domain)
986 self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
def _put_metadata(self, user, node, domain, meta, replace=False):
    """Duplicate the node's current version and store `meta` on the duplicate.

    Returns the (previous version id, new version id) pair.
    """
    version_ids = self._put_version_duplicate(user, node)
    previous_id, new_id = version_ids
    self._put_metadata_duplicate(previous_id, new_id, domain, meta, replace)
    return previous_id, new_id
# Work out the slice bounds used to page through `listing`.
# NOTE(review): most of this method is not visible in this listing, including
# its return statement; _list_object_properties unpacks the result as
# `start, limit`.
995 def _list_limits(self, listing, marker, limit):
# Paging resumes at the entry right after `marker`.
999 start = listing.index(marker) + 1
# A falsy limit, or one above 10000, is singled out here.
1002 if not limit or limit > 10000:
# List the objects (and optionally virtual prefixes) under `path`, returning
# one page of tuples whose first element is the name relative to `path`.
# NOTE(review): the assignment of `sizeq`, which is passed to
# latest_version_list below, is not visible in this listing.
1006 def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
# Both the prefix and the marker are qualified with "<path>/" for the query.
1007 cont_prefix = path + '/'
1008 prefix = cont_prefix + prefix
1009 start = cont_prefix + marker if marker else None
1010 before = until if until is not None else inf
# `keys` only filters when a domain is given.
1011 filterq = keys if domain else []
1014 objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
# With `virtual` set, each prefix is added as a (prefix, None) entry.
1015 objects.extend([(p, None) for p in prefixes] if virtual else [])
1016 objects.sort(key=lambda x: x[0])
# Strip the leading "<path>/" so that names are relative to the container path.
1017 objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1019 start, limit = self._list_limits([x[0] for x in objects], marker, limit)
1020 return objects[start:start + limit]
1022 # Reporting functions.
def _report_size_change(self, user, account, size, details=None):
    """Queue a 'diskspace' message for a change in an account's stored size.

    Parameters:
        user: the user responsible for the change.
        account: the account affected; its node is looked up (and created
            if missing) to read the current statistics.
        size: the size change; it is reported as a float.
        details: optional mapping with extra fields for the message. It is
            copied, so neither the caller's mapping nor any shared default
            is modified.

    The queued details always carry 'user' and 'total', where 'total' is the
    second element of the account's statistics (the sum of sizes).
    """
    # Build a private dict per call. A `{}` default would be shared between
    # calls and between messages that were already queued.
    payload = dict(details) if details is not None else {}
    logger.debug("_report_size_change: %s %s %s %s", user, account, size, payload)
    account_node = self._lookup_account(account, True)[1]
    total = self._get_statistics(account_node)[1]
    payload.update({'user': user, 'total': total})
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), payload))
def _report_object_change(self, user, account, path, details=None):
    """Queue an 'object' message for a change to the object at `path`.

    Parameters:
        user: the user responsible for the change.
        account: the account the message is reported for.
        path: the path of the object that changed.
        details: optional mapping with extra fields for the message. It is
            copied, so neither the caller's mapping nor any shared default
            is modified.

    The queued details always carry 'user'.
    """
    # Build a private dict per call. A `{}` default would be shared between
    # calls and between messages that were already queued.
    payload = dict(details) if details is not None else {}
    logger.debug("_report_object_change: %s %s %s %s", user, account, path, payload)
    payload.update({'user': user})
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, payload))
def _report_sharing_change(self, user, account, path, details=None):
    """Queue a 'sharing' message for a permissions change on `path`.

    Parameters:
        user: the user responsible for the change.
        account: the account the message is reported for.
        path: the path whose sharing settings changed.
        details: optional mapping with extra fields for the message. It is
            copied, so neither the caller's mapping nor any shared default
            is modified.

    The queued details always carry 'user'.
    """
    # Build a private dict per call. A `{}` default would be shared between
    # calls and between messages that were already queued.
    payload = dict(details) if details is not None else {}
    # The debug label now matches this function's name (it used to read
    # "_report_permissions_change").
    logger.debug("_report_sharing_change: %s %s %s %s", user, account, path, payload)
    payload.update({'user': user})
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, payload))
# Validate a policy dict, updating `policy` in place where a default applies.
# NOTE(review): the conditions and the error handling of this method are not
# visible in this listing; the comments below cover only the visible lines.
1043 def _check_policy(self, policy):
# First pass: some entries (condition not visible) are replaced by the
# corresponding value from self.default_policy.
1044 for k in policy.keys():
1046 policy[k] = self.default_policy.get(k)
# Second pass: per-key validation.
1047 for k, v in policy.iteritems():
1049 q = int(v) # May raise ValueError.
1052 elif k == 'versioning':
1053 if v not in ['auto', 'none']:
# Store `policy` for `node` through self.node.policy_set.
# NOTE(review): the use of `replace` and the body of the loop over
# self.default_policy are not visible in this listing.
1058 def _put_policy(self, node, policy, replace):
1060 for k, v in self.default_policy.iteritems():
1063 self.node.policy_set(node, policy)
# Build the effective policy of `node`: a copy of the defaults overlaid with
# whatever is stored for the node.
# NOTE(review): the return statement is not visible in this listing;
# _apply_versioning indexes the result with 'versioning'.
1065 def _get_policy(self, node):
# Copy first, so that self.default_policy itself is never modified.
1066 policy = self.default_policy.copy()
1067 policy.update(self.node.policy_get(node))
# NOTE(review): the end of the docstring, the statement guarded by the
# `version_id is None` check and the return statements are not visible in
# this listing.
1070 def _apply_versioning(self, account, container, version_id):
1071 """Delete the provided version if such is the policy.
1072 Return size of object removed.
1075 if version_id is None:
1077 path, node = self._lookup_container(account, container)
1078 versioning = self._get_policy(node)['versioning']
# Any container policy other than 'auto' removes the version and deletes the
# map stored under its hash.
1079 if versioning != 'auto':
1080 hash, size = self.node.version_remove(version_id)
1081 self.store.map_delete(hash)
1085 # Access control functions.
# Validation hook for group definitions.
# NOTE(review): apart from the commented-out check below, the body of this
# method is not visible in this listing.
1087 def _check_groups(self, groups):
1088 # raise ValueError('Bad characters in groups')
# Validation hook for the permissions of `path`.
# NOTE(review): apart from the commented-out check below, the body of this
# method is not visible in this listing.
1091 def _check_permissions(self, path, permissions):
1092 # raise ValueError('Bad characters in permissions')
# Turn `paths` into (path, match-type) pairs collected in `formatted`.
# NOTE(review): the initialisation of `formatted`, the loop header binding
# `p` and the return statement are not visible in this listing.
1095 def _get_formatted_paths(self, paths):
1098 node = self.node.node_lookup(p)
1099 if node is not None:
1100 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1101 if props is not None:
# Only the media type counts; parameters after ';' are ignored.
1102 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
# Directory-like objects get a prefix match on "<path>/".
1103 formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1104 formatted.append((p, self.MATCH_EXACT))
# Find the path whose permissions apply to the given object, chosen from the
# candidates returned by self.permissions.access_inherit.
# NOTE(review): the statements executed inside the loop (what is returned
# for a match, and the final fallback) are not visible in this listing.
1107 def _get_permissions_path(self, account, container, name):
1108 path = '/'.join((account, container, name))
1109 permission_paths = self.permissions.access_inherit(path)
# Sort, then reverse: candidates are visited in descending lexical order.
1110 permission_paths.sort()
1111 permission_paths.reverse()
1112 for p in permission_paths:
# Candidates with fewer than two '/' separators are singled out here.
1116 if p.count('/') < 2:
1118 node = self.node.node_lookup(p)
1119 if node is not None:
1120 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1121 if props is not None:
# Only the media type counts; parameters after ';' are ignored.
1122 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
# Check that `user` may read the given object; NotAllowedError is raised
# otherwise.
# NOTE(review): several statements of this method are not visible in this
# listing, including what follows the public check and the condition
# guarding the first `raise`.
1126 def _can_read(self, user, account, container, name):
1129 path = '/'.join((account, container, name))
# Objects that have a public entry are singled out here.
1130 if self.permissions.public_get(path) is not None:
1132 path = self._get_permissions_path(account, container, name)
1134 raise NotAllowedError
# Either READ or WRITE access on the permissions path is sufficient to read.
1135 if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1136 raise NotAllowedError
# Check that `user` may write the given object; NotAllowedError is raised
# otherwise.
# NOTE(review): the statements before the first assignment and the condition
# guarding the first `raise` are not visible in this listing.
1138 def _can_write(self, user, account, container, name):
# NOTE(review): this value is overwritten by the very next statement.
1141 path = '/'.join((account, container, name))
1142 path = self._get_permissions_path(account, container, name)
1144 raise NotAllowedError
# Unlike reading, writing requires WRITE access specifically.
1145 if not self.permissions.access_check(path, self.WRITE, user):
1146 raise NotAllowedError
# Return, sorted, the accounts that own at least one path `user` has been
# given access to.
# NOTE(review): the initialisation of `allow` is not visible in this listing;
# the use of `.add` suggests a set -- confirm against the full file.
1148 def _allowed_accounts(self, user):
1150 for path in self.permissions.access_list_paths(user):
# The account is the first component of the path.
1151 allow.add(path.split('/', 1)[0])
1152 return sorted(allow)
# Return, sorted, the containers of `account` that hold at least one path
# `user` has been given access to.
# NOTE(review): the initialisation of `allow` is not visible in this listing;
# the use of `.add` suggests a set -- confirm against the full file.
1154 def _allowed_containers(self, user, account):
1156 for path in self.permissions.access_list_paths(user, account):
# The container is the second component of the path.
1157 allow.add(path.split('/', 2)[1])
1158 return sorted(allow)