if w:
self.feature_setmany(feature, WRITE, w)
def access_get(self, path):
    """Return the permissions recorded for path, keyed by 'read'/'write'.

    Looks up the feature attached to path and returns its dictionary with
    the internal READ and WRITE keys renamed to the strings 'read' and
    'write'. Returns False when no feature is found for path.
    """
    feature = self.xfeature_get(path)
    if not feature:
        return False
    permissions = self.feature_dict(feature)
    # Rename the internal keys to their string names, READ first.
    for key, label in ((READ, 'read'), (WRITE, 'write')):
        if key in permissions:
            permissions[label] = permissions[key]
            del permissions[key]
    return permissions
+
def access_clear(self, path):
"""Revoke access to path (both permissions and public)."""
if not r:
return []
- def get_permissions(feature):
- permissions = self.feature_dict(feature)
- if READ in permissions:
- permissions['read'] = permissions[READ]
- del(permissions[READ])
- if WRITE in permissions:
- permissions['write'] = permissions[WRITE]
- del(permissions[WRITE])
- return permissions
-
# Only keep path components.
parts = path.rstrip('/').split('/')
valid = []
subp = '/'.join(parts[:i + 1])
valid.append(subp)
valid.append(subp + '/')
- return [(x[0], get_permissions(x[1])) for x in r if x[0] in valid]
+ return [x[0] for x in r if x[0] in valid]
def access_list_paths(self, member, prefix=None):
"""Return the list of paths granted to member."""
self.node.node_remove(node)
self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
- # XXX: Up to here...
-
@backend_method
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
"""Return a list of objects existing under a container."""
if not allowed:
return []
path, node = self._lookup_container(account, container)
+ # XXX: Format allowed...
return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed)
@backend_method
raise NotAllowedError
path, node = self._lookup_container(account, container)
before = until if until is not None else inf
+ # XXX: Format allowed...
return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
@backend_method
logger.debug("get_object_permissions: %s %s %s", account, container, name)
allowed = 'write'
+ permissions_path = self._get_permissions_path(account, container, name)
if user != account:
- path = '/'.join((account, container, name))
- if self.permissions.access_check(path, self.WRITE, user):
+ if self.permissions.access_check(permissions_path, self.WRITE, user):
allowed = 'write'
- elif self.permissions.access_check(path, self.READ, user):
+ elif self.permissions.access_check(permissions_path, self.READ, user):
allowed = 'read'
else:
raise NotAllowedError
- path = self._lookup_object(account, container, name)[0]
- return (allowed,) + self.permissions.access_inherit(path)
+ self._lookup_object(account, container, name)
+ return (allowed, permissions_path, self.permissions.access_get(permissions_path))
@backend_method
def update_object_permissions(self, user, account, container, name, permissions):
# raise ValueError('Bad characters in permissions')
pass
def _get_formatted_paths(self, paths):
    """Classify each existing path as a directory prefix or an exact match.

    Paths for which node_lookup() or version_lookup() returns None are
    skipped. For the rest, the 'pithos' attributes of the looked-up
    version decide the result:

    * Content-Type 'application/directory' -> (path with exactly one
      trailing '/', 'prefix')
    * anything else, including a missing Content-Type -> (path, 'exact')

    Returns the list of (path, kind) tuples in input order.
    """
    formatted = []
    for p in paths:
        node = self.node.node_lookup(p)
        if node is None:
            continue
        props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
        if props is None:
            continue
        # XXX: Put type in properties...
        meta = dict(self.node.attribute_get(props[self.SERIAL], 'pithos'))
        # Use .get(): a version lacking a Content-Type attribute must not
        # abort the whole listing with KeyError; treat it as a plain object.
        if meta.get('Content-Type') == 'application/directory':
            formatted.append((p.rstrip('/') + '/', 'prefix'))
        else:
            formatted.append((p, 'exact'))
    return formatted
+
def _get_permissions_path(self, account, container, name):
path = '/'.join((account, container, name))
permission_paths = self.permissions.access_inherit(path)
if p == path:
return p
else:
- try:
- parts = p.split('/', 2)
- if len(parts) != 3:
- return None
- path, node = self._lookup_object(*p.split('/', 2))
- props = self._get_version(node)
+ if p.count('/') < 3:
+ return None
+ node = self.node.node_lookup(p)
+ if node is not None:
+ props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
+ if props is not None:
# XXX: Put type in properties...
meta = dict(self.node.attribute_get(props[self.SERIAL], 'pithos'))
if meta['Content-Type'] == 'application/directory':
return p
- except NameError:
- pass
return None
def _can_read(self, user, account, container, name):