1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
41 from base import NotAllowedError, QuotaError, BaseBackend
43 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
48 logger = logging.getLogger(__name__)
def __init__(self, blocksize, blockhash):
    """Create an empty hash list.

    blocksize -- kept as ``self.blocksize``.
    blockhash -- hashlib algorithm name, kept as ``self.blockhash``.
    """
    super(HashMap, self).__init__()
    self.blocksize = blocksize
    self.blockhash = blockhash
58 def _hash_raw(self, v):
59 h = hashlib.new(self.blockhash)
65 return self._hash_raw('')
67 return self.__getitem__(0)
73 h += [('\x00' * len(h[0]))] * (s - len(h))
75 h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
def backend_method(func=None, autocommit=1):
    """Decorate a backend method so it runs inside a wrapper transaction.

    Usable bare (``@backend_method``) or with arguments
    (``@backend_method(autocommit=0)``).  With ``autocommit`` false the
    function is returned untouched.  Otherwise the call is bracketed by
    ``self.wrapper.execute()`` and ``self.wrapper.commit()``; on any
    exception ``self.wrapper.rollback()`` runs and the exception is
    re-raised.
    """
    if func is None:
        # Called with keyword arguments only: hand back the real decorator.
        def decorator(func):
            return backend_method(func, autocommit)
        return decorator

    if not autocommit:
        return func

    from functools import wraps

    @wraps(func)
    def fn(self, *args, **kw):
        self.wrapper.execute()
        try:
            ret = func(self, *args, **kw)
            self.wrapper.commit()
            return ret
        except:
            # Bare except on purpose: roll back on *anything*, then re-raise.
            self.wrapper.rollback()
            raise
    return fn
99 class ModularBackend(BaseBackend):
100 """A modular backend.
102 Uses modules for SQL functions and storage.
def __init__(self, db_module, db_connection, block_module, block_path):
    """Import the database and block-store modules and wire them up.

    db_module     -- name of the module providing DBWrapper, Permissions,
                     Node and the constants copied onto this instance.
    db_connection -- passed to ``DBWrapper``.
    block_module  -- name of the module providing ``Store``.
    block_path    -- passed to ``Store`` as ``path``.
    """
    self.hash_algorithm = 'sha256'
    self.block_size = 4 * 1024 * 1024  # 4MB

    self.default_policy = {'quota': 0, 'versioning': 'manual'}

    __import__(db_module)
    self.db_module = sys.modules[db_module]
    self.wrapper = self.db_module.DBWrapper(db_connection)

    params = {'wrapper': self.wrapper}
    self.permissions = self.db_module.Permissions(**params)
    for x in ['READ', 'WRITE']:
        setattr(self, x, getattr(self.db_module, x))
    self.node = self.db_module.Node(**params)
    for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
        setattr(self, x, getattr(self.db_module, x))

    __import__(block_module)
    self.block_module = sys.modules[block_module]

    params = {'path': block_path,
              'block_size': self.block_size,
              'hash_algorithm': self.hash_algorithm}
    self.store = self.block_module.Store(**params)
def list_accounts(self, user, marker=None, limit=10000):
    """Return a list of accounts the user can access.

    ``marker`` and ``limit`` page through the result via ``_list_limits``.
    """
    logger.debug("list_accounts: %s %s %s", user, marker, limit)
    allowed = self._allowed_accounts(user)
    start, limit = self._list_limits(allowed, marker, limit)
    return allowed[start:start + limit]
def get_account_meta(self, user, account, until=None):
    """Return a dictionary with the account metadata.

    The owner gets the stored attributes plus 'name', 'count', 'bytes',
    'modified' (and 'until_timestamp' when ``until`` is given).  Other
    users only get 'name' and 'modified'.

    Raises NotAllowedError for a non-owner who passes ``until``, who has
    no access, or when the account does not exist.
    """
    logger.debug("get_account_meta: %s %s", account, until)
    # Only the owner's own lookup may create the account implicitly.
    path, node = self._lookup_account(account, user == account)
    if user != account:
        if until or node is None or account not in self._allowed_accounts(user):
            raise NotAllowedError
    try:
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
    except NameError:
        props = None
        # NOTE(review): fallback value reconstructed from a damaged
        # listing -- confirm against the upstream file.
        mtime = until
    count, nbytes, tstamp = self._get_statistics(node, until)
    tstamp = max(tstamp, mtime)
    if until is None:
        modified = tstamp
    else:
        modified = self._get_statistics(node)[2]  # Overall last modification.
        modified = max(modified, mtime)

    if user != account:
        meta = {'name': account}
    else:
        meta = {}
        if props is not None:
            meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
        if until is not None:
            meta.update({'until_timestamp': tstamp})
        meta.update({'name': account, 'count': count, 'bytes': nbytes})
    meta.update({'modified': modified})
    return meta
def update_account_meta(self, user, account, meta, replace=False):
    """Update the metadata associated with the account.

    Only the account owner may do this; anyone else gets NotAllowedError.
    """
    logger.debug("update_account_meta: %s %s %s", account, meta, replace)
    if user != account:
        raise NotAllowedError
    path, node = self._lookup_account(account, True)
    self._put_metadata(user, node, meta, replace)
def get_account_groups(self, user, account):
    """Return a dictionary with the user groups defined for this account.

    Raises NotAllowedError when a non-owner has no access to the account.
    """
    logger.debug("get_account_groups: %s", account)
    if user != account:
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
        # NOTE(review): one line is missing from the listing here;
        # reconstructed as "non-owners see no groups" -- confirm upstream.
        return {}
    self._lookup_account(account, True)
    return self.permissions.group_dict(account)
def update_account_groups(self, user, account, groups, replace=False):
    """Update the groups associated with the account.

    groups  -- mapping of group name to members.
    replace -- when true, all existing groups are destroyed first;
               otherwise only the groups named in ``groups`` are reset.

    Raises NotAllowedError unless ``user`` owns the account.
    """
    logger.debug("update_account_groups: %s %s %s", account, groups, replace)
    if user != account:
        raise NotAllowedError
    self._lookup_account(account, True)
    self._check_groups(groups)
    if replace:
        self.permissions.group_destroy(account)
    for k, v in groups.iteritems():
        if not replace:  # If not already deleted.
            self.permissions.group_delete(account, k)
        if v:
            self.permissions.group_addmany(account, k, v)
def get_account_policy(self, user, account):
    """Return a dictionary with the account policy.

    Raises NotAllowedError when a non-owner has no access to the account.
    """
    logger.debug("get_account_policy: %s", account)
    if user != account:
        if account not in self._allowed_accounts(user):
            raise NotAllowedError
        # NOTE(review): one line is missing from the listing here;
        # reconstructed as "non-owners see no policy" -- confirm upstream.
        return {}
    path, node = self._lookup_account(account, True)
    return self._get_policy(node)
def update_account_policy(self, user, account, policy, replace=False):
    """Update the policy associated with the account.

    Raises NotAllowedError unless ``user`` owns the account, and whatever
    ``_check_policy`` raises for an invalid policy.
    """
    logger.debug("update_account_policy: %s %s %s", account, policy, replace)
    if user != account:
        raise NotAllowedError
    path, node = self._lookup_account(account, True)
    self._check_policy(policy)
    self._put_policy(node, policy, replace)
def put_account(self, user, account, policy={}):
    """Create a new account with the given name.

    Raises NotAllowedError unless ``user`` is the account itself and
    NameError if the account already exists.
    """
    logger.debug("put_account: %s %s", account, policy)
    if user != account:
        raise NotAllowedError
    node = self.node.node_lookup(account)
    if node is not None:
        raise NameError('Account already exists')
    if policy:
        self._check_policy(policy)
    node = self._put_path(user, self.ROOTNODE, account)
    self._put_policy(node, policy, True)
def delete_account(self, user, account):
    """Delete the account with the given name.

    A missing account is silently ignored.  Raises NotAllowedError unless
    ``user`` owns the account and IndexError if the node cannot be removed
    because the account is not empty.
    """
    logger.debug("delete_account: %s", account)
    if user != account:
        raise NotAllowedError
    node = self.node.node_lookup(account)
    if node is None:
        return
    if not self.node.node_remove(node):
        raise IndexError('Account is not empty')
    self.permissions.group_destroy(account)
def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
    """Return a list of containers existing under an account.

    Non-owners only see containers they were granted access to and may
    not pass ``until`` (NotAllowedError).  With ``shared`` the owner gets
    only containers holding shared paths.
    """
    logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
    if user != account:
        if until or account not in self._allowed_accounts(user):
            raise NotAllowedError
        allowed = self._allowed_containers(user, account)
        start, limit = self._list_limits(allowed, marker, limit)
        return allowed[start:start + limit]
    if shared:
        shared_paths = self.permissions.access_list_shared(account)
        # Sorted so that marker/limit paging sees a stable order; a bare
        # list(set(...)) has arbitrary ordering.
        allowed = sorted(set(x.split('/', 2)[1] for x in shared_paths))
        start, limit = self._list_limits(allowed, marker, limit)
        return allowed[start:start + limit]
    node = self.node.node_lookup(account)
    return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
def get_container_meta(self, user, account, container, until=None):
    """Return a dictionary with the container metadata.

    The owner gets the stored attributes plus 'name', 'count', 'bytes',
    'modified' (and 'until_timestamp' when ``until`` is given).  Other
    users only get 'name' and 'modified'.

    Raises NotAllowedError for a non-owner who passes ``until`` or has no
    access to the container.
    """
    logger.debug("get_container_meta: %s %s %s", account, container, until)
    if user != account:
        if until or container not in self._allowed_containers(user, account):
            raise NotAllowedError
    path, node = self._lookup_container(account, container)
    props = self._get_properties(node, until)
    mtime = props[self.MTIME]
    count, nbytes, tstamp = self._get_statistics(node, until)
    tstamp = max(tstamp, mtime)
    if until is None:
        modified = tstamp
    else:
        modified = self._get_statistics(node)[2]  # Overall last modification.
        modified = max(modified, mtime)

    if user != account:
        meta = {'name': container}
    else:
        meta = dict(self.node.attribute_get(props[self.SERIAL]))
        if until is not None:
            meta.update({'until_timestamp': tstamp})
        meta.update({'name': container, 'count': count, 'bytes': nbytes})
    meta.update({'modified': modified})
    return meta
def update_container_meta(self, user, account, container, meta, replace=False):
    """Update the metadata associated with the container.

    Raises NotAllowedError unless ``user`` owns the account.
    """
    logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
    if user != account:
        raise NotAllowedError
    path, node = self._lookup_container(account, container)
    self._put_metadata(user, node, meta, replace)
def get_container_policy(self, user, account, container):
    """Return a dictionary with the container policy.

    Raises NotAllowedError when a non-owner has no access to the container.
    """
    logger.debug("get_container_policy: %s %s", account, container)
    if user != account:
        if container not in self._allowed_containers(user, account):
            raise NotAllowedError
        # NOTE(review): one line is missing from the listing here;
        # reconstructed as "non-owners see no policy" -- confirm upstream.
        return {}
    path, node = self._lookup_container(account, container)
    return self._get_policy(node)
def update_container_policy(self, user, account, container, policy, replace=False):
    """Update the policy associated with the container.

    Raises NotAllowedError unless ``user`` owns the account, and whatever
    ``_check_policy`` raises for an invalid policy.
    """
    logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
    if user != account:
        raise NotAllowedError
    path, node = self._lookup_container(account, container)
    self._check_policy(policy)
    self._put_policy(node, policy, replace)
def put_container(self, user, account, container, policy={}):
    """Create a new container with the given name.

    Raises NotAllowedError unless ``user`` owns the account and NameError
    if the container already exists.
    """
    logger.debug("put_container: %s %s %s", account, container, policy)
    if user != account:
        raise NotAllowedError
    try:
        path, node = self._lookup_container(account, container)
    except NameError:
        pass  # Not there yet, which is what we want.
    else:
        raise NameError('Container already exists')
    if policy:
        self._check_policy(policy)
    path = '/'.join((account, container))
    node = self._put_path(user, self._lookup_account(account, True)[1], path)
    self._put_policy(node, policy, True)
def delete_container(self, user, account, container, until=None):
    """Delete/purge the container with the given name.

    With ``until`` only history and deleted entries up to that point are
    purged and the container itself is kept.  Without it the container
    must be empty (IndexError otherwise); all remaining history is purged
    and the node removed.

    Raises NotAllowedError unless ``user`` owns the account.
    """
    logger.debug("delete_container: %s %s %s", account, container, until)
    if user != account:
        raise NotAllowedError
    path, node = self._lookup_container(account, container)

    if until is not None:
        hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
        for h in hashes:
            self.store.map_delete(h)
        self.node.node_purge_children(node, until, CLUSTER_DELETED)
        return

    if self._get_statistics(node)[0] > 0:
        raise IndexError('Container is not empty')
    # ``until`` is None on this path; purge everything, matching the
    # CLUSTER_DELETED purge below.
    hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
    for h in hashes:
        self.store.map_delete(h)
    self.node.node_purge_children(node, inf, CLUSTER_DELETED)
    self.node.node_remove(node)
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
    """Return a list of objects existing under a container.

    Non-owners are restricted to the paths they were granted and may not
    pass ``until`` (NotAllowedError in both failure cases).  For the owner,
    ``shared`` restricts the listing to shared paths.
    """
    logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
    allowed = []
    container_path = '/'.join((account, container))
    if user != account:
        if until:
            raise NotAllowedError
        allowed = self.permissions.access_list_paths(user, container_path)
        if not allowed:
            raise NotAllowedError
    elif shared:
        allowed = self.permissions.access_list_shared(container_path)
        if not allowed:
            # NOTE(review): reconstructed -- nothing shared means an empty
            # listing rather than an unrestricted one; confirm upstream.
            return []
    path, node = self._lookup_container(account, container)
    return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
def list_object_meta(self, user, account, container, until=None):
    """Return a list with all the container's object meta keys.

    Non-owners are restricted to the paths they were granted and may not
    pass ``until`` (NotAllowedError in both failure cases).
    """
    logger.debug("list_object_meta: %s %s %s", account, container, until)
    allowed = []
    if user != account:
        if until:
            raise NotAllowedError
        allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
        if not allowed:
            raise NotAllowedError
    path, node = self._lookup_container(account, container)
    before = until if until is not None else inf
    return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
def get_object_meta(self, user, account, container, name, version=None):
    """Return a dictionary with the object metadata.

    Besides the stored attributes the result carries 'name', 'bytes',
    'version', 'version_timestamp', 'modified' and 'modified_by'.
    'modified' is the latest modification time of the object, also when
    an older ``version`` is requested.
    """
    logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
    self._can_read(user, account, container, name)
    path, node = self._lookup_object(account, container, name)
    props = self._get_version(node, version)
    if version is None:
        modified = props[self.MTIME]
    else:
        try:
            modified = self._get_version(node)[self.MTIME]  # Overall last modification.
        except NameError:  # Object may be deleted.
            del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
            if del_props is None:
                raise NameError('Object does not exist')
            modified = del_props[self.MTIME]

    meta = dict(self.node.attribute_get(props[self.SERIAL]))
    meta.update({'name': name, 'bytes': props[self.SIZE]})
    meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
    meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
    return meta
def update_object_meta(self, user, account, container, name, meta, replace=False):
    """Update the metadata associated with the object.

    Returns the id of the version created to hold the new metadata.
    """
    logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
    self._can_write(user, account, container, name)
    path, node = self._lookup_object(account, container, name)
    return self._put_metadata(user, node, meta, replace)
def get_object_permissions(self, user, account, container, name):
    """Return the action allowed on the object, the path
    from which the object gets its permissions from,
    along with a dictionary containing the permissions.

    Raises NotAllowedError when a non-owner has neither write nor read
    access.
    """
    logger.debug("get_object_permissions: %s %s %s", account, container, name)
    # NOTE(review): the 'write'/'read' literals are reconstructed from a
    # damaged listing -- confirm against the upstream file.
    allowed = 'write'
    if user != account:
        path = '/'.join((account, container, name))
        if self.permissions.access_check(path, self.WRITE, user):
            allowed = 'write'
        elif self.permissions.access_check(path, self.READ, user):
            allowed = 'read'
        else:
            raise NotAllowedError
    path = self._lookup_object(account, container, name)[0]
    return (allowed,) + self.permissions.access_inherit(path)
def update_object_permissions(self, user, account, container, name, permissions):
    """Update the permissions associated with the object.

    Raises NotAllowedError unless ``user`` owns the account, and whatever
    ``_check_permissions`` raises for a conflicting path.
    """
    logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
    if user != account:
        raise NotAllowedError
    path = self._lookup_object(account, container, name)[0]
    self._check_permissions(path, permissions)
    self.permissions.access_set(path, permissions)
def get_object_public(self, user, account, container, name):
    """Return the public URL of the object if applicable.

    Returns None when the object is not public.
    """
    logger.debug("get_object_public: %s %s %s", account, container, name)
    self._can_read(user, account, container, name)
    path = self._lookup_object(account, container, name)[0]
    if self.permissions.public_check(path):
        return '/public/' + path
    return None
def update_object_public(self, user, account, container, name, public):
    """Update the public status of the object.

    A true ``public`` marks the object public, a false one removes the
    mark.
    """
    logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
    self._can_write(user, account, container, name)
    path = self._lookup_object(account, container, name)[0]
    if not public:
        self.permissions.public_unset(path)
    else:
        self.permissions.public_set(path)
def get_object_hashmap(self, user, account, container, name, version=None):
    """Return the object's size and a list with partial hashes.

    Hashes are returned hex-encoded.
    """
    logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
    self._can_read(user, account, container, name)
    path, node = self._lookup_object(account, container, name)
    props = self._get_version(node, version)
    hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
    return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
    """Create a new version of an object pointing at ``hash``.

    Checks write access (and ownership when ``permissions`` is given),
    creates the node and version, enforces account and container quotas,
    then stores metadata and permissions.  Returns the new version id.

    Raises NotAllowedError on access violations and QuotaError when a
    positive quota would be exceeded.
    """
    if permissions is not None and user != account:
        raise NotAllowedError
    self._can_write(user, account, container, name)
    if permissions is not None:
        path = '/'.join((account, container, name))
        self._check_permissions(path, permissions)

    account_path, account_node = self._lookup_account(account, True)
    container_path, container_node = self._lookup_container(account, container)
    path, node = self._put_object_node(container_path, container_node, name)
    src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)

    # Check quota.
    size_delta = size  # Change with versioning.
    if size_delta > 0:
        account_quota = long(self._get_policy(account_node)['quota'])
        container_quota = long(self._get_policy(container_node)['quota'])
        if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
           (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
            # This must be executed in a transaction, so the version is never created if it fails.
            raise QuotaError

    if not replace_meta and src_version_id is not None:
        self.node.attribute_copy(src_version_id, dest_version_id)
    self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
    if permissions is not None:
        self.permissions.access_set(path, permissions)
    return dest_version_id
def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
    """Create/update an object with the specified size and partial hashes.

    ``hashmap`` is a list of hex-encoded block hashes.  If any block is
    not in the store an IndexError is raised whose ``data`` attribute
    lists the missing hashes (hex-encoded).  Returns the new version id.
    """
    logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
    if size == 0:  # No such thing as an empty hashmap.
        hashmap = [self.put_block('')]
    block_map = HashMap(self.block_size, self.hash_algorithm)
    block_map.extend([binascii.unhexlify(x) for x in hashmap])
    missing = self.store.block_search(block_map)
    if missing:
        ie = IndexError()
        ie.data = [binascii.hexlify(x) for x in missing]
        raise ie

    # NOTE(review): the line computing the top hash is missing from the
    # listing; reconstructed as HashMap.hash() -- confirm upstream.
    top_hash = block_map.hash()
    dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(top_hash), meta, replace_meta, permissions)
    self.store.map_put(top_hash, block_map)
    return dest_version_id
def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
    """Copy one object version to a destination path.

    With ``replace_meta`` the destination gets exactly ``dest_meta``;
    otherwise the source version's attributes are copied and
    ``dest_meta`` is applied on top.  Returns the new version id.
    """
    self._can_read(user, src_account, src_container, src_name)
    path, node = self._lookup_object(src_account, src_container, src_name)
    props = self._get_version(node, src_version)
    src_version_id = props[self.SERIAL]
    hash = props[self.HASH]
    size = props[self.SIZE]

    meta = dest_meta if replace_meta else {}
    # Always pass replace_meta=True: the attribute merge is handled below
    # against the *source* version.
    dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
    if not replace_meta:
        self.node.attribute_copy(src_version_id, dest_version_id)
        self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
    return dest_version_id
def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
    """Copy an object's data and metadata.

    Returns the id of the version created at the destination.
    """
    logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
    return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
    """Move an object's data and metadata.

    Copies the latest version to the destination, then deletes the
    source.  Only the owner of the source account may move
    (NotAllowedError otherwise).  Returns the destination version id.
    """
    logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
    if user != src_account:
        raise NotAllowedError
    dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
    self._delete_object(user, src_account, src_container, src_name)
    return dest_version_id
# Delete an object, or purge its old versions when `until` is given.
# NOTE(review): this listing has lost its indentation and several lines
# (the numbers on the left jump: 614, 616, 620-621, 624, 627, 629-631,
# 633-634 are absent).  The comments below describe only what the
# remaining lines show; the dropped guards must be restored from the
# upstream file before this block can be trusted.
613 def _delete_object(self, user, account, container, name, until=None):
# NOTE(review): presumably guarded by an ownership check on the missing
# line 614 -- as shown the raise is unconditional.
615 raise NotAllowedError
# Purge path: remove stored versions up to `until`.
617 if until is not None:
618 path = '/'.join((account, container, name))
619 node = self.node.node_lookup(path)
# Purge NORMAL and HISTORY versions; the purge calls return hashes.
622 hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
623 hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
# NOTE(review): the loop header over `hashes` (line 624) is missing.
625 self.store.map_delete(h)
626 self.node.node_purge_children(node, until, CLUSTER_DELETED)
# NOTE(review): three lines are missing between the next two statements,
# so it is unclear under which condition permissions are cleared here
# (probably a try/except NameError around _get_version) -- verify.
628 props = self._get_version(node)
632 self.permissions.access_clear(path)
# Plain delete path: record a zero-size version in CLUSTER_DELETED and
# clear the object's permissions.
635 path, node = self._lookup_object(account, container, name)
636 src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
637 self.permissions.access_clear(path)
def delete_object(self, user, account, container, name, until=None):
    """Delete/purge an object.

    Thin logging wrapper around ``_delete_object``.
    """
    logger.debug("delete_object: %s %s %s %s", account, container, name, until)
    self._delete_object(user, account, container, name, until)
def list_versions(self, user, account, container, name):
    """Return a list of all (version, version_timestamp) tuples for an object.

    Versions in the deleted cluster are left out.
    """
    logger.debug("list_versions: %s %s %s", account, container, name)
    self._can_read(user, account, container, name)
    path, node = self._lookup_object(account, container, name)
    versions = self.node.node_get_versions(node)
    return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
@backend_method(autocommit=0)
def get_block(self, hash):
    """Return a block's data.

    ``hash`` is hex-encoded.  Raises NameError if the store has no such
    block.
    """
    logger.debug("get_block: %s", hash)
    block = self.store.block_get(binascii.unhexlify(hash))
    if not block:
        raise NameError('Block does not exist')
    return block
@backend_method(autocommit=0)
def put_block(self, data):
    """Store a block and return the hash (hex-encoded)."""
    logger.debug("put_block: %s", len(data))
    return binascii.hexlify(self.store.block_put(data))
@backend_method(autocommit=0)
def update_block(self, hash, data, offset=0):
    """Update a known block and return the hash (hex-encoded).

    A full-size write at offset 0 replaces the block outright, so it is
    stored as a fresh block instead of patching the old one.
    """
    logger.debug("update_block: %s %s %s", hash, len(data), offset)
    if offset == 0 and len(data) == self.block_size:
        return self.put_block(data)
    h = self.store.block_update(binascii.unhexlify(hash), offset, data)
    return binascii.hexlify(h)
685 def _put_object_node(self, path, parent, name):
686 path = '/'.join((path, name))
687 node = self.node.node_lookup(path)
689 node = self.node.node_create(parent, path)
def _put_path(self, user, parent, path):
    """Create a node for ``path`` under ``parent`` with an initial empty
    version in the normal cluster, and return the node."""
    node = self.node.node_create(parent, path)
    self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
    return node
697 def _lookup_account(self, account, create=True):
698 node = self.node.node_lookup(account)
699 if node is None and create:
700 node = self._put_path(account, self.ROOTNODE, account) # User is account.
703 def _lookup_container(self, account, container):
704 path = '/'.join((account, container))
705 node = self.node.node_lookup(path)
707 raise NameError('Container does not exist')
710 def _lookup_object(self, account, container, name):
711 path = '/'.join((account, container, name))
712 node = self.node.node_lookup(path)
714 raise NameError('Object does not exist')
def _get_properties(self, node, until=None):
    """Return properties until the timestamp given.

    Looks in the normal cluster first and, only when ``until`` is given,
    falls back to history.  Raises NameError if nothing is found.
    """
    before = until if until is not None else inf
    props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
    if props is None and until is not None:
        props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
    if props is None:
        raise NameError('Path does not exist')
    return props
def _get_statistics(self, node, until=None):
    """Return count, sum of size and latest timestamp of everything under node."""
    if until is None:
        stats = self.node.statistics_get(node, CLUSTER_NORMAL)
    else:
        stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
    if stats is None:
        # NOTE(review): fallback reconstructed from a damaged listing;
        # callers unpack three values -- confirm against upstream.
        stats = (0, 0, 0)
    return stats
739 def _get_version(self, node, version=None):
741 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
743 raise NameError('Object does not exist')
746 version = int(version)
748 raise IndexError('Version does not exist')
749 props = self.node.version_get_properties(version)
750 if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
751 raise IndexError('Version does not exist')
def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
    """Create a new version of the node.

    The current normal-cluster version, if any, is moved to history and
    used as the source.  When ``size`` is None both size and hash are
    inherited from it.  Returns ``(src_version_id, dest_version_id)``;
    the source id is None when there was no previous version.
    """
    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
    if props is not None:
        src_version_id = props[self.SERIAL]
        src_hash = props[self.HASH]
        src_size = props[self.SIZE]
    else:
        src_version_id = None
        src_hash = None
        src_size = 0
    if size is None:
        hash = src_hash  # This way hash can be set to None.
        size = src_size

    if src_version_id is not None:
        self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
    dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
    return src_version_id, dest_version_id
775 def _put_metadata(self, user, node, meta, replace=False):
776 """Create a new version and store metadata."""
778 src_version_id, dest_version_id = self._put_version_duplicate(user, node)
780 # TODO: Merge with other functions that update metadata...
782 if src_version_id is not None:
783 self.node.attribute_copy(src_version_id, dest_version_id)
784 self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
785 self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
787 self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
788 return dest_version_id
790 def _list_limits(self, listing, marker, limit):
794 start = listing.index(marker) + 1
797 if not limit or limit > 10000:
def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
    """Return ``(name, version)`` pairs for entries under ``path``.

    Names are relative to ``path``.  With ``virtual`` the prefixes
    reported by the node layer are included with a version of None.  The
    result is sorted by name and paged with ``marker``/``limit``.
    """
    cont_prefix = path + '/'
    prefix = cont_prefix + prefix
    start = cont_prefix + marker if marker else None
    before = until if until is not None else inf
    filterq = ','.join(keys) if keys else None

    objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
    if virtual:
        objects.extend([(p, None) for p in prefixes])
    objects.sort(key=lambda x: x[0])
    # Strip the container prefix so callers see relative names.
    objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]

    start, limit = self._list_limits([x[0] for x in objects], marker, limit)
    return objects[start:start + limit]
818 def _check_policy(self, policy):
819 for k in policy.keys():
821 policy[k] = self.default_policy.get(k)
822 for k, v in policy.iteritems():
824 q = int(v) # May raise ValueError.
827 elif k == 'versioning':
828 if v not in ['auto', 'manual', 'none']:
833 def _put_policy(self, node, policy, replace):
835 for k, v in self.default_policy.iteritems():
838 self.node.policy_set(node, policy)
840 def _get_policy(self, node):
841 policy = self.default_policy.copy()
842 policy.update(self.node.policy_get(node))
845 # Access control functions.
847 def _check_groups(self, groups):
848 # raise ValueError('Bad characters in groups')
851 def _check_permissions(self, path, permissions):
852 # raise ValueError('Bad characters in permissions')
854 # Check for existing permissions.
855 paths = self.permissions.access_list(path)
857 ae = AttributeError()
861 def _can_read(self, user, account, container, name):
864 path = '/'.join((account, container, name))
865 if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
866 raise NotAllowedError
868 def _can_write(self, user, account, container, name):
871 path = '/'.join((account, container, name))
872 if not self.permissions.access_check(path, self.WRITE, user):
873 raise NotAllowedError
875 def _allowed_accounts(self, user):
877 for path in self.permissions.access_list_paths(user):
878 allow.add(path.split('/', 1)[0])
881 def _allowed_containers(self, user, account):
883 for path in self.permissions.access_list_paths(user, account):
884 allow.add(path.split('/', 2)[1])