1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
40 from base import NotAllowedError, QuotaError, BaseBackend
42 from pithos.lib.hashmap import HashMap
44 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
51 logger = logging.getLogger(__name__)
54 def backend_method(func=None, autocommit=1):
57 return backend_method(func, autocommit)
62 def fn(self, *args, **kw):
63 self.wrapper.execute()
65 ret = func(self, *args, **kw)
69 self.wrapper.rollback()
74 class ModularBackend(BaseBackend):
77 Uses modules for SQL functions and storage.
80 def __init__(self, db_module, db_connection, block_module, block_path):
81 self.hash_algorithm = 'sha256'
82 self.block_size = 4 * 1024 * 1024 # 4MB
84 self.default_policy = {'quota': 0, 'versioning': 'auto'}
87 self.db_module = sys.modules[db_module]
88 self.wrapper = self.db_module.DBWrapper(db_connection)
90 params = {'wrapper': self.wrapper}
91 self.permissions = self.db_module.Permissions(**params)
92 for x in ['READ', 'WRITE']:
93 setattr(self, x, getattr(self.db_module, x))
94 self.node = self.db_module.Node(**params)
95 for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
96 setattr(self, x, getattr(self.db_module, x))
98 __import__(block_module)
99 self.block_module = sys.modules[block_module]
101 params = {'path': block_path,
102 'block_size': self.block_size,
103 'hash_algorithm': self.hash_algorithm}
104 self.store = self.block_module.Store(**params)
def list_accounts(self, user, marker=None, limit=10000):
    """List the accounts that ``user`` is allowed to access.

    The candidate accounts come from ``_allowed_accounts``. ``_list_limits``
    is given that listing together with ``marker`` and ``limit`` and returns
    a ``(start, limit)`` pair; the corresponding slice is returned.
    """
    logger.debug("list_accounts: %s %s %s", user, marker, limit)
    accounts = self._allowed_accounts(user)
    offset, count = self._list_limits(accounts, marker, limit)
    page_end = offset + count
    return accounts[offset:page_end]
119 def get_account_meta(self, user, account, until=None):
120 """Return a dictionary with the account metadata."""
122 logger.debug("get_account_meta: %s %s", account, until)
123 path, node = self._lookup_account(account, user == account)
125 if until or node is None or account not in self._allowed_accounts(user):
126 raise NotAllowedError
128 props = self._get_properties(node, until)
129 mtime = props[self.MTIME]
133 count, bytes, tstamp = self._get_statistics(node, until)
134 tstamp = max(tstamp, mtime)
138 modified = self._get_statistics(node)[2] # Overall last modification.
139 modified = max(modified, mtime)
142 meta = {'name': account}
145 if props is not None:
146 meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
147 if until is not None:
148 meta.update({'until_timestamp': tstamp})
149 meta.update({'name': account, 'count': count, 'bytes': bytes})
150 meta.update({'modified': modified})
154 def update_account_meta(self, user, account, meta, replace=False):
155 """Update the metadata associated with the account."""
157 logger.debug("update_account_meta: %s %s %s", account, meta, replace)
159 raise NotAllowedError
160 path, node = self._lookup_account(account, True)
161 self._put_metadata(user, node, meta, replace)
164 def get_account_groups(self, user, account):
165 """Return a dictionary with the user groups defined for this account."""
167 logger.debug("get_account_groups: %s", account)
169 if account not in self._allowed_accounts(user):
170 raise NotAllowedError
172 self._lookup_account(account, True)
173 return self.permissions.group_dict(account)
176 def update_account_groups(self, user, account, groups, replace=False):
177 """Update the groups associated with the account."""
179 logger.debug("update_account_groups: %s %s %s", account, groups, replace)
181 raise NotAllowedError
182 self._lookup_account(account, True)
183 self._check_groups(groups)
185 self.permissions.group_destroy(account)
186 for k, v in groups.iteritems():
187 if not replace: # If not already deleted.
188 self.permissions.group_delete(account, k)
190 self.permissions.group_addmany(account, k, v)
193 def get_account_policy(self, user, account):
194 """Return a dictionary with the account policy."""
196 logger.debug("get_account_policy: %s", account)
198 if account not in self._allowed_accounts(user):
199 raise NotAllowedError
201 path, node = self._lookup_account(account, True)
202 return self._get_policy(node)
205 def update_account_policy(self, user, account, policy, replace=False):
206 """Update the policy associated with the account."""
208 logger.debug("update_account_policy: %s %s %s", account, policy, replace)
210 raise NotAllowedError
211 path, node = self._lookup_account(account, True)
212 self._check_policy(policy)
213 self._put_policy(node, policy, replace)
216 def put_account(self, user, account, policy={}):
217 """Create a new account with the given name."""
219 logger.debug("put_account: %s %s", account, policy)
221 raise NotAllowedError
222 node = self.node.node_lookup(account)
224 raise NameError('Account already exists')
226 self._check_policy(policy)
227 node = self._put_path(user, self.ROOTNODE, account)
228 self._put_policy(node, policy, True)
231 def delete_account(self, user, account):
232 """Delete the account with the given name."""
234 logger.debug("delete_account: %s", account)
236 raise NotAllowedError
237 node = self.node.node_lookup(account)
240 if not self.node.node_remove(node):
241 raise IndexError('Account is not empty')
242 self.permissions.group_destroy(account)
245 def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
246 """Return a list of containers existing under an account."""
248 logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
250 if until or account not in self._allowed_accounts(user):
251 raise NotAllowedError
252 allowed = self._allowed_containers(user, account)
253 start, limit = self._list_limits(allowed, marker, limit)
254 return allowed[start:start + limit]
256 allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
257 allowed = list(set(allowed))
258 start, limit = self._list_limits(allowed, marker, limit)
259 return allowed[start:start + limit]
260 node = self.node.node_lookup(account)
261 return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
264 def get_container_meta(self, user, account, container, until=None):
265 """Return a dictionary with the container metadata."""
267 logger.debug("get_container_meta: %s %s %s", account, container, until)
269 if until or container not in self._allowed_containers(user, account):
270 raise NotAllowedError
271 path, node = self._lookup_container(account, container)
272 props = self._get_properties(node, until)
273 mtime = props[self.MTIME]
274 count, bytes, tstamp = self._get_statistics(node, until)
275 tstamp = max(tstamp, mtime)
279 modified = self._get_statistics(node)[2] # Overall last modification.
280 modified = max(modified, mtime)
283 meta = {'name': container}
285 meta = dict(self.node.attribute_get(props[self.SERIAL]))
286 if until is not None:
287 meta.update({'until_timestamp': tstamp})
288 meta.update({'name': container, 'count': count, 'bytes': bytes})
289 meta.update({'modified': modified})
293 def update_container_meta(self, user, account, container, meta, replace=False):
294 """Update the metadata associated with the container."""
296 logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
298 raise NotAllowedError
299 path, node = self._lookup_container(account, container)
300 self._put_metadata(user, node, meta, replace)
303 def get_container_policy(self, user, account, container):
304 """Return a dictionary with the container policy."""
306 logger.debug("get_container_policy: %s %s", account, container)
308 if container not in self._allowed_containers(user, account):
309 raise NotAllowedError
311 path, node = self._lookup_container(account, container)
312 return self._get_policy(node)
315 def update_container_policy(self, user, account, container, policy, replace=False):
316 """Update the policy associated with the container."""
318 logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
320 raise NotAllowedError
321 path, node = self._lookup_container(account, container)
322 self._check_policy(policy)
323 self._put_policy(node, policy, replace)
326 def put_container(self, user, account, container, policy={}):
327 """Create a new container with the given name."""
329 logger.debug("put_container: %s %s %s", account, container, policy)
331 raise NotAllowedError
333 path, node = self._lookup_container(account, container)
337 raise NameError('Container already exists')
339 self._check_policy(policy)
340 path = '/'.join((account, container))
341 node = self._put_path(user, self._lookup_account(account, True)[1], path)
342 self._put_policy(node, policy, True)
345 def delete_container(self, user, account, container, until=None):
346 """Delete/purge the container with the given name."""
348 logger.debug("delete_container: %s %s %s", account, container, until)
350 raise NotAllowedError
351 path, node = self._lookup_container(account, container)
353 if until is not None:
354 hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
356 self.store.map_delete(h)
357 self.node.node_purge_children(node, until, CLUSTER_DELETED)
360 if self._get_statistics(node)[0] > 0:
361 raise IndexError('Container is not empty')
362 hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
364 self.store.map_delete(h)
365 self.node.node_purge_children(node, inf, CLUSTER_DELETED)
366 self.node.node_remove(node)
369 def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
370 """Return a list of objects existing under a container."""
372 logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
376 raise NotAllowedError
377 allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
379 raise NotAllowedError
382 allowed = self.permissions.access_list_shared('/'.join((account, container)))
385 path, node = self._lookup_container(account, container)
386 return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
389 def list_object_meta(self, user, account, container, until=None):
390 """Return a list with all the container's object meta keys."""
392 logger.debug("list_object_meta: %s %s %s", account, container, until)
396 raise NotAllowedError
397 allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
399 raise NotAllowedError
400 path, node = self._lookup_container(account, container)
401 before = until if until is not None else inf
402 return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
405 def get_object_meta(self, user, account, container, name, version=None):
406 """Return a dictionary with the object metadata."""
408 logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
409 self._can_read(user, account, container, name)
410 path, node = self._lookup_object(account, container, name)
411 props = self._get_version(node, version)
413 modified = props[self.MTIME]
416 modified = self._get_version(node)[self.MTIME] # Overall last modification.
417 except NameError: # Object may be deleted.
418 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
419 if del_props is None:
420 raise NameError('Object does not exist')
421 modified = del_props[self.MTIME]
423 meta = dict(self.node.attribute_get(props[self.SERIAL]))
424 meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
425 meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
426 meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
def update_object_meta(self, user, account, container, name, meta, replace=False):
    """Store new metadata for an object and return the new version id.

    Write access is checked first via ``_can_write``. The metadata is
    written through ``_put_metadata``, which yields the previous and the
    newly created version ids; the previous one is then handed to
    ``_apply_versioning`` for the object's container.
    """
    logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
    self._can_write(user, account, container, name)
    _, object_node = self._lookup_object(account, container, name)
    version_pair = self._put_metadata(user, object_node, meta, replace)
    previous_id, created_id = version_pair
    self._apply_versioning(account, container, previous_id)
    return created_id
441 def get_object_permissions(self, user, account, container, name):
442 """Return the action allowed on the object, the path
443 from which the object gets its permissions from,
444 along with a dictionary containing the permissions."""
446 logger.debug("get_object_permissions: %s %s %s", account, container, name)
449 path = '/'.join((account, container, name))
450 if self.permissions.access_check(path, self.WRITE, user):
452 elif self.permissions.access_check(path, self.READ, user):
455 raise NotAllowedError
456 path = self._lookup_object(account, container, name)[0]
457 return (allowed,) + self.permissions.access_inherit(path)
460 def update_object_permissions(self, user, account, container, name, permissions):
461 """Update the permissions associated with the object."""
463 logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
465 raise NotAllowedError
466 path = self._lookup_object(account, container, name)[0]
467 self._check_permissions(path, permissions)
468 self.permissions.access_set(path, permissions)
471 def get_object_public(self, user, account, container, name):
472 """Return the public id of the object if applicable."""
474 logger.debug("get_object_public: %s %s %s", account, container, name)
475 self._can_read(user, account, container, name)
476 path = self._lookup_object(account, container, name)[0]
477 p = self.permissions.public_get(path)
483 def update_object_public(self, user, account, container, name, public):
484 """Update the public status of the object."""
486 logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
487 self._can_write(user, account, container, name)
488 path = self._lookup_object(account, container, name)[0]
490 self.permissions.public_unset(path)
492 self.permissions.public_set(path)
def get_object_hashmap(self, user, account, container, name, version=None):
    """Return ``(size, hashes)`` for an object version.

    ``hashes`` is a list with the hex-encoded entries of the map that the
    block store returns for the version's (hex-decoded) hash. Read access
    is checked via ``_can_read`` before anything is looked up.
    """
    logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
    self._can_read(user, account, container, name)
    _, object_node = self._lookup_object(account, container, name)
    version_props = self._get_version(object_node, version)
    # The stored hash is hex text; the store is queried with raw bytes.
    map_key = binascii.unhexlify(version_props[self.HASH])
    stored_map = self.store.map_get(map_key)
    size = version_props[self.SIZE]
    hex_hashes = []
    for entry in stored_map:
        hex_hashes.append(binascii.hexlify(entry))
    return size, hex_hashes
505 def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
506 if permissions is not None and user != account:
507 raise NotAllowedError
508 self._can_write(user, account, container, name)
509 if permissions is not None:
510 path = '/'.join((account, container, name))
511 self._check_permissions(path, permissions)
513 account_path, account_node = self._lookup_account(account, True)
514 container_path, container_node = self._lookup_container(account, container)
515 path, node = self._put_object_node(container_path, container_node, name)
516 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)
519 size_delta = size # Change with versioning.
521 account_quota = long(self._get_policy(account_node)['quota'])
522 container_quota = long(self._get_policy(container_node)['quota'])
523 if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
524 (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
525 # This must be executed in a transaction, so the version is never created if it fails.
528 if not replace_meta and src_version_id is not None:
529 self.node.attribute_copy(src_version_id, dest_version_id)
530 self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
531 if permissions is not None:
532 self.permissions.access_set(path, permissions)
533 self._apply_versioning(account, container, src_version_id)
534 return dest_version_id
537 def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
538 """Create/update an object with the specified size and partial hashes."""
540 logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
541 if size == 0: # No such thing as an empty hashmap.
542 hashmap = [self.put_block('')]
543 map = HashMap(self.block_size, self.hash_algorithm)
544 map.extend([binascii.unhexlify(x) for x in hashmap])
545 missing = self.store.block_search(map)
548 ie.data = [binascii.hexlify(x) for x in missing]
552 dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions)
553 self.store.map_put(hash, map)
554 return dest_version_id
556 def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
557 self._can_read(user, src_account, src_container, src_name)
558 path, node = self._lookup_object(src_account, src_container, src_name)
559 props = self._get_version(node, src_version)
560 src_version_id = props[self.SERIAL]
561 hash = props[self.HASH]
562 size = props[self.SIZE]
564 if (src_account, src_container, src_name) == (dest_account, dest_container, dest_name):
565 dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, dest_meta, replace_meta, permissions)
571 dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
573 self.node.attribute_copy(src_version_id, dest_version_id)
574 self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
575 return dest_version_id
def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
    """Copy an object's data and metadata to a destination path.

    Logs the request and delegates the whole operation to
    ``_copy_object``; whatever that helper returns is passed back to the
    caller.
    """
    logger.debug(
        "copy_object: %s %s %s %s %s %s %s %s %s %s",
        src_account, src_container, src_name,
        dest_account, dest_container, dest_name,
        dest_meta, replace_meta, permissions, src_version)
    new_version_id = self._copy_object(
        user,
        src_account, src_container, src_name,
        dest_account, dest_container, dest_name,
        dest_meta=dest_meta,
        replace_meta=replace_meta,
        permissions=permissions,
        src_version=src_version)
    return new_version_id
def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
    """Move an object's data and metadata to a destination path.

    Only the owner of the source account may move an object; anyone else
    gets ``NotAllowedError``. The move is a copy of the latest version
    (no explicit source version is passed) followed by deletion of the
    source, unless source and destination are the same path. Returns the
    version id produced by the copy.
    """
    logger.debug(
        "move_object: %s %s %s %s %s %s %s %s %s",
        src_account, src_container, src_name,
        dest_account, dest_container, dest_name,
        dest_meta, replace_meta, permissions)
    if user != src_account:
        raise NotAllowedError
    source = (src_account, src_container, src_name)
    target = (dest_account, dest_container, dest_name)
    new_version_id = self._copy_object(
        user,
        src_account, src_container, src_name,
        dest_account, dest_container, dest_name,
        dest_meta, replace_meta, permissions, None)
    if source == target:
        # Moving onto itself: nothing to remove afterwards.
        return new_version_id
    self._delete_object(user, src_account, src_container, src_name)
    return new_version_id
596 def _delete_object(self, user, account, container, name, until=None):
598 raise NotAllowedError
600 if until is not None:
601 path = '/'.join((account, container, name))
602 node = self.node.node_lookup(path)
605 hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
606 hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
608 self.store.map_delete(h)
609 self.node.node_purge(node, until, CLUSTER_DELETED)
611 props = self._get_version(node)
613 self.permissions.access_clear(path)
616 path, node = self._lookup_object(account, container, name)
617 src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
618 self._apply_versioning(account, container, src_version_id)
619 self.permissions.access_clear(path)
def delete_object(self, user, account, container, name, until=None):
    """Delete/purge an object.

    Logs the request and forwards every argument, including ``until``,
    unchanged to ``_delete_object``. Nothing is returned.
    """
    logger.debug("delete_object: %s %s %s %s", account, container, name, until)
    self._delete_object(user, account, container, name, until=until)
def list_versions(self, user, account, container, name):
    """Return ``[version, version_timestamp]`` pairs for an object.

    Each pair is a two-item list. Versions whose cluster is
    ``CLUSTER_DELETED`` are left out. Read access is checked via
    ``_can_read`` first.
    """
    logger.debug("list_versions: %s %s %s", account, container, name)
    self._can_read(user, account, container, name)
    _, object_node = self._lookup_object(account, container, name)
    listing = []
    for props in self.node.node_get_versions(object_node):
        if props[self.CLUSTER] == CLUSTER_DELETED:
            continue
        listing.append([props[self.SERIAL], props[self.MTIME]])
    return listing
639 def get_public(self, user, public):
640 """Return the (account, container, name) for the public id given."""
641 logger.debug("get_public: %s", public)
642 if public is None or public < ULTIMATE_ANSWER:
644 path = self.permissions.public_path(public - ULTIMATE_ANSWER)
645 account, container, name = path.split('/', 2)
646 self._can_read(user, account, container, name)
647 return (account, container, name)
649 @backend_method(autocommit=0)
650 def get_block(self, hash):
651 """Return a block's data."""
653 logger.debug("get_block: %s", hash)
654 block = self.store.block_get(binascii.unhexlify(hash))
656 raise NameError('Block does not exist')
659 @backend_method(autocommit=0)
def put_block(self, data):
    """Store a block in the block store and return its hash, hex-encoded."""
    logger.debug("put_block: %s", len(data))
    raw_hash = self.store.block_put(data)
    return binascii.hexlify(raw_hash)
666 @backend_method(autocommit=0)
def update_block(self, hash, data, offset=0):
    """Update a known block and return the resulting hash, hex-encoded.

    When ``data`` starts at offset 0 and is exactly ``self.block_size``
    long it replaces the block entirely, so it is stored through
    ``put_block``. Otherwise the store's ``block_update`` is called with
    the hex-decoded ``hash``, the offset and the data.
    """
    logger.debug("update_block: %s %s %s", hash, len(data), offset)
    replaces_whole_block = offset == 0 and len(data) == self.block_size
    if not replaces_whole_block:
        updated = self.store.block_update(binascii.unhexlify(hash), offset, data)
        return binascii.hexlify(updated)
    return self.put_block(data)
678 def _put_object_node(self, path, parent, name):
679 path = '/'.join((path, name))
680 node = self.node.node_lookup(path)
682 node = self.node.node_create(parent, path)
685 def _put_path(self, user, parent, path):
686 node = self.node.node_create(parent, path)
687 self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
690 def _lookup_account(self, account, create=True):
691 node = self.node.node_lookup(account)
692 if node is None and create:
693 node = self._put_path(account, self.ROOTNODE, account) # User is account.
696 def _lookup_container(self, account, container):
697 path = '/'.join((account, container))
698 node = self.node.node_lookup(path)
700 raise NameError('Container does not exist')
703 def _lookup_object(self, account, container, name):
704 path = '/'.join((account, container, name))
705 node = self.node.node_lookup(path)
707 raise NameError('Object does not exist')
710 def _get_properties(self, node, until=None):
711 """Return properties until the timestamp given."""
713 before = until if until is not None else inf
714 props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
715 if props is None and until is not None:
716 props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
718 raise NameError('Path does not exist')
721 def _get_statistics(self, node, until=None):
722 """Return count, sum of size and latest timestamp of everything under node."""
725 stats = self.node.statistics_get(node, CLUSTER_NORMAL)
727 stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
732 def _get_version(self, node, version=None):
734 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
736 raise NameError('Object does not exist')
739 version = int(version)
741 raise IndexError('Version does not exist')
742 props = self.node.version_get_properties(version)
743 if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
744 raise IndexError('Version does not exist')
747 def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
748 """Create a new version of the node."""
750 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
751 if props is not None:
752 src_version_id = props[self.SERIAL]
753 src_hash = props[self.HASH]
754 src_size = props[self.SIZE]
756 src_version_id = None
760 hash = src_hash # This way hash can be set to None.
763 if src_version_id is not None:
764 self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
765 dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
766 return src_version_id, dest_version_id
768 def _put_metadata(self, user, node, meta, replace=False):
769 """Create a new version and store metadata."""
771 src_version_id, dest_version_id = self._put_version_duplicate(user, node)
773 # TODO: Merge with other functions that update metadata...
775 if src_version_id is not None:
776 self.node.attribute_copy(src_version_id, dest_version_id)
777 self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
778 self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
780 self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
781 return src_version_id, dest_version_id
783 def _list_limits(self, listing, marker, limit):
787 start = listing.index(marker) + 1
790 if not limit or limit > 10000:
def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
    """List ``(name, value)`` pairs for entries under ``path``.

    ``prefix`` and ``marker`` are relative to ``path``; they are turned
    into full paths before ``node.latest_version_list`` is queried, and the
    ``path + '/'`` prefix is stripped again from the names returned.
    When ``virtual`` is true the prefixes reported by the node layer are
    added as ``(prefix, None)`` entries. ``keys`` is joined with commas
    into a filter string (``None`` when empty), and a missing ``until``
    means "no upper time bound" (``inf``). The result is sorted by name
    and sliced according to ``_list_limits``.
    """
    cont_prefix = path + '/'
    strip_len = len(cont_prefix)
    full_prefix = cont_prefix + prefix
    if marker:
        start_path = cont_prefix + marker
    else:
        start_path = None
    if until is None:
        before = inf
    else:
        before = until
    if keys:
        filterq = ','.join(keys)
    else:
        filterq = None

    objects, prefixes = self.node.latest_version_list(
        parent, full_prefix, delimiter, start_path, limit, before,
        CLUSTER_DELETED, allowed, filterq)
    virtual_entries = []
    if virtual:
        virtual_entries = [(p, None) for p in prefixes]
    objects.extend(virtual_entries)
    objects.sort(key=lambda entry: entry[0])

    # Report names relative to the container, not as full paths.
    relative = []
    for entry in objects:
        relative.append((entry[0][strip_len:], entry[1]))

    names = [entry[0] for entry in relative]
    offset, count = self._list_limits(names, marker, limit)
    return relative[offset:offset + count]
811 def _check_policy(self, policy):
812 for k in policy.keys():
814 policy[k] = self.default_policy.get(k)
815 for k, v in policy.iteritems():
817 q = int(v) # May raise ValueError.
820 elif k == 'versioning':
821 if v not in ['auto', 'none']:
826 def _put_policy(self, node, policy, replace):
828 for k, v in self.default_policy.iteritems():
831 self.node.policy_set(node, policy)
833 def _get_policy(self, node):
834 policy = self.default_policy.copy()
835 policy.update(self.node.policy_get(node))
838 def _apply_versioning(self, account, container, version_id):
839 if version_id is None:
841 path, node = self._lookup_container(account, container)
842 versioning = self._get_policy(node)['versioning']
843 if versioning != 'auto':
844 hash = self.node.version_remove(version_id)
845 self.store.map_delete(hash)
847 # Access control functions.
849 def _check_groups(self, groups):
850 # raise ValueError('Bad characters in groups')
853 def _check_permissions(self, path, permissions):
854 # raise ValueError('Bad characters in permissions')
856 # Check for existing permissions.
857 paths = self.permissions.access_list(path)
859 ae = AttributeError()
863 def _can_read(self, user, account, container, name):
866 path = '/'.join((account, container, name))
867 if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
868 raise NotAllowedError
870 def _can_write(self, user, account, container, name):
873 path = '/'.join((account, container, name))
874 if not self.permissions.access_check(path, self.WRITE, user):
875 raise NotAllowedError
877 def _allowed_accounts(self, user):
879 for path in self.permissions.access_list_paths(user):
880 allow.add(path.split('/', 1)[0])
883 def _allowed_containers(self, user, account):
885 for path in self.permissions.access_list_paths(user, account):
886 allow.add(path.split('/', 2)[1])