1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above # copyright notice, this list of conditions and the following
12 # disclaimer in the documentation and/or other materials
13 # provided with the distribution.
15 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
16 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
19 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
21 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
22 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
23 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
25 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 # POSSIBILITY OF SUCH DAMAGE.
28 # The views and conclusions contained in the software and
29 # documentation are those of the authors and should not be
30 # interpreted as representing official policies, either expressed
31 # or implied, of GRNET S.A.
36 import uuid as uuidlib
40 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
42 from pithos.lib.hashmap import HashMap
44 # Default modules and settings.
45 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
46 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
47 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
48 DEFAULT_BLOCK_PATH = 'data/'
50 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
57 logger = logging.getLogger(__name__)
60 def backend_method(func=None, autocommit=1):
63 return backend_method(func, autocommit)
68 def fn(self, *args, **kw):
69 self.wrapper.execute()
71 ret = func(self, *args, **kw)
75 self.wrapper.rollback()
80 class ModularBackend(BaseBackend):
83 Uses modules for SQL functions and storage.
86 def __init__(self, db_module=None, db_connection=None, block_module=None, block_path=None):
87 db_module = db_module or DEFAULT_DB_MODULE
88 db_connection = db_connection or DEFAULT_DB_CONNECTION
89 block_module = block_module or DEFAULT_BLOCK_MODULE
90 block_path = block_path or DEFAULT_BLOCK_PATH
92 self.hash_algorithm = 'sha256'
93 self.block_size = 4 * 1024 * 1024 # 4MB
95 self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
98 self.db_module = sys.modules[db_module]
99 self.wrapper = self.db_module.DBWrapper(db_connection)
101 params = {'wrapper': self.wrapper}
102 self.permissions = self.db_module.Permissions(**params)
103 for x in ['READ', 'WRITE']:
104 setattr(self, x, getattr(self.db_module, x))
105 self.node = self.db_module.Node(**params)
106 for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
107 setattr(self, x, getattr(self.db_module, x))
109 __import__(block_module)
110 self.block_module = sys.modules[block_module]
112 params = {'path': block_path,
113 'block_size': self.block_size,
114 'hash_algorithm': self.hash_algorithm}
115 self.store = self.block_module.Store(**params)
121 def list_accounts(self, user, marker=None, limit=10000):
122 """Return a list of accounts the user can access."""
124 logger.debug("list_accounts: %s %s %s", user, marker, limit)
125 allowed = self._allowed_accounts(user)
126 start, limit = self._list_limits(allowed, marker, limit)
127 return allowed[start:start + limit]
130 def get_account_meta(self, user, account, domain, until=None):
131 """Return a dictionary with the account metadata for the domain."""
133 logger.debug("get_account_meta: %s %s %s", account, domain, until)
134 path, node = self._lookup_account(account, user == account)
136 if until or node is None or account not in self._allowed_accounts(user):
137 raise NotAllowedError
139 props = self._get_properties(node, until)
140 mtime = props[self.MTIME]
144 count, bytes, tstamp = self._get_statistics(node, until)
145 tstamp = max(tstamp, mtime)
149 modified = self._get_statistics(node)[2] # Overall last modification.
150 modified = max(modified, mtime)
153 meta = {'name': account}
156 if props is not None:
157 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
158 if until is not None:
159 meta.update({'until_timestamp': tstamp})
160 meta.update({'name': account, 'count': count, 'bytes': bytes})
161 meta.update({'modified': modified})
165 def update_account_meta(self, user, account, domain, meta, replace=False):
166 """Update the metadata associated with the account for the domain."""
168 logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
170 raise NotAllowedError
171 path, node = self._lookup_account(account, True)
172 self._put_metadata(user, node, domain, meta, replace)
175 def get_account_groups(self, user, account):
176 """Return a dictionary with the user groups defined for this account."""
178 logger.debug("get_account_groups: %s", account)
180 if account not in self._allowed_accounts(user):
181 raise NotAllowedError
183 self._lookup_account(account, True)
184 return self.permissions.group_dict(account)
187 def update_account_groups(self, user, account, groups, replace=False):
188 """Update the groups associated with the account."""
190 logger.debug("update_account_groups: %s %s %s", account, groups, replace)
192 raise NotAllowedError
193 self._lookup_account(account, True)
194 self._check_groups(groups)
196 self.permissions.group_destroy(account)
197 for k, v in groups.iteritems():
198 if not replace: # If not already deleted.
199 self.permissions.group_delete(account, k)
201 self.permissions.group_addmany(account, k, v)
204 def get_account_policy(self, user, account):
205 """Return a dictionary with the account policy."""
207 logger.debug("get_account_policy: %s", account)
209 if account not in self._allowed_accounts(user):
210 raise NotAllowedError
212 path, node = self._lookup_account(account, True)
213 return self._get_policy(node)
216 def update_account_policy(self, user, account, policy, replace=False):
217 """Update the policy associated with the account."""
219 logger.debug("update_account_policy: %s %s %s", account, policy, replace)
221 raise NotAllowedError
222 path, node = self._lookup_account(account, True)
223 self._check_policy(policy)
224 self._put_policy(node, policy, replace)
227 def put_account(self, user, account, policy={}):
228 """Create a new account with the given name."""
230 logger.debug("put_account: %s %s", account, policy)
232 raise NotAllowedError
233 node = self.node.node_lookup(account)
235 raise NameError('Account already exists')
237 self._check_policy(policy)
238 node = self._put_path(user, self.ROOTNODE, account)
239 self._put_policy(node, policy, True)
242 def delete_account(self, user, account):
243 """Delete the account with the given name."""
245 logger.debug("delete_account: %s", account)
247 raise NotAllowedError
248 node = self.node.node_lookup(account)
251 if not self.node.node_remove(node):
252 raise IndexError('Account is not empty')
253 self.permissions.group_destroy(account)
256 def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
257 """Return a list of containers existing under an account."""
259 logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
261 if until or account not in self._allowed_accounts(user):
262 raise NotAllowedError
263 allowed = self._allowed_containers(user, account)
264 start, limit = self._list_limits(allowed, marker, limit)
265 return allowed[start:start + limit]
267 allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
268 allowed = list(set(allowed))
269 start, limit = self._list_limits(allowed, marker, limit)
270 return allowed[start:start + limit]
271 node = self.node.node_lookup(account)
272 return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
275 def get_container_meta(self, user, account, container, domain, until=None):
276 """Return a dictionary with the container metadata for the domain."""
278 logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
280 if until or container not in self._allowed_containers(user, account):
281 raise NotAllowedError
282 path, node = self._lookup_container(account, container)
283 props = self._get_properties(node, until)
284 mtime = props[self.MTIME]
285 count, bytes, tstamp = self._get_statistics(node, until)
286 tstamp = max(tstamp, mtime)
290 modified = self._get_statistics(node)[2] # Overall last modification.
291 modified = max(modified, mtime)
294 meta = {'name': container}
296 meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
297 if until is not None:
298 meta.update({'until_timestamp': tstamp})
299 meta.update({'name': container, 'count': count, 'bytes': bytes})
300 meta.update({'modified': modified})
304 def update_container_meta(self, user, account, container, domain, meta, replace=False):
305 """Update the metadata associated with the container for the domain."""
307 logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
309 raise NotAllowedError
310 path, node = self._lookup_container(account, container)
311 self._put_metadata(user, node, domain, meta, replace)
314 def get_container_policy(self, user, account, container):
315 """Return a dictionary with the container policy."""
317 logger.debug("get_container_policy: %s %s", account, container)
319 if container not in self._allowed_containers(user, account):
320 raise NotAllowedError
322 path, node = self._lookup_container(account, container)
323 return self._get_policy(node)
326 def update_container_policy(self, user, account, container, policy, replace=False):
327 """Update the policy associated with the container."""
329 logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
331 raise NotAllowedError
332 path, node = self._lookup_container(account, container)
333 self._check_policy(policy)
334 self._put_policy(node, policy, replace)
337 def put_container(self, user, account, container, policy={}):
338 """Create a new container with the given name."""
340 logger.debug("put_container: %s %s %s", account, container, policy)
342 raise NotAllowedError
344 path, node = self._lookup_container(account, container)
348 raise NameError('Container already exists')
350 self._check_policy(policy)
351 path = '/'.join((account, container))
352 node = self._put_path(user, self._lookup_account(account, True)[1], path)
353 self._put_policy(node, policy, True)
356 def delete_container(self, user, account, container, until=None):
357 """Delete/purge the container with the given name."""
359 logger.debug("delete_container: %s %s %s", account, container, until)
361 raise NotAllowedError
362 path, node = self._lookup_container(account, container)
364 if until is not None:
365 hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
367 self.store.map_delete(h)
368 self.node.node_purge_children(node, until, CLUSTER_DELETED)
371 if self._get_statistics(node)[0] > 0:
372 raise IndexError('Container is not empty')
373 hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
375 self.store.map_delete(h)
376 self.node.node_purge_children(node, inf, CLUSTER_DELETED)
377 self.node.node_remove(node)
380 def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None):
381 """Return a list of objects existing under a container."""
383 logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
387 raise NotAllowedError
388 allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
390 raise NotAllowedError
393 allowed = self.permissions.access_list_shared('/'.join((account, container)))
396 path, node = self._lookup_container(account, container)
397 return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, allowed)
400 def list_object_meta(self, user, account, container, domain, until=None):
401 """Return a list with all the container's object meta keys for the domain."""
403 logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
407 raise NotAllowedError
408 allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
410 raise NotAllowedError
411 path, node = self._lookup_container(account, container)
412 before = until if until is not None else inf
413 return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
416 def get_object_meta(self, user, account, container, name, domain, version=None):
417 """Return a dictionary with the object metadata for the domain."""
419 logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
420 self._can_read(user, account, container, name)
421 path, node = self._lookup_object(account, container, name)
422 props = self._get_version(node, version)
424 modified = props[self.MTIME]
427 modified = self._get_version(node)[self.MTIME] # Overall last modification.
428 except NameError: # Object may be deleted.
429 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
430 if del_props is None:
431 raise NameError('Object does not exist')
432 modified = del_props[self.MTIME]
434 meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
435 meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
436 meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
437 meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
441 def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
442 """Update the metadata associated with the object for the domain and return the new version."""
444 logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
445 self._can_write(user, account, container, name)
446 path, node = self._lookup_object(account, container, name)
447 src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
448 self._apply_versioning(account, container, src_version_id)
449 return dest_version_id
452 def get_object_permissions(self, user, account, container, name):
453 """Return the action allowed on the object, the path
454 from which the object gets its permissions from,
455 along with a dictionary containing the permissions."""
457 logger.debug("get_object_permissions: %s %s %s", account, container, name)
460 path = '/'.join((account, container, name))
461 if self.permissions.access_check(path, self.WRITE, user):
463 elif self.permissions.access_check(path, self.READ, user):
466 raise NotAllowedError
467 path = self._lookup_object(account, container, name)[0]
468 return (allowed,) + self.permissions.access_inherit(path)
471 def update_object_permissions(self, user, account, container, name, permissions):
472 """Update the permissions associated with the object."""
474 logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
476 raise NotAllowedError
477 path = self._lookup_object(account, container, name)[0]
478 self._check_permissions(path, permissions)
479 self.permissions.access_set(path, permissions)
482 def get_object_public(self, user, account, container, name):
483 """Return the public id of the object if applicable."""
485 logger.debug("get_object_public: %s %s %s", account, container, name)
486 self._can_read(user, account, container, name)
487 path = self._lookup_object(account, container, name)[0]
488 p = self.permissions.public_get(path)
494 def update_object_public(self, user, account, container, name, public):
495 """Update the public status of the object."""
497 logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
498 self._can_write(user, account, container, name)
499 path = self._lookup_object(account, container, name)[0]
501 self.permissions.public_unset(path)
503 self.permissions.public_set(path)
506 def get_object_hashmap(self, user, account, container, name, version=None):
507 """Return the object's size and a list with partial hashes."""
509 logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
510 self._can_read(user, account, container, name)
511 path, node = self._lookup_object(account, container, name)
512 props = self._get_version(node, version)
513 hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
514 return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
516 def _update_object_hash(self, user, account, container, name, size, hash, permissions, src_node=None, is_copy=False):
517 if permissions is not None and user != account:
518 raise NotAllowedError
519 self._can_write(user, account, container, name)
520 if permissions is not None:
521 path = '/'.join((account, container, name))
522 self._check_permissions(path, permissions)
524 account_path, account_node = self._lookup_account(account, True)
525 container_path, container_node = self._lookup_container(account, container)
526 path, node = self._put_object_node(container_path, container_node, name)
527 pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
530 versioning = self._get_policy(container_node)['versioning']
531 if versioning != 'auto':
532 size_delta = size - 0 # TODO: Get previous size.
536 account_quota = long(self._get_policy(account_node)['quota'])
537 container_quota = long(self._get_policy(container_node)['quota'])
538 if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
539 (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
540 # This must be executed in a transaction, so the version is never created if it fails.
543 if permissions is not None:
544 self.permissions.access_set(path, permissions)
545 self._apply_versioning(account, container, pre_version_id)
546 return pre_version_id, dest_version_id
549 def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
550 """Create/update an object with the specified size and partial hashes."""
552 logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
553 if size == 0: # No such thing as an empty hashmap.
554 hashmap = [self.put_block('')]
555 map = HashMap(self.block_size, self.hash_algorithm)
556 map.extend([binascii.unhexlify(x) for x in hashmap])
557 missing = self.store.block_search(map)
560 ie.data = [binascii.hexlify(x) for x in missing]
564 pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
565 self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
566 self.store.map_put(hash, map)
567 return dest_version_id
569 def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
570 self._can_read(user, src_account, src_container, src_name)
571 path, node = self._lookup_object(src_account, src_container, src_name)
572 # TODO: Will do another fetch of the properties in duplicate version...
573 props = self._get_version(node, src_version) # Check to see if source exists.
574 src_version_id = props[self.SERIAL]
575 hash = props[self.HASH]
576 size = props[self.SIZE]
578 is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
579 pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions, src_node=node, is_copy=is_copy)
580 self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
581 return dest_version_id
584 def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
585 """Copy an object's data and metadata."""
587 logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
588 return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
591 def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
592 """Move an object's data and metadata."""
594 logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
595 if user != src_account:
596 raise NotAllowedError
597 dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
598 if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
599 self._delete_object(user, src_account, src_container, src_name)
600 return dest_version_id
602 def _delete_object(self, user, account, container, name, until=None):
604 raise NotAllowedError
606 if until is not None:
607 path = '/'.join((account, container, name))
608 node = self.node.node_lookup(path)
611 hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
612 hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
614 self.store.map_delete(h)
615 self.node.node_purge(node, until, CLUSTER_DELETED)
617 props = self._get_version(node)
619 self.permissions.access_clear(path)
622 path, node = self._lookup_object(account, container, name)
623 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
624 self._apply_versioning(account, container, src_version_id)
625 self.permissions.access_clear(path)
628 def delete_object(self, user, account, container, name, until=None):
629 """Delete/purge an object."""
631 logger.debug("delete_object: %s %s %s %s", account, container, name, until)
632 self._delete_object(user, account, container, name, until)
635 def list_versions(self, user, account, container, name):
636 """Return a list of all (version, version_timestamp) tuples for an object."""
638 logger.debug("list_versions: %s %s %s", account, container, name)
639 self._can_read(user, account, container, name)
640 path, node = self._lookup_object(account, container, name)
641 versions = self.node.node_get_versions(node)
642 return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
645 def get_uuid(self, user, uuid):
646 """Return the (account, container, name) for the UUID given."""
647 logger.debug("get_uuid: %s", uuid)
648 info = self.node.latest_uuid(uuid)
652 account, container, name = path.split('/', 2)
653 self._can_read(user, account, container, name)
654 return (account, container, name)
657 def get_public(self, user, public):
658 """Return the (account, container, name) for the public id given."""
659 logger.debug("get_public: %s", public)
660 if public is None or public < ULTIMATE_ANSWER:
662 path = self.permissions.public_path(public - ULTIMATE_ANSWER)
665 account, container, name = path.split('/', 2)
666 self._can_read(user, account, container, name)
667 return (account, container, name)
669 @backend_method(autocommit=0)
670 def get_block(self, hash):
671 """Return a block's data."""
673 logger.debug("get_block: %s", hash)
674 block = self.store.block_get(binascii.unhexlify(hash))
676 raise NameError('Block does not exist')
679 @backend_method(autocommit=0)
680 def put_block(self, data):
681 """Store a block and return the hash."""
683 logger.debug("put_block: %s", len(data))
684 return binascii.hexlify(self.store.block_put(data))
686 @backend_method(autocommit=0)
687 def update_block(self, hash, data, offset=0):
688 """Update a known block and return the hash."""
690 logger.debug("update_block: %s %s %s", hash, len(data), offset)
691 if offset == 0 and len(data) == self.block_size:
692 return self.put_block(data)
693 h = self.store.block_update(binascii.unhexlify(hash), offset, data)
694 return binascii.hexlify(h)
698 def _generate_uuid(self):
699 return str(uuidlib.uuid4())
701 def _put_object_node(self, path, parent, name):
702 path = '/'.join((path, name))
703 node = self.node.node_lookup(path)
705 node = self.node.node_create(parent, path)
708 def _put_path(self, user, parent, path):
709 node = self.node.node_create(parent, path)
710 self.node.version_create(node, None, 0, None, user, self._generate_uuid(), CLUSTER_NORMAL)
713 def _lookup_account(self, account, create=True):
714 node = self.node.node_lookup(account)
715 if node is None and create:
716 node = self._put_path(account, self.ROOTNODE, account) # User is account.
719 def _lookup_container(self, account, container):
720 path = '/'.join((account, container))
721 node = self.node.node_lookup(path)
723 raise NameError('Container does not exist')
726 def _lookup_object(self, account, container, name):
727 path = '/'.join((account, container, name))
728 node = self.node.node_lookup(path)
730 raise NameError('Object does not exist')
733 def _get_properties(self, node, until=None):
734 """Return properties until the timestamp given."""
736 before = until if until is not None else inf
737 props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
738 if props is None and until is not None:
739 props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
741 raise NameError('Path does not exist')
744 def _get_statistics(self, node, until=None):
745 """Return count, sum of size and latest timestamp of everything under node."""
748 stats = self.node.statistics_get(node, CLUSTER_NORMAL)
750 stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
755 def _get_version(self, node, version=None):
757 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
759 raise NameError('Object does not exist')
762 version = int(version)
764 raise IndexError('Version does not exist')
765 props = self.node.version_get_properties(version)
766 if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
767 raise IndexError('Version does not exist')
770 def _put_version_duplicate(self, user, node, src_node=None, size=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
771 """Create a new version of the node."""
773 props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
774 if props is not None:
775 src_version_id = props[self.SERIAL]
776 src_hash = props[self.HASH]
777 src_size = props[self.SIZE]
779 src_version_id = None
783 hash = src_hash # This way hash can be set to None.
785 uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
788 pre_version_id = src_version_id
790 pre_version_id = None
791 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
792 if props is not None:
793 pre_version_id = props[self.SERIAL]
794 if pre_version_id is not None:
795 self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
797 dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, uuid, cluster)
798 return pre_version_id, dest_version_id
800 def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
801 if src_version_id is not None:
802 self.node.attribute_copy(src_version_id, dest_version_id)
804 self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
805 self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
807 self.node.attribute_del(dest_version_id, domain)
808 self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
810 def _put_metadata(self, user, node, domain, meta, replace=False):
811 """Create a new version and store metadata."""
813 src_version_id, dest_version_id = self._put_version_duplicate(user, node)
814 self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
815 return src_version_id, dest_version_id
817 def _list_limits(self, listing, marker, limit):
821 start = listing.index(marker) + 1
824 if not limit or limit > 10000:
828 def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, allowed=[]):
829 cont_prefix = path + '/'
830 prefix = cont_prefix + prefix
831 start = cont_prefix + marker if marker else None
832 before = until if until is not None else inf
833 filterq = keys if domain else []
835 objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq)
836 objects.extend([(p, None) for p in prefixes] if virtual else [])
837 objects.sort(key=lambda x: x[0])
838 objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
840 start, limit = self._list_limits([x[0] for x in objects], marker, limit)
841 return objects[start:start + limit]
845 def _check_policy(self, policy):
846 for k in policy.keys():
848 policy[k] = self.default_policy.get(k)
849 for k, v in policy.iteritems():
851 q = int(v) # May raise ValueError.
854 elif k == 'versioning':
855 if v not in ['auto', 'none']:
860 def _put_policy(self, node, policy, replace):
862 for k, v in self.default_policy.iteritems():
865 self.node.policy_set(node, policy)
867 def _get_policy(self, node):
868 policy = self.default_policy.copy()
869 policy.update(self.node.policy_get(node))
872 def _apply_versioning(self, account, container, version_id):
873 if version_id is None:
875 path, node = self._lookup_container(account, container)
876 versioning = self._get_policy(node)['versioning']
877 if versioning != 'auto':
878 hash = self.node.version_remove(version_id)
879 self.store.map_delete(hash)
881 # Access control functions.
883 def _check_groups(self, groups):
884 # raise ValueError('Bad characters in groups')
887 def _check_permissions(self, path, permissions):
888 # raise ValueError('Bad characters in permissions')
890 # Check for existing permissions.
891 paths = self.permissions.access_list(path)
893 ae = AttributeError()
897 def _can_read(self, user, account, container, name):
900 path = '/'.join((account, container, name))
901 if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
902 raise NotAllowedError
904 def _can_write(self, user, account, container, name):
907 path = '/'.join((account, container, name))
908 if not self.permissions.access_check(path, self.WRITE, user):
909 raise NotAllowedError
911 def _allowed_accounts(self, user):
913 for path in self.permissions.access_list_paths(user):
914 allow.add(path.split('/', 1)[0])
917 def _allowed_containers(self, user, account):
919 for path in self.permissions.access_list_paths(user, account):
920 allow.add(path.split('/', 2)[1])