1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
37 import uuid as uuidlib
41 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43 from pithos.lib.hashmap import HashMap
45 # Default modules and settings.
46 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
47 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
48 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
49 DEFAULT_BLOCK_PATH = 'data/'
51 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
58 logger = logging.getLogger(__name__)
61 def backend_method(func=None, autocommit=1):
64 return backend_method(func, autocommit)
69 def fn(self, *args, **kw):
70 self.wrapper.execute()
72 ret = func(self, *args, **kw)
76 self.wrapper.rollback()
81 class ModularBackend(BaseBackend):
84 Uses modules for SQL functions and storage.
87 def __init__(self, db_module=None, db_connection=None, block_module=None, block_path=None):
88 db_module = db_module or DEFAULT_DB_MODULE
89 db_connection = db_connection or DEFAULT_DB_CONNECTION
90 block_module = block_module or DEFAULT_BLOCK_MODULE
91 block_path = block_path or DEFAULT_BLOCK_PATH
93 self.hash_algorithm = 'sha256'
94 self.block_size = 4 * 1024 * 1024 # 4MB
96 self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
99 self.db_module = sys.modules[db_module]
100 self.wrapper = self.db_module.DBWrapper(db_connection)
102 params = {'wrapper': self.wrapper}
103 self.permissions = self.db_module.Permissions(**params)
104 for x in ['READ', 'WRITE']:
105 setattr(self, x, getattr(self.db_module, x))
106 self.node = self.db_module.Node(**params)
107 for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
108 setattr(self, x, getattr(self.db_module, x))
110 __import__(block_module)
111 self.block_module = sys.modules[block_module]
113 params = {'path': block_path,
114 'block_size': self.block_size,
115 'hash_algorithm': self.hash_algorithm}
116 self.store = self.block_module.Store(**params)
def list_accounts(self, user, marker=None, limit=10000):
    """List the accounts that the given user is allowed to access.

    The full listing comes from ``self._allowed_accounts``; ``marker``
    and ``limit`` are handed to ``self._list_limits`` to work out which
    window of that listing is returned.
    """

    logger.debug("list_accounts: %s %s %s", user, marker, limit)
    accounts = self._allowed_accounts(user)
    first, count = self._list_limits(accounts, marker, limit)
    last = first + count
    return accounts[first:last]
131 def get_account_meta(self, user, account, domain, until=None):
132 """Return a dictionary with the account metadata for the domain."""
134 logger.debug("get_account_meta: %s %s %s", account, domain, until)
135 path, node = self._lookup_account(account, user == account)
137 if until or node is None or account not in self._allowed_accounts(user):
138 raise NotAllowedError
140 props = self._get_properties(node, until)
141 mtime = props[self.MTIME]
145 count, bytes, tstamp = self._get_statistics(node, until)
146 tstamp = max(tstamp, mtime)
150 modified = self._get_statistics(node)[2] # Overall last modification.
151 modified = max(modified, mtime)
154 meta = {'name': account}
157 if props is not None:
158 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
159 if until is not None:
160 meta.update({'until_timestamp': tstamp})
161 meta.update({'name': account, 'count': count, 'bytes': bytes})
162 meta.update({'modified': modified})
166 def update_account_meta(self, user, account, domain, meta, replace=False):
167 """Update the metadata associated with the account for the domain."""
169 logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
171 raise NotAllowedError
172 path, node = self._lookup_account(account, True)
173 self._put_metadata(user, node, domain, meta, replace)
176 def get_account_groups(self, user, account):
177 """Return a dictionary with the user groups defined for this account."""
179 logger.debug("get_account_groups: %s", account)
181 if account not in self._allowed_accounts(user):
182 raise NotAllowedError
184 self._lookup_account(account, True)
185 return self.permissions.group_dict(account)
188 def update_account_groups(self, user, account, groups, replace=False):
189 """Update the groups associated with the account."""
191 logger.debug("update_account_groups: %s %s %s", account, groups, replace)
193 raise NotAllowedError
194 self._lookup_account(account, True)
195 self._check_groups(groups)
197 self.permissions.group_destroy(account)
198 for k, v in groups.iteritems():
199 if not replace: # If not already deleted.
200 self.permissions.group_delete(account, k)
202 self.permissions.group_addmany(account, k, v)
205 def get_account_policy(self, user, account):
206 """Return a dictionary with the account policy."""
208 logger.debug("get_account_policy: %s", account)
210 if account not in self._allowed_accounts(user):
211 raise NotAllowedError
213 path, node = self._lookup_account(account, True)
214 return self._get_policy(node)
217 def update_account_policy(self, user, account, policy, replace=False):
218 """Update the policy associated with the account."""
220 logger.debug("update_account_policy: %s %s %s", account, policy, replace)
222 raise NotAllowedError
223 path, node = self._lookup_account(account, True)
224 self._check_policy(policy)
225 self._put_policy(node, policy, replace)
228 def put_account(self, user, account, policy={}):
229 """Create a new account with the given name."""
231 logger.debug("put_account: %s %s", account, policy)
233 raise NotAllowedError
234 node = self.node.node_lookup(account)
236 raise NameError('Account already exists')
238 self._check_policy(policy)
239 node = self._put_path(user, self.ROOTNODE, account)
240 self._put_policy(node, policy, True)
243 def delete_account(self, user, account):
244 """Delete the account with the given name."""
246 logger.debug("delete_account: %s", account)
248 raise NotAllowedError
249 node = self.node.node_lookup(account)
252 if not self.node.node_remove(node):
253 raise IndexError('Account is not empty')
254 self.permissions.group_destroy(account)
257 def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
258 """Return a list of containers existing under an account."""
260 logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
262 if until or account not in self._allowed_accounts(user):
263 raise NotAllowedError
264 allowed = self._allowed_containers(user, account)
265 start, limit = self._list_limits(allowed, marker, limit)
266 return allowed[start:start + limit]
268 allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
269 allowed = list(set(allowed))
270 start, limit = self._list_limits(allowed, marker, limit)
271 return allowed[start:start + limit]
272 node = self.node.node_lookup(account)
273 return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
276 def get_container_meta(self, user, account, container, domain, until=None):
277 """Return a dictionary with the container metadata for the domain."""
279 logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
281 if until or container not in self._allowed_containers(user, account):
282 raise NotAllowedError
283 path, node = self._lookup_container(account, container)
284 props = self._get_properties(node, until)
285 mtime = props[self.MTIME]
286 count, bytes, tstamp = self._get_statistics(node, until)
287 tstamp = max(tstamp, mtime)
291 modified = self._get_statistics(node)[2] # Overall last modification.
292 modified = max(modified, mtime)
295 meta = {'name': container}
297 meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
298 if until is not None:
299 meta.update({'until_timestamp': tstamp})
300 meta.update({'name': container, 'count': count, 'bytes': bytes})
301 meta.update({'modified': modified})
305 def update_container_meta(self, user, account, container, domain, meta, replace=False):
306 """Update the metadata associated with the container for the domain."""
308 logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
310 raise NotAllowedError
311 path, node = self._lookup_container(account, container)
312 self._put_metadata(user, node, domain, meta, replace)
315 def get_container_policy(self, user, account, container):
316 """Return a dictionary with the container policy."""
318 logger.debug("get_container_policy: %s %s", account, container)
320 if container not in self._allowed_containers(user, account):
321 raise NotAllowedError
323 path, node = self._lookup_container(account, container)
324 return self._get_policy(node)
327 def update_container_policy(self, user, account, container, policy, replace=False):
328 """Update the policy associated with the container."""
330 logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
332 raise NotAllowedError
333 path, node = self._lookup_container(account, container)
334 self._check_policy(policy)
335 self._put_policy(node, policy, replace)
338 def put_container(self, user, account, container, policy={}):
339 """Create a new container with the given name."""
341 logger.debug("put_container: %s %s %s", account, container, policy)
343 raise NotAllowedError
345 path, node = self._lookup_container(account, container)
349 raise NameError('Container already exists')
351 self._check_policy(policy)
352 path = '/'.join((account, container))
353 node = self._put_path(user, self._lookup_account(account, True)[1], path)
354 self._put_policy(node, policy, True)
357 def delete_container(self, user, account, container, until=None):
358 """Delete/purge the container with the given name."""
360 logger.debug("delete_container: %s %s %s", account, container, until)
362 raise NotAllowedError
363 path, node = self._lookup_container(account, container)
365 if until is not None:
366 hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
368 self.store.map_delete(h)
369 self.node.node_purge_children(node, until, CLUSTER_DELETED)
372 if self._get_statistics(node)[0] > 0:
373 raise IndexError('Container is not empty')
374 hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
376 self.store.map_delete(h)
377 self.node.node_purge_children(node, inf, CLUSTER_DELETED)
378 self.node.node_remove(node)
381 def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
382 """Return a list of objects existing under a container."""
384 logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
388 raise NotAllowedError
389 allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
391 raise NotAllowedError
394 allowed = self.permissions.access_list_shared('/'.join((account, container)))
397 path, node = self._lookup_container(account, container)
398 return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed)
401 def list_object_meta(self, user, account, container, domain, until=None):
402 """Return a list with all the container's object meta keys for the domain."""
404 logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
408 raise NotAllowedError
409 allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
411 raise NotAllowedError
412 path, node = self._lookup_container(account, container)
413 before = until if until is not None else inf
414 return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
417 def get_object_meta(self, user, account, container, name, domain, version=None):
418 """Return a dictionary with the object metadata for the domain."""
420 logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
421 self._can_read(user, account, container, name)
422 path, node = self._lookup_object(account, container, name)
423 props = self._get_version(node, version)
425 modified = props[self.MTIME]
428 modified = self._get_version(node)[self.MTIME] # Overall last modification.
429 except NameError: # Object may be deleted.
430 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
431 if del_props is None:
432 raise NameError('Object does not exist')
433 modified = del_props[self.MTIME]
435 meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
436 meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
437 meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
438 meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
    """Store metadata for an object in the given domain.

    Write access is checked first.  The metadata is stored through
    ``self._put_metadata``, which yields a (source, destination) pair
    of version ids; the source id is then passed on to
    ``self._apply_versioning``.

    Returns the destination (new) version id.
    """

    logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
    self._can_write(user, account, container, name)
    _, node = self._lookup_object(account, container, name)
    version_ids = self._put_metadata(user, node, domain, meta, replace)
    previous_id, new_id = version_ids
    self._apply_versioning(account, container, previous_id)
    return new_id
453 def get_object_permissions(self, user, account, container, name):
454 """Return the action allowed on the object, the path
455 from which the object gets its permissions from,
456 along with a dictionary containing the permissions."""
458 logger.debug("get_object_permissions: %s %s %s", account, container, name)
461 path = '/'.join((account, container, name))
462 if self.permissions.access_check(path, self.WRITE, user):
464 elif self.permissions.access_check(path, self.READ, user):
467 raise NotAllowedError
468 path = self._lookup_object(account, container, name)[0]
469 return (allowed,) + self.permissions.access_inherit(path)
472 def update_object_permissions(self, user, account, container, name, permissions):
473 """Update the permissions associated with the object."""
475 logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
477 raise NotAllowedError
478 path = self._lookup_object(account, container, name)[0]
479 self._check_permissions(path, permissions)
480 self.permissions.access_set(path, permissions)
483 def get_object_public(self, user, account, container, name):
484 """Return the public id of the object if applicable."""
486 logger.debug("get_object_public: %s %s %s", account, container, name)
487 self._can_read(user, account, container, name)
488 path = self._lookup_object(account, container, name)[0]
489 p = self.permissions.public_get(path)
495 def update_object_public(self, user, account, container, name, public):
496 """Update the public status of the object."""
498 logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
499 self._can_write(user, account, container, name)
500 path = self._lookup_object(account, container, name)[0]
502 self.permissions.public_unset(path)
504 self.permissions.public_set(path)
def get_object_hashmap(self, user, account, container, name, version=None):
    """Return the size of an object version and its list of block hashes.

    Read access is checked first.  The version's stored hash is
    hex-decoded and used as the key for ``self.store.map_get``; each
    entry of the resulting map is hex-encoded before being returned.

    Returns a ``(size, hashes)`` tuple.
    """

    logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
    self._can_read(user, account, container, name)
    _, node = self._lookup_object(account, container, name)
    props = self._get_version(node, version)
    map_key = binascii.unhexlify(props[self.HASH])
    block_hashes = self.store.map_get(map_key)
    size = props[self.SIZE]
    hex_hashes = []
    for block_hash in block_hashes:
        hex_hashes.append(binascii.hexlify(block_hash))
    return size, hex_hashes
517 def _update_object_hash(self, user, account, container, name, size, hash, permissions, src_node=None, is_copy=False):
518 if permissions is not None and user != account:
519 raise NotAllowedError
520 self._can_write(user, account, container, name)
521 if permissions is not None:
522 path = '/'.join((account, container, name))
523 self._check_permissions(path, permissions)
525 account_path, account_node = self._lookup_account(account, True)
526 container_path, container_node = self._lookup_container(account, container)
527 path, node = self._put_object_node(container_path, container_node, name)
528 pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
531 versioning = self._get_policy(container_node)['versioning']
532 if versioning != 'auto':
533 size_delta = size - 0 # TODO: Get previous size.
537 account_quota = long(self._get_policy(account_node)['quota'])
538 container_quota = long(self._get_policy(container_node)['quota'])
539 if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
540 (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
541 # This must be executed in a transaction, so the version is never created if it fails.
544 if permissions is not None:
545 self.permissions.access_set(path, permissions)
546 self._apply_versioning(account, container, pre_version_id)
547 return pre_version_id, dest_version_id
550 def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
551 """Create/update an object with the specified size and partial hashes."""
553 logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
554 if size == 0: # No such thing as an empty hashmap.
555 hashmap = [self.put_block('')]
556 map = HashMap(self.block_size, self.hash_algorithm)
557 map.extend([binascii.unhexlify(x) for x in hashmap])
558 missing = self.store.block_search(map)
561 ie.data = [binascii.hexlify(x) for x in missing]
565 pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
566 self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
567 self.store.map_put(hash, map)
568 return dest_version_id
570 def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
571 self._can_read(user, src_account, src_container, src_name)
572 path, node = self._lookup_object(src_account, src_container, src_name)
573 # TODO: Will do another fetch of the properties in duplicate version...
574 props = self._get_version(node, src_version) # Check to see if source exists.
575 src_version_id = props[self.SERIAL]
576 hash = props[self.HASH]
577 size = props[self.SIZE]
579 is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
580 pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions, src_node=node, is_copy=is_copy)
581 self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
582 return dest_version_id
def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
    """Copy the data and metadata of an object to a destination path.

    The work is delegated to ``self._copy_object`` with ``is_move``
    switched off; its return value (the destination version id) is
    returned unchanged.
    """

    logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
    dest_version_id = self._copy_object(
        user, src_account, src_container, src_name,
        dest_account, dest_container, dest_name,
        domain, meta, replace_meta, permissions, src_version, False)
    return dest_version_id
def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
    """Move the data and metadata of an object to a destination path.

    Only the owner of the source account may move an object; anyone
    else gets ``NotAllowedError``.  The object is first copied with
    ``is_move`` switched on, then the source is deleted unless source
    and destination are the same path.

    Returns the destination version id.
    """

    logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
    if user != src_account:
        raise NotAllowedError
    dest_version_id = self._copy_object(
        user, src_account, src_container, src_name,
        dest_account, dest_container, dest_name,
        domain, meta, replace_meta, permissions, None, True)
    source = (src_account, src_container, src_name)
    target = (dest_account, dest_container, dest_name)
    if source != target:
        # A real move: the original must go once the copy is in place.
        self._delete_object(user, src_account, src_container, src_name)
    return dest_version_id
603 def _delete_object(self, user, account, container, name, until=None):
605 raise NotAllowedError
607 if until is not None:
608 path = '/'.join((account, container, name))
609 node = self.node.node_lookup(path)
612 hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
613 hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
615 self.store.map_delete(h)
616 self.node.node_purge(node, until, CLUSTER_DELETED)
618 props = self._get_version(node)
620 self.permissions.access_clear(path)
623 path, node = self._lookup_object(account, container, name)
624 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
625 self._apply_versioning(account, container, src_version_id)
626 self.permissions.access_clear(path)
def delete_object(self, user, account, container, name, until=None):
    """Delete an object, or purge it when ``until`` is given.

    Public entry point; the actual work is done by
    ``self._delete_object`` with the same arguments.
    """

    logger.debug("delete_object: %s %s %s %s", account, container, name, until)
    target = (account, container, name)
    self._delete_object(user, *target, until=until)
def list_versions(self, user, account, container, name):
    """List the versions of an object as ``[version, timestamp]`` pairs.

    Read access is checked first.  Versions whose cluster is
    ``CLUSTER_DELETED`` are left out of the listing.
    """

    logger.debug("list_versions: %s %s %s", account, container, name)
    self._can_read(user, account, container, name)
    _, node = self._lookup_object(account, container, name)
    listing = []
    for props in self.node.node_get_versions(node):
        if props[self.CLUSTER] != CLUSTER_DELETED:
            listing.append([props[self.SERIAL], props[self.MTIME]])
    return listing
646 def get_uuid(self, user, uuid):
647 """Return the (account, container, name) for the UUID given."""
649 logger.debug("get_uuid: %s", uuid)
650 info = self.node.latest_uuid(uuid)
654 account, container, name = path.split('/', 2)
655 self._can_read(user, account, container, name)
656 return (account, container, name)
659 def get_public(self, user, public):
660 """Return the (account, container, name) for the public id given."""
662 logger.debug("get_public: %s", public)
663 if public is None or public < ULTIMATE_ANSWER:
665 path = self.permissions.public_path(public - ULTIMATE_ANSWER)
668 account, container, name = path.split('/', 2)
669 self._can_read(user, account, container, name)
670 return (account, container, name)
672 @backend_method(autocommit=0)
673 def get_block(self, hash):
674 """Return a block's data."""
676 logger.debug("get_block: %s", hash)
677 block = self.store.block_get(binascii.unhexlify(hash))
679 raise NameError('Block does not exist')
@backend_method(autocommit=0)
def put_block(self, data):
    """Store a block of data and return its hash, hex-encoded."""

    logger.debug("put_block: %s", len(data))
    block_hash = self.store.block_put(data)
    return binascii.hexlify(block_hash)
@backend_method(autocommit=0)
def update_block(self, hash, data, offset=0):
    """Write ``data`` into a known block at ``offset``; return the new hash.

    ``hash`` is the hex-encoded hash of the block to update.  When the
    data starts at offset 0 and is exactly ``self.block_size`` long it
    replaces the block entirely, so it is simply stored as a new block.
    The returned hash is hex-encoded.
    """

    logger.debug("update_block: %s %s %s", hash, len(data), offset)
    replaces_whole_block = offset == 0 and len(data) == self.block_size
    if replaces_whole_block:
        return self.put_block(data)
    old_key = binascii.unhexlify(hash)
    new_key = self.store.block_update(old_key, offset, data)
    return binascii.hexlify(new_key)
701 def _generate_uuid(self):
702 return str(uuidlib.uuid4())
704 def _put_object_node(self, path, parent, name):
705 path = '/'.join((path, name))
706 node = self.node.node_lookup(path)
708 node = self.node.node_create(parent, path)
711 def _put_path(self, user, parent, path):
712 node = self.node.node_create(parent, path)
713 self.node.version_create(node, None, 0, None, user, self._generate_uuid(), CLUSTER_NORMAL)
716 def _lookup_account(self, account, create=True):
717 node = self.node.node_lookup(account)
718 if node is None and create:
719 node = self._put_path(account, self.ROOTNODE, account) # User is account.
722 def _lookup_container(self, account, container):
723 path = '/'.join((account, container))
724 node = self.node.node_lookup(path)
726 raise NameError('Container does not exist')
729 def _lookup_object(self, account, container, name):
730 path = '/'.join((account, container, name))
731 node = self.node.node_lookup(path)
733 raise NameError('Object does not exist')
736 def _get_properties(self, node, until=None):
737 """Return properties until the timestamp given."""
739 before = until if until is not None else inf
740 props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
741 if props is None and until is not None:
742 props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
744 raise NameError('Path does not exist')
747 def _get_statistics(self, node, until=None):
748 """Return count, sum of size and latest timestamp of everything under node."""
751 stats = self.node.statistics_get(node, CLUSTER_NORMAL)
753 stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
758 def _get_version(self, node, version=None):
760 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
762 raise NameError('Object does not exist')
765 version = int(version)
767 raise IndexError('Version does not exist')
768 props = self.node.version_get_properties(version)
769 if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
770 raise IndexError('Version does not exist')
773 def _put_version_duplicate(self, user, node, src_node=None, size=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
774 """Create a new version of the node."""
776 props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
777 if props is not None:
778 src_version_id = props[self.SERIAL]
779 src_hash = props[self.HASH]
780 src_size = props[self.SIZE]
782 src_version_id = None
786 hash = src_hash # This way hash can be set to None.
788 uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
791 pre_version_id = src_version_id
793 pre_version_id = None
794 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
795 if props is not None:
796 pre_version_id = props[self.SERIAL]
797 if pre_version_id is not None:
798 self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
800 dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, uuid, cluster)
801 return pre_version_id, dest_version_id
803 def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
804 if src_version_id is not None:
805 self.node.attribute_copy(src_version_id, dest_version_id)
807 self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
808 self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
810 self.node.attribute_del(dest_version_id, domain)
811 self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
813 def _put_metadata(self, user, node, domain, meta, replace=False):
814 """Create a new version and store metadata."""
816 src_version_id, dest_version_id = self._put_version_duplicate(user, node)
817 self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
818 return src_version_id, dest_version_id
820 def _list_limits(self, listing, marker, limit):
824 start = listing.index(marker) + 1
827 if not limit or limit > 10000:
831 def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[]):
832 cont_prefix = path + '/'
833 prefix = cont_prefix + prefix
834 start = cont_prefix + marker if marker else None
835 before = until if until is not None else inf
836 filterq = keys if domain else []
839 objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq)
840 objects.extend([(p, None) for p in prefixes] if virtual else [])
841 objects.sort(key=lambda x: x[0])
842 objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
844 start, limit = self._list_limits([x[0] for x in objects], marker, limit)
845 return objects[start:start + limit]
849 def _check_policy(self, policy):
850 for k in policy.keys():
852 policy[k] = self.default_policy.get(k)
853 for k, v in policy.iteritems():
855 q = int(v) # May raise ValueError.
858 elif k == 'versioning':
859 if v not in ['auto', 'none']:
864 def _put_policy(self, node, policy, replace):
866 for k, v in self.default_policy.iteritems():
869 self.node.policy_set(node, policy)
871 def _get_policy(self, node):
872 policy = self.default_policy.copy()
873 policy.update(self.node.policy_get(node))
876 def _apply_versioning(self, account, container, version_id):
877 if version_id is None:
879 path, node = self._lookup_container(account, container)
880 versioning = self._get_policy(node)['versioning']
881 if versioning != 'auto':
882 hash = self.node.version_remove(version_id)
883 self.store.map_delete(hash)
885 # Access control functions.
887 def _check_groups(self, groups):
888 # raise ValueError('Bad characters in groups')
891 def _check_permissions(self, path, permissions):
892 # raise ValueError('Bad characters in permissions')
894 # Check for existing permissions.
895 paths = self.permissions.access_list(path)
897 ae = AttributeError()
901 def _can_read(self, user, account, container, name):
904 path = '/'.join((account, container, name))
905 if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
906 raise NotAllowedError
908 def _can_write(self, user, account, container, name):
911 path = '/'.join((account, container, name))
912 if not self.permissions.access_check(path, self.WRITE, user):
913 raise NotAllowedError
915 def _allowed_accounts(self, user):
917 for path in self.permissions.access_list_paths(user):
918 allow.add(path.split('/', 1)[0])
921 def _allowed_containers(self, user, account):
923 for path in self.permissions.access_list_paths(user, account):
924 allow.add(path.split('/', 2)[1])