1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
37 import uuid as uuidlib
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
44 # Stripped-down version of the HashMap class found in tools.
47 def __init__(self, blocksize, blockhash):
48 super(HashMap, self).__init__()
49 self.blocksize = blocksize
50 self.blockhash = blockhash
52 def _hash_raw(self, v):
53 h = hashlib.new(self.blockhash)
59 return self._hash_raw('')
61 return self.__getitem__(0)
67 h += [('\x00' * len(h[0]))] * (s - len(h))
69 h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
# Default modules and settings.

# Database layer: module path and connection string used when the caller
# does not supply its own.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'

# Block storage layer: module path and storage location used when the
# caller does not supply its own.
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'

# No queue defaults are defined; these two are kept commented out.
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'

# Queue message naming and client identification.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version cluster identifiers (0, 1, 2 respectively).
CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED = range(3)
91 logger = logging.getLogger(__name__)
94 def backend_method(func=None, autocommit=1):
97 return backend_method(func, autocommit)
102 def fn(self, *args, **kw):
103 self.wrapper.execute()
106 ret = func(self, *args, **kw)
107 for m in self.messages:
109 self.wrapper.commit()
112 self.wrapper.rollback()
117 class ModularBackend(BaseBackend):
118 """A modular backend.
120 Uses modules for SQL functions and storage.
123 def __init__(self, db_module=None, db_connection=None,
124 block_module=None, block_path=None,
125 queue_module=None, queue_connection=None):
126 db_module = db_module or DEFAULT_DB_MODULE
127 db_connection = db_connection or DEFAULT_DB_CONNECTION
128 block_module = block_module or DEFAULT_BLOCK_MODULE
129 block_path = block_path or DEFAULT_BLOCK_PATH
130 #queue_module = queue_module or DEFAULT_QUEUE_MODULE
131 #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
133 self.hash_algorithm = 'sha256'
134 self.block_size = 4 * 1024 * 1024 # 4MB
136 self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
140 return sys.modules[m]
142 self.db_module = load_module(db_module)
143 self.wrapper = self.db_module.DBWrapper(db_connection)
144 params = {'wrapper': self.wrapper}
145 self.permissions = self.db_module.Permissions(**params)
146 for x in ['READ', 'WRITE']:
147 setattr(self, x, getattr(self.db_module, x))
148 self.node = self.db_module.Node(**params)
149 for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
150 setattr(self, x, getattr(self.db_module, x))
152 self.block_module = load_module(block_module)
153 params = {'path': block_path,
154 'block_size': self.block_size,
155 'hash_algorithm': self.hash_algorithm}
156 self.store = self.block_module.Store(**params)
158 if queue_module and queue_connection:
159 self.queue_module = load_module(queue_module)
160 params = {'exchange': queue_connection,
161 'client_id': QUEUE_CLIENT_ID}
162 self.queue = self.queue_module.Queue(**params)
165 def send(self, *args):
171 self.queue = NoQueue()
def list_accounts(self, user, marker=None, limit=10000):
    """List the accounts that `user` is allowed to access.

    The full list comes from `_allowed_accounts`; `_list_limits` turns
    `marker` and `limit` into the start index and page size of the slice
    that is returned.
    """
    logger.debug("list_accounts: %s %s %s", user, marker, limit)
    accounts = self._allowed_accounts(user)
    first, count = self._list_limits(accounts, marker, limit)
    last = first + count
    return accounts[first:last]
187 def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
188 """Return a dictionary with the account metadata for the domain."""
190 logger.debug("get_account_meta: %s %s %s", account, domain, until)
191 path, node = self._lookup_account(account, user == account)
193 if until or node is None or account not in self._allowed_accounts(user):
194 raise NotAllowedError
196 props = self._get_properties(node, until)
197 mtime = props[self.MTIME]
201 count, bytes, tstamp = self._get_statistics(node, until)
202 tstamp = max(tstamp, mtime)
206 modified = self._get_statistics(node)[2] # Overall last modification.
207 modified = max(modified, mtime)
210 meta = {'name': account}
213 if props is not None and include_user_defined:
214 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
215 if until is not None:
216 meta.update({'until_timestamp': tstamp})
217 meta.update({'name': account, 'count': count, 'bytes': bytes})
218 meta.update({'modified': modified})
222 def update_account_meta(self, user, account, domain, meta, replace=False):
223 """Update the metadata associated with the account for the domain."""
225 logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
227 raise NotAllowedError
228 path, node = self._lookup_account(account, True)
229 self._put_metadata(user, node, domain, meta, replace)
232 def get_account_groups(self, user, account):
233 """Return a dictionary with the user groups defined for this account."""
235 logger.debug("get_account_groups: %s", account)
237 if account not in self._allowed_accounts(user):
238 raise NotAllowedError
240 self._lookup_account(account, True)
241 return self.permissions.group_dict(account)
244 def update_account_groups(self, user, account, groups, replace=False):
245 """Update the groups associated with the account."""
247 logger.debug("update_account_groups: %s %s %s", account, groups, replace)
249 raise NotAllowedError
250 self._lookup_account(account, True)
251 self._check_groups(groups)
253 self.permissions.group_destroy(account)
254 for k, v in groups.iteritems():
255 if not replace: # If not already deleted.
256 self.permissions.group_delete(account, k)
258 self.permissions.group_addmany(account, k, v)
261 def get_account_policy(self, user, account):
262 """Return a dictionary with the account policy."""
264 logger.debug("get_account_policy: %s", account)
266 if account not in self._allowed_accounts(user):
267 raise NotAllowedError
269 path, node = self._lookup_account(account, True)
270 return self._get_policy(node)
273 def update_account_policy(self, user, account, policy, replace=False):
274 """Update the policy associated with the account."""
276 logger.debug("update_account_policy: %s %s %s", account, policy, replace)
278 raise NotAllowedError
279 path, node = self._lookup_account(account, True)
280 self._check_policy(policy)
281 self._put_policy(node, policy, replace)
284 def put_account(self, user, account, policy={}):
285 """Create a new account with the given name."""
287 logger.debug("put_account: %s %s", account, policy)
289 raise NotAllowedError
290 node = self.node.node_lookup(account)
292 raise NameError('Account already exists')
294 self._check_policy(policy)
295 node = self._put_path(user, self.ROOTNODE, account)
296 self._put_policy(node, policy, True)
299 def delete_account(self, user, account):
300 """Delete the account with the given name."""
302 logger.debug("delete_account: %s", account)
304 raise NotAllowedError
305 node = self.node.node_lookup(account)
308 if not self.node.node_remove(node):
309 raise IndexError('Account is not empty')
310 self.permissions.group_destroy(account)
313 def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
314 """Return a list of containers existing under an account."""
316 logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
318 if until or account not in self._allowed_accounts(user):
319 raise NotAllowedError
320 allowed = self._allowed_containers(user, account)
321 start, limit = self._list_limits(allowed, marker, limit)
322 return allowed[start:start + limit]
324 allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
325 allowed = list(set(allowed))
326 start, limit = self._list_limits(allowed, marker, limit)
327 return allowed[start:start + limit]
328 node = self.node.node_lookup(account)
329 return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
332 def list_container_meta(self, user, account, container, domain, until=None):
333 """Return a list with all the container's object meta keys for the domain."""
335 logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
339 raise NotAllowedError
340 allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
342 raise NotAllowedError
343 path, node = self._lookup_container(account, container)
344 before = until if until is not None else inf
345 allowed = self._get_formatted_paths(allowed)
346 return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
349 def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
350 """Return a dictionary with the container metadata for the domain."""
352 logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
354 if until or container not in self._allowed_containers(user, account):
355 raise NotAllowedError
356 path, node = self._lookup_container(account, container)
357 props = self._get_properties(node, until)
358 mtime = props[self.MTIME]
359 count, bytes, tstamp = self._get_statistics(node, until)
360 tstamp = max(tstamp, mtime)
364 modified = self._get_statistics(node)[2] # Overall last modification.
365 modified = max(modified, mtime)
368 meta = {'name': container}
371 if include_user_defined:
372 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
373 if until is not None:
374 meta.update({'until_timestamp': tstamp})
375 meta.update({'name': container, 'count': count, 'bytes': bytes})
376 meta.update({'modified': modified})
380 def update_container_meta(self, user, account, container, domain, meta, replace=False):
381 """Update the metadata associated with the container for the domain."""
383 logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
385 raise NotAllowedError
386 path, node = self._lookup_container(account, container)
387 src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
388 if src_version_id is not None:
389 versioning = self._get_policy(node)['versioning']
390 if versioning != 'auto':
391 self.node.version_remove(src_version_id)
394 def get_container_policy(self, user, account, container):
395 """Return a dictionary with the container policy."""
397 logger.debug("get_container_policy: %s %s", account, container)
399 if container not in self._allowed_containers(user, account):
400 raise NotAllowedError
402 path, node = self._lookup_container(account, container)
403 return self._get_policy(node)
406 def update_container_policy(self, user, account, container, policy, replace=False):
407 """Update the policy associated with the container."""
409 logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
411 raise NotAllowedError
412 path, node = self._lookup_container(account, container)
413 self._check_policy(policy)
414 self._put_policy(node, policy, replace)
417 def put_container(self, user, account, container, policy={}):
418 """Create a new container with the given name."""
420 logger.debug("put_container: %s %s %s", account, container, policy)
422 raise NotAllowedError
424 path, node = self._lookup_container(account, container)
428 raise NameError('Container already exists')
430 self._check_policy(policy)
431 path = '/'.join((account, container))
432 node = self._put_path(user, self._lookup_account(account, True)[1], path)
433 self._put_policy(node, policy, True)
436 def delete_container(self, user, account, container, until=None):
437 """Delete/purge the container with the given name."""
439 logger.debug("delete_container: %s %s %s", account, container, until)
441 raise NotAllowedError
442 path, node = self._lookup_container(account, container)
444 if until is not None:
445 hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
447 self.store.map_delete(h)
448 self.node.node_purge_children(node, until, CLUSTER_DELETED)
449 self._report_size_change(user, account, -size, {'action': 'container purge'})
452 if self._get_statistics(node)[0] > 0:
453 raise IndexError('Container is not empty')
454 hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
456 self.store.map_delete(h)
457 self.node.node_purge_children(node, inf, CLUSTER_DELETED)
458 self.node.node_remove(node)
459 self._report_size_change(user, account, -size, {'action': 'container delete'})
461 def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props):
462 if user != account and until:
463 raise NotAllowedError
464 allowed = self._list_object_permissions(user, account, container, prefix, shared)
465 if shared and not allowed:
467 path, node = self._lookup_container(account, container)
468 allowed = self._get_formatted_paths(allowed)
469 return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
471 def _list_object_permissions(self, user, account, container, prefix, shared):
473 path = '/'.join((account, container, prefix)).rstrip('/')
475 allowed = self.permissions.access_list_paths(user, path)
477 raise NotAllowedError
480 allowed = self.permissions.access_list_shared(path)
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
    """List the objects under a container as (name, version_id) tuples.

    Thin wrapper: logs the request and delegates to `_list_objects`
    with `all_props` disabled.
    """
    # Everything except `user` is both logged and forwarded, in this order.
    params = (account, container, prefix, delimiter, marker, limit,
              virtual, domain, keys, shared, until, size_range)
    logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", *params)
    return self._list_objects(user, *params, all_props=False)
493 def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
494 """Return a list of object metadata dicts existing under a container."""
496 logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
497 props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True)
501 objects.append({'subdir': p[0]})
503 objects.append({'name': p[0],
504 'bytes': p[self.SIZE + 1],
505 'type': p[self.TYPE + 1],
506 'hash': p[self.HASH + 1],
507 'version': p[self.SERIAL + 1],
508 'version_timestamp': p[self.MTIME + 1],
509 'modified': p[self.MTIME + 1] if until is None else None,
510 'modified_by': p[self.MUSER + 1],
511 'uuid': p[self.UUID + 1],
512 'checksum': p[self.CHECKSUM + 1]})
def list_object_permissions(self, user, account, container, prefix=''):
    """List the paths that enforce permissions under a container.

    Logs the request and delegates to `_list_object_permissions` with
    `shared` set to True.
    """
    logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
    paths = self._list_object_permissions(user, account, container, prefix, shared=True)
    return paths
523 def list_object_public(self, user, account, container, prefix=''):
524 """Return a dict mapping paths to public ids for objects that are public under a container."""
526 logger.debug("list_object_public: %s %s %s", account, container, prefix)
528 for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
529 public[path] = p + ULTIMATE_ANSWER
533 def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
534 """Return a dictionary with the object metadata for the domain."""
536 logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
537 self._can_read(user, account, container, name)
538 path, node = self._lookup_object(account, container, name)
539 props = self._get_version(node, version)
541 modified = props[self.MTIME]
544 modified = self._get_version(node)[self.MTIME] # Overall last modification.
545 except NameError: # Object may be deleted.
546 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
547 if del_props is None:
548 raise NameError('Object does not exist')
549 modified = del_props[self.MTIME]
552 if include_user_defined:
553 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
554 meta.update({'name': name,
555 'bytes': props[self.SIZE],
556 'type': props[self.TYPE],
557 'hash': props[self.HASH],
558 'version': props[self.SERIAL],
559 'version_timestamp': props[self.MTIME],
560 'modified': modified,
561 'modified_by': props[self.MUSER],
562 'uuid': props[self.UUID],
563 'checksum': props[self.CHECKSUM]})
def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
    """Update the object's metadata for the domain; return the new version id.

    Write access is checked first, then the object node is looked up and
    the metadata is stored through `_put_metadata`. The version that was
    current before the update is handed to `_apply_versioning`.
    """
    logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
    self._can_write(user, account, container, name)
    _path, obj_node = self._lookup_object(account, container, name)
    old_version_id, new_version_id = self._put_metadata(user, obj_node, domain, meta, replace)
    # The superseded version is handled according to the versioning policy.
    self._apply_versioning(account, container, old_version_id)
    return new_version_id
578 def get_object_permissions(self, user, account, container, name):
579 """Return the action allowed on the object, the path
580 from which the object gets its permissions from,
581 along with a dictionary containing the permissions."""
583 logger.debug("get_object_permissions: %s %s %s", account, container, name)
585 permissions_path = self._get_permissions_path(account, container, name)
587 if self.permissions.access_check(permissions_path, self.WRITE, user):
589 elif self.permissions.access_check(permissions_path, self.READ, user):
592 raise NotAllowedError
593 self._lookup_object(account, container, name)
594 return (allowed, permissions_path, self.permissions.access_get(permissions_path))
597 def update_object_permissions(self, user, account, container, name, permissions):
598 """Update the permissions associated with the object."""
600 logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
602 raise NotAllowedError
603 path = self._lookup_object(account, container, name)[0]
604 self._check_permissions(path, permissions)
605 self.permissions.access_set(path, permissions)
606 self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
609 def get_object_public(self, user, account, container, name):
610 """Return the public id of the object if applicable."""
612 logger.debug("get_object_public: %s %s %s", account, container, name)
613 self._can_read(user, account, container, name)
614 path = self._lookup_object(account, container, name)[0]
615 p = self.permissions.public_get(path)
621 def update_object_public(self, user, account, container, name, public):
622 """Update the public status of the object."""
624 logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
625 self._can_write(user, account, container, name)
626 path = self._lookup_object(account, container, name)[0]
628 self.permissions.public_unset(path)
630 self.permissions.public_set(path)
def get_object_hashmap(self, user, account, container, name, version=None):
    """Return the object's size and the list of its block hashes.

    The stored hash of the selected version (hex encoded) is decoded and
    used to fetch the hashmap from the block store; every entry of that
    hashmap is returned hex encoded.
    """
    logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
    self._can_read(user, account, container, name)
    _path, obj_node = self._lookup_object(account, container, name)
    version_props = self._get_version(obj_node, version)
    map_key = binascii.unhexlify(version_props[self.HASH])
    block_hashes = self.store.map_get(map_key)
    size = version_props[self.SIZE]
    hex_hashes = [binascii.hexlify(block_hash) for block_hash in block_hashes]
    return size, hex_hashes
643 def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
644 if permissions is not None and user != account:
645 raise NotAllowedError
646 self._can_write(user, account, container, name)
647 if permissions is not None:
648 path = '/'.join((account, container, name))
649 self._check_permissions(path, permissions)
651 account_path, account_node = self._lookup_account(account, True)
652 container_path, container_node = self._lookup_container(account, container)
653 path, node = self._put_object_node(container_path, container_node, name)
654 pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
657 if src_version_id is None:
658 src_version_id = pre_version_id
659 self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
662 del_size = self._apply_versioning(account, container, pre_version_id)
663 size_delta = size - del_size
665 account_quota = long(self._get_policy(account_node)['quota'])
666 container_quota = long(self._get_policy(container_node)['quota'])
667 if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
668 (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
669 # This must be executed in a transaction, so the version is never created if it fails.
671 self._report_size_change(user, account, size_delta, {'action': 'object update'})
673 if permissions is not None:
674 self.permissions.access_set(path, permissions)
675 self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
677 self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
678 return dest_version_id
681 def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
682 """Create/update an object with the specified size and partial hashes."""
684 logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
685 if size == 0: # No such thing as an empty hashmap.
686 hashmap = [self.put_block('')]
687 map = HashMap(self.block_size, self.hash_algorithm)
688 map.extend([binascii.unhexlify(x) for x in hashmap])
689 missing = self.store.block_search(map)
692 ie.data = [binascii.hexlify(x) for x in missing]
696 dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
697 self.store.map_put(hash, map)
698 return dest_version_id
701 def update_object_checksum(self, user, account, container, name, version, checksum):
702 """Update an object's checksum."""
704 logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
705 # Update objects with greater version and same hashmap and size (fix metadata updates).
706 self._can_write(user, account, container, name)
707 path, node = self._lookup_object(account, container, name)
708 props = self._get_version(node, version)
709 versions = self.node.node_get_versions(node)
711 if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
712 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
    """Create the destination object from a source version; return the new version id.

    Read access on the source is checked, the source version's hash and
    size are read, and the destination is written via
    `_update_object_hash` with no checksum. `is_copy` is passed as true
    only when this is not a move and source and destination differ.
    """
    self._can_read(user, src_account, src_container, src_name)
    _path, src_node = self._lookup_object(src_account, src_container, src_name)
    # TODO: Will do another fetch of the properties in duplicate version...
    # Fetching the version also checks that the source exists.
    src_props = self._get_version(src_node, src_version)
    src_version_id = src_props[self.SERIAL]
    hash = src_props[self.HASH]
    size = src_props[self.SIZE]

    if is_move:
        is_copy = False
    else:
        # A genuine copy to a different path gets a new uuid.
        source = (src_account, src_container, src_name)
        destination = (dest_account, dest_container, dest_name)
        is_copy = source != destination
    return self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=src_node, src_version_id=src_version_id, is_copy=is_copy)
def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
    """Copy an object's data and metadata; return the new version id.

    Logs the request and delegates to `_copy_object` with `is_move`
    disabled.
    """
    logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
    return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, is_move=False)
def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
    """Move an object's data and metadata; return the new version id.

    Only the owner of the source account may move an object, otherwise
    NotAllowedError is raised. The object is first copied through
    `_copy_object` (latest version, `is_move` enabled); the source is
    then deleted unless source and destination are the same path.
    """
    logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
    if user != src_account:
        raise NotAllowedError
    source = (src_account, src_container, src_name)
    destination = (dest_account, dest_container, dest_name)
    new_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
    if source != destination:
        # Moving onto itself must not delete the object.
        self._delete_object(user, *source)
    return new_version_id
747 def _delete_object(self, user, account, container, name, until=None):
749 raise NotAllowedError
751 if until is not None:
752 path = '/'.join((account, container, name))
753 node = self.node.node_lookup(path)
758 h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
761 h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
765 self.store.map_delete(h)
766 self.node.node_purge(node, until, CLUSTER_DELETED)
768 props = self._get_version(node)
770 self.permissions.access_clear(path)
771 self._report_size_change(user, account, -size, {'action': 'object purge'})
774 path, node = self._lookup_object(account, container, name)
775 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
776 del_size = self._apply_versioning(account, container, src_version_id)
778 self._report_size_change(user, account, -del_size, {'action': 'object delete'})
779 self._report_object_change(user, account, path, details={'action': 'object delete'})
780 self.permissions.access_clear(path)
def delete_object(self, user, account, container, name, until=None):
    """Delete/purge an object.

    Logs the request and hands all the work to `_delete_object`.
    """
    logger.debug("delete_object: %s %s %s %s", account, container, name, until)
    self._delete_object(user, account, container, name, until=until)
def list_versions(self, user, account, container, name):
    """Return a list of [version, version_timestamp] pairs for an object.

    Read access is checked first. Versions that belong to the deleted
    cluster are left out of the result.
    """
    logger.debug("list_versions: %s %s %s", account, container, name)
    self._can_read(user, account, container, name)
    _path, obj_node = self._lookup_object(account, container, name)
    listing = []
    for version_props in self.node.node_get_versions(obj_node):
        if version_props[self.CLUSTER] != CLUSTER_DELETED:
            listing.append([version_props[self.SERIAL], version_props[self.MTIME]])
    return listing
800 def get_uuid(self, user, uuid):
801 """Return the (account, container, name) for the UUID given."""
803 logger.debug("get_uuid: %s", uuid)
804 info = self.node.latest_uuid(uuid)
808 account, container, name = path.split('/', 2)
809 self._can_read(user, account, container, name)
810 return (account, container, name)
813 def get_public(self, user, public):
814 """Return the (account, container, name) for the public id given."""
816 logger.debug("get_public: %s", public)
817 if public is None or public < ULTIMATE_ANSWER:
819 path = self.permissions.public_path(public - ULTIMATE_ANSWER)
822 account, container, name = path.split('/', 2)
823 self._can_read(user, account, container, name)
824 return (account, container, name)
826 @backend_method(autocommit=0)
827 def get_block(self, hash):
828 """Return a block's data."""
830 logger.debug("get_block: %s", hash)
831 block = self.store.block_get(binascii.unhexlify(hash))
833 raise NameError('Block does not exist')
@backend_method(autocommit=0)
def put_block(self, data):
    """Store a block in the block store and return its hash, hex encoded."""
    logger.debug("put_block: %s", len(data))
    raw_hash = self.store.block_put(data)
    return binascii.hexlify(raw_hash)
@backend_method(autocommit=0)
def update_block(self, hash, data, offset=0):
    """Update a known block and return the resulting hash, hex encoded.

    When `data` starts at offset 0 and is exactly `block_size` long it
    replaces the whole block, so it is simply stored as a new block via
    `put_block`. Otherwise the existing block (identified by the hex
    `hash`) is patched at `offset` through the block store.
    """
    logger.debug("update_block: %s %s %s", hash, len(data), offset)
    replaces_whole_block = offset == 0 and len(data) == self.block_size
    if replaces_whole_block:
        return self.put_block(data)
    old_hash = binascii.unhexlify(hash)
    new_hash = self.store.block_update(old_hash, offset, data)
    return binascii.hexlify(new_hash)
def _generate_uuid(self):
    """Return a newly generated random (version 4) UUID as a string."""
    fresh = uuidlib.uuid4()
    return str(fresh)
858 def _put_object_node(self, path, parent, name):
859 path = '/'.join((path, name))
860 node = self.node.node_lookup(path)
862 node = self.node.node_create(parent, path)
865 def _put_path(self, user, parent, path):
866 node = self.node.node_create(parent, path)
867 self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
870 def _lookup_account(self, account, create=True):
871 node = self.node.node_lookup(account)
872 if node is None and create:
873 node = self._put_path(account, self.ROOTNODE, account) # User is account.
876 def _lookup_container(self, account, container):
877 path = '/'.join((account, container))
878 node = self.node.node_lookup(path)
880 raise NameError('Container does not exist')
# Look up the node stored under '<account>/<container>/<name>'.
# Raises NameError('Object does not exist').
# NOTE(review): the guard for the raise and the return statement are not
# visible in this view -- presumably mirrors _lookup_container; confirm.
883 def _lookup_object(self, account, container, name):
884 path = '/'.join((account, container, name))
885 node = self.node.node_lookup(path)
887 raise NameError('Object does not exist')
# Fetch the version properties of `node` as of `until` (no upper bound, i.e.
# `inf`, when `until` is None). The CLUSTER_NORMAL cluster is consulted first;
# only when nothing is found there AND an explicit `until` was given is
# CLUSTER_HISTORY consulted as a fallback.
# Raises NameError('Path does not exist').
# NOTE(review): the guard for the raise and the return statement are not
# visible in this view.
890 def _get_properties(self, node, until=None):
891 """Return properties until the timestamp given."""
893 before = until if until is not None else inf
894 props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
895 if props is None and until is not None:
896 props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
898 raise NameError('Path does not exist')
# Two sources of statistics are visible: statistics_get over CLUSTER_NORMAL,
# and statistics_latest bounded by `until` with CLUSTER_DELETED passed as
# its cluster argument.
# NOTE(review): the branch conditions selecting between the two calls and
# the return statement are not visible in this view -- presumably the choice
# depends on whether `until` is None; confirm.
901 def _get_statistics(self, node, until=None):
902 """Return count, sum of size and latest timestamp of everything under node."""
905 stats = self.node.statistics_get(node, CLUSTER_NORMAL)
907 stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
# Resolve version properties for `node`.
# Visible paths: a lookup of the latest CLUSTER_NORMAL version (raising
# NameError('Object does not exist')), and a lookup of an explicit version,
# which is converted with int() and fetched through version_get_properties.
# An explicit version that yields no properties, or whose cluster is
# CLUSTER_DELETED, raises IndexError('Version does not exist').
# NOTE(review): several guards (including the one around int(version)) and
# the return statement are not visible in this view.
912 def _get_version(self, node, version=None):
914 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
916 raise NameError('Object does not exist')
919 version = int(version)
921 raise IndexError('Version does not exist')
922 props = self.node.version_get_properties(version)
923 if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
924 raise IndexError('Version does not exist')
# Create a new version of `node`, seeded from the latest CLUSTER_NORMAL
# version of `src_node` (or of `node` itself when `src_node` is None).
# Returns (pre_version_id, dest_version_id): the serial of the version that
# was current before this call (or None) and the serial of the new version.
# NOTE(review): several else-branches and guards are not visible in this
# view; the comments below describe only the visible statements.
927 def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
928 """Create a new version of the node."""
930 props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
# Remember the source version's serial, hash, size, type and checksum.
931 if props is not None:
932 src_version_id = props[self.SERIAL]
933 src_hash = props[self.HASH]
934 src_size = props[self.SIZE]
935 src_type = props[self.TYPE]
936 src_checksum = props[self.CHECKSUM]
938 src_version_id = None
943 if size is None: # Set metadata.
944 hash = src_hash # This way hash can be set to None (account or container).
949 checksum = src_checksum
# A new UUID is generated for copies and when there is no source version;
# otherwise the source version's UUID is reused.
950 uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
953 pre_version_id = src_version_id
955 pre_version_id = None
956 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
957 if props is not None:
958 pre_version_id = props[self.SERIAL]
# The previously current version is moved to the history cluster.
959 if pre_version_id is not None:
960 self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
962 dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
963 return pre_version_id, dest_version_id
# Carry attributes over to a new version and apply `meta` within `domain`.
# When a source version exists, its attributes are first copied to the
# destination version. Two update styles are then visible:
#   * merge: keys whose value is '' are deleted, all other keys are set;
#   * replace: the whole domain is deleted, then every key in `meta` is set.
# NOTE(review): the branch lines choosing between the two styles are not
# visible in this view -- presumably selected by `replace`; confirm.
965 def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
966 if src_version_id is not None:
967 self.node.attribute_copy(src_version_id, dest_version_id)
969 self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
970 self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
972 self.node.attribute_del(dest_version_id, domain)
973 self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
def _put_metadata(self, user, node, domain, meta, replace=False):
    """Duplicate the node's current version and apply *meta* to the copy.

    Returns the pair of version ids produced by _put_version_duplicate:
    the previously current version and the newly created one.
    """

    version_ids = self._put_version_duplicate(user, node)
    previous_id, created_id = version_ids
    self._put_metadata_duplicate(previous_id, created_id, domain, meta, replace)
    return previous_id, created_id
# Compute the slice bounds used to page through `listing`.
# Visible logic: the start index is placed just after the position of
# `marker` in the listing, and a missing/zero limit or one above 10000 is
# treated specially.
# NOTE(review): most of this method (default start, handling of a marker
# that is absent from the listing, the value assigned for an out-of-range
# limit, and the return) is not visible in this view. Callers unpack the
# result as (start, limit).
982 def _list_limits(self, listing, marker, limit):
986 start = listing.index(marker) + 1
989 if not limit or limit > 10000:
# List the latest versions of the objects under container `path`, returning
# one page of tuples whose first element is the object name relative to
# the container.
#   * `prefix` and `marker` are given relative to the container and are
#     re-rooted under '<path>/' before querying.
#   * `keys` is only used as a filter when `domain` is set.
#   * When `virtual` is true, common prefixes are added as (prefix, None).
# NOTE(review): the assignment of `sizeq` is not visible in this view --
# presumably derived from `size_range`; confirm.
# NOTE(review): `keys=[]` and `allowed=[]` are mutable defaults; the visible
# lines only read them.
993 def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
994 cont_prefix = path + '/'
995 prefix = cont_prefix + prefix
996 start = cont_prefix + marker if marker else None
997 before = until if until is not None else inf
998 filterq = keys if domain else []
1001 objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1002 objects.extend([(p, None) for p in prefixes] if virtual else [])
1003 objects.sort(key=lambda x: x[0])
# Strip the '<path>/' prefix so names are relative to the container.
1004 objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1006 start, limit = self._list_limits([x[0] for x in objects], marker, limit)
1007 return objects[start:start + limit]
1009 # Reporting functions.
def _report_size_change(self, user, account, size, details=None):
    """Queue a 'diskspace' message describing a size change for *account*.

    The account node is looked up (and created if missing). The second
    element of its statistics is reported as 'total' next to the acting
    *user*. The appended message is the tuple
    (key, account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details).

    *details* is an optional mapping of extra fields. It is copied, so
    neither the caller's dict nor a shared default is mutated, and every
    queued message owns its own details dict.
    """
    # Copy first: the former ``details={}`` default was one shared dict that
    # every call updated and appended, so queued messages overwrote each other.
    details = dict(details) if details else {}
    logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
    account_node = self._lookup_account(account, True)[1]
    total = self._get_statistics(account_node)[1]
    details.update({'user': user, 'total': total})
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
def _report_object_change(self, user, account, path, details=None):
    """Queue an 'object' message describing a change to *path*.

    The appended message is the tuple
    (key, account, QUEUE_INSTANCE_ID, 'object', path, details), where
    details carries the acting *user* plus any caller-supplied fields.

    *details* is an optional mapping of extra fields. It is copied, so
    neither the caller's dict nor a shared default is mutated, and every
    queued message owns its own details dict.
    """
    # Copy first: the former ``details={}`` default was one shared dict that
    # every call updated and appended, so queued messages overwrote each other.
    details = dict(details) if details else {}
    logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
    details.update({'user': user})
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
def _report_sharing_change(self, user, account, path, details=None):
    """Queue a 'sharing' message describing a permissions change on *path*.

    The appended message is the tuple
    (key, account, QUEUE_INSTANCE_ID, 'sharing', path, details), where
    details carries the acting *user* plus any caller-supplied fields.

    *details* is an optional mapping of extra fields. It is copied, so
    neither the caller's dict nor a shared default is mutated, and every
    queued message owns its own details dict.
    """
    # Copy first: the former ``details={}`` default was one shared dict that
    # every call updated and appended, so queued messages overwrote each other.
    details = dict(details) if details else {}
    # The debug label now matches this method's name; it previously read
    # "_report_permissions_change", which is not the name of this function.
    logger.debug("_report_sharing_change: %s %s %s %s", user, account, path, details)
    details.update({'user': user})
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
# Validate a policy mapping, modifying it in place.
# Visible logic: in a first pass, entries are replaced by the matching value
# from self.default_policy; in a second pass one kind of value is parsed with
# int() and 'versioning' values are checked against 'auto' / 'none'.
# NOTE(review): the conditions selecting which entries get the default and
# which key is parsed as an integer, as well as the error raised on invalid
# values, are not visible in this view.
1030 def _check_policy(self, policy):
# Iterates over keys() (a list copy in Python 2) while assigning into policy.
1031 for k in policy.keys():
1033 policy[k] = self.default_policy.get(k)
1034 for k, v in policy.iteritems():
1036 q = int(v) # May raise ValueError.
1039 elif k == 'versioning':
1040 if v not in ['auto', 'none']:
# Store `policy` for `node` through self.node.policy_set.
# NOTE(review): the loop over self.default_policy has no visible body and
# the use of `replace` is not visible in this view -- presumably missing keys
# are filled from the defaults when replacing; confirm.
1045 def _put_policy(self, node, policy, replace):
1047 for k, v in self.default_policy.iteritems():
1050 self.node.policy_set(node, policy)
# Build the effective policy for `node`: start from a copy of the default
# policy and overlay the values stored for the node.
# NOTE(review): the return statement is not visible in this view; callers
# index the result by policy key (e.g. 'versioning').
1052 def _get_policy(self, node):
1053 policy = self.default_policy.copy()
1054 policy.update(self.node.policy_get(node))
# Apply the container's versioning policy to an old version.
# The 'versioning' setting is read from the container's policy. When it is
# anything other than 'auto', the version is removed from the node store and
# the hash returned by version_remove is deleted from the block store.
# NOTE(review): the handling of version_id None, the end of the docstring
# and the return statements are not visible in this view.
1057 def _apply_versioning(self, account, container, version_id):
1058 """Delete the provided version if such is the policy.
1059 Return size of object removed.
1062 if version_id is None:
1064 path, node = self._lookup_container(account, container)
1065 versioning = self._get_policy(node)['versioning']
1066 if versioning != 'auto':
1067 hash, size = self.node.version_remove(version_id)
1068 self.store.map_delete(hash)
1072 # Access control functions.
# Validation hook for `groups`.
# NOTE(review): the only visible body line is a commented-out ValueError, so
# presumably no validation is currently performed -- confirm against the
# full file.
1074 def _check_groups(self, groups):
1075 # raise ValueError('Bad characters in groups')
# Validation hook for the `permissions` of `path`.
# NOTE(review): the only visible body line is a commented-out ValueError, so
# presumably no validation is currently performed -- confirm against the
# full file.
1078 def _check_permissions(self, path, permissions):
1079 # raise ValueError('Bad characters in permissions')
# Turn paths into (path, match-kind) pairs.
# For a path whose node exists and whose latest CLUSTER_NORMAL version has
# content type 'application/directory' or 'application/folder' (parameters
# after ';' are ignored), a prefix-match entry for the path with a single
# trailing '/' is appended. An exact-match entry for the path is appended
# as well.
# NOTE(review): the loop header, the initialisation of `formatted`, the
# nesting of the MATCH_EXACT append and the return are not visible here.
1082 def _get_formatted_paths(self, paths):
1085 node = self.node.node_lookup(p)
1086 if node is not None:
1087 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1088 if props is not None:
1089 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1090 formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1091 formatted.append((p, self.MATCH_EXACT))
# Search the candidate permission paths for '<account>/<container>/<name>'.
# Candidates come from self.permissions.access_inherit and are visited in
# descending sort order (sort followed by reverse).
# For each candidate, the visible checks are: whether it contains fewer than
# two '/' separators, and whether its node's latest CLUSTER_NORMAL version
# has a directory/folder content type (parameters after ';' are ignored).
# NOTE(review): the outcome of each check and the return statements are not
# visible in this view.
1094 def _get_permissions_path(self, account, container, name):
1095 path = '/'.join((account, container, name))
1096 permission_paths = self.permissions.access_inherit(path)
1097 permission_paths.sort()
1098 permission_paths.reverse()
1099 for p in permission_paths:
1103 if p.count('/') < 2:
1105 node = self.node.node_lookup(p)
1106 if node is not None:
1107 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1108 if props is not None:
1109 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
# Check read access of `user` on '<account>/<container>/<name>'.
# Visible logic: the path is first checked for a public entry; then the
# applicable permissions path is resolved, and NotAllowedError is raised when
# the user has neither READ nor WRITE access on it.
# NOTE(review): the early-exit lines (including what happens for a public
# path) and the guard for the first NotAllowedError are not visible in this
# view.
1113 def _can_read(self, user, account, container, name):
1116 path = '/'.join((account, container, name))
1117 if self.permissions.public_get(path) is not None:
1119 path = self._get_permissions_path(account, container, name)
1121 raise NotAllowedError
# WRITE access is accepted as sufficient for reading.
1122 if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1123 raise NotAllowedError
# Check write access of `user` on '<account>/<container>/<name>'.
# Visible logic: the applicable permissions path is resolved and
# NotAllowedError is raised when the user lacks WRITE access on it.
# NOTE(review): the early-exit lines and the guard for the first
# NotAllowedError are not visible in this view.
1125 def _can_write(self, user, account, container, name):
# NOTE(review): this joined path is overwritten by the next statement before
# any visible use, so the assignment looks redundant.
1128 path = '/'.join((account, container, name))
1129 path = self._get_permissions_path(account, container, name)
1131 raise NotAllowedError
1132 if not self.permissions.access_check(path, self.WRITE, user):
1133 raise NotAllowedError
# Return the sorted account names taken from the paths that
# self.permissions.access_list_paths reports for `user`. The account is the
# first '/'-separated component of each path.
# NOTE(review): the initialisation of `allow` is not visible in this view;
# the use of .add() suggests a set.
1135 def _allowed_accounts(self, user):
1137 for path in self.permissions.access_list_paths(user):
1138 allow.add(path.split('/', 1)[0])
1139 return sorted(allow)
# Return the sorted container names taken from the paths that
# self.permissions.access_list_paths reports for `user` within `account`.
# The container is the second '/'-separated component of each path.
# NOTE(review): the initialisation of `allow` is not visible in this view;
# the use of .add() suggests a set. A path without a '/' would make the
# [1] index fail -- presumably such paths are never returned; confirm.
1141 def _allowed_containers(self, user, account):
1143 for path in self.permissions.access_list_paths(user, account):
1144 allow.add(path.split('/', 2)[1])
1145 return sorted(allow)