include path in messages sent to aquarium
[pithos] / snf-pithos-backend / pithos / backends / modular.py
index b3ab147..8b6458f 100644 (file)
@@ -39,7 +39,8 @@ import logging
 import hashlib
 import binascii
 
-from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
+from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
+    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
 
 # Stripped-down version of the HashMap class found in tools.
 class HashMap(list):
@@ -292,7 +293,7 @@ class ModularBackend(BaseBackend):
             raise NotAllowedError
         node = self.node.node_lookup(account)
         if node is not None:
-            raise NameError('Account already exists')
+            raise AccountExists('Account already exists')
         if policy:
             self._check_policy(policy)
         node = self._put_path(user, self.ROOTNODE, account)
@@ -309,7 +310,7 @@ class ModularBackend(BaseBackend):
         if node is None:
             return
         if not self.node.node_remove(node):
-            raise IndexError('Account is not empty')
+            raise AccountNotEmpty('Account is not empty')
         self.permissions.group_destroy(account)
     
     @backend_method
@@ -324,13 +325,12 @@ class ModularBackend(BaseBackend):
             start, limit = self._list_limits(allowed, marker, limit)
             return allowed[start:start + limit]
         if shared or public:
-            allowed = []
+            allowed = set()
             if shared:
-                allowed.extend([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
+                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
             if public:
-                allowed.extend([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
-            allowed = list(set(allowed))
-            allowed.sort()
+                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
+            allowed = sorted(allowed)
             start, limit = self._list_limits(allowed, marker, limit)
             return allowed[start:start + limit]
         node = self.node.node_lookup(account)
@@ -435,7 +435,7 @@ class ModularBackend(BaseBackend):
         except NameError:
             pass
         else:
-            raise NameError('Container already exists')
+            raise ContainerExists('Container already exists')
         if policy:
             self._check_policy(policy)
         path = '/'.join((account, container))
@@ -456,17 +456,32 @@ class ModularBackend(BaseBackend):
             for h in hashes:
                 self.store.map_delete(h)
             self.node.node_purge_children(node, until, CLUSTER_DELETED)
-            self._report_size_change(user, account, -size, {'action': 'container purge'})
+            self._report_size_change(user, account, -size, {'action': 'container purge', 'path':path})
             return
         
-        if self._get_statistics(node)[0] > 0:
-            raise IndexError('Container is not empty')
-        hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
-        for h in hashes:
-            self.store.map_delete(h)
-        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
-        self.node.node_remove(node)
-        self._report_size_change(user, account, -size, {'action': 'container delete'})
+        if not delimiter:
+            if self._get_statistics(node)[0] > 0:
+                raise ContainerNotEmpty('Container is not empty')
+            hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
+            for h in hashes:
+                self.store.map_delete(h)
+            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
+            self.node.node_remove(node)
+            self._report_size_change(user, account, -size, {'action': 'container delete', 'path':path})
+        else:
+               # remove only contents
+            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
+            paths = []
+            for t in src_names:
+                path = '/'.join((account, container, t[0]))
+                node = t[2]
+                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
+                del_size = self._apply_versioning(account, container, src_version_id)
+                if del_size:
+                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path})
+                self._report_object_change(user, account, path, details={'action': 'object delete'})
+                paths.append(path)
+            self.permissions.access_clear_bulk(paths)
     
     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
         if user != account and until:
@@ -474,14 +489,15 @@ class ModularBackend(BaseBackend):
         if shared and public:
             # get shared first
             shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False)
-            objects = []
+            objects = set()
             if shared:
                 path, node = self._lookup_container(account, container)
                 shared = self._get_formatted_paths(shared)
-                objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props)
+                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
             
             # get public
-            objects.extend(self._list_public_object_properties(user, account, container, prefix, all_props))
+            objects |= set(self._list_public_object_properties(user, account, container, prefix, all_props))
+            objects = list(objects)
             
             objects.sort(key=lambda x: x[0])
             start, limit = self._list_limits([x[0] for x in objects], marker, limit)
@@ -529,13 +545,12 @@ class ModularBackend(BaseBackend):
             if not allowed:
                 raise NotAllowedError
         else:
-            allowed = []
+            allowed = set()
             if shared:
-                allowed.extend(self.permissions.access_list_shared(path))
+                allowed.update(self.permissions.access_list_shared(path))
             if public:
-                allowed.extend([x[0] for x in self.permissions.public_list(path)])
-            allowed = list(set(allowed))
-            allowed.sort()
+                allowed.update([x[0] for x in self.permissions.public_list(path)])
+            allowed = sorted(allowed)
             if not allowed:
                 return []
         return allowed
@@ -603,7 +618,7 @@ class ModularBackend(BaseBackend):
             except NameError: # Object may be deleted.
                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
                 if del_props is None:
-                    raise NameError('Object does not exist')
+                    raise ItemNotExists('Object does not exist')
                 modified = del_props[self.MTIME]
         
         meta = {}
@@ -726,7 +741,7 @@ class ModularBackend(BaseBackend):
                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
                 # This must be executed in a transaction, so the version is never created if it fails.
                 raise QuotaError
-        self._report_size_change(user, account, size_delta, {'action': 'object update'})
+        self._report_size_change(user, account, size_delta, {'action': 'object update', 'path':path})
         
         if permissions is not None:
             self.permissions.access_set(path, permissions)
@@ -785,7 +800,8 @@ class ModularBackend(BaseBackend):
         
         if delimiter:
             prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
-            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
+            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
+            src_names.sort(key=lambda x: x[2]) # order by nodes
             paths = [elem[0] for elem in src_names]
             nodes = [elem[2] for elem in src_names]
             # TODO: Will do another fetch of the properties in duplicate version...
@@ -795,9 +811,10 @@ class ModularBackend(BaseBackend):
                 src_version_id = prop[self.SERIAL]
                 hash = prop[self.HASH]
                 vtype = prop[self.TYPE]
+                size = prop[self.SIZE]
                 dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
                 vdest_name = path.replace(prefix, dest_prefix, 1)
-                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
+                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
                 if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
                        self._delete_object(user, src_account, src_container, path)
         return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
@@ -844,20 +861,20 @@ class ModularBackend(BaseBackend):
                 props = self._get_version(node)
             except NameError:
                 self.permissions.access_clear(path)
-            self._report_size_change(user, account, -size, {'action': 'object purge'})
+            self._report_size_change(user, account, -size, {'action': 'object purge', 'path':path})
             return
         
         path, node = self._lookup_object(account, container, name)
         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
         del_size = self._apply_versioning(account, container, src_version_id)
         if del_size:
-            self._report_size_change(user, account, -del_size, {'action': 'object delete'})
+            self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path})
         self._report_object_change(user, account, path, details={'action': 'object delete'})
         self.permissions.access_clear(path)
         
         if delimiter:
             prefix = name + delimiter if not name.endswith(delimiter) else name
-            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
+            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
             paths = []
             for t in src_names:
                path = '/'.join((account, container, t[0]))
@@ -865,7 +882,7 @@ class ModularBackend(BaseBackend):
                 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
                 del_size = self._apply_versioning(account, container, src_version_id)
                 if del_size:
-                    self._report_size_change(user, account, -del_size, {'action': 'object delete'})
+                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path})
                 self._report_object_change(user, account, path, details={'action': 'object delete'})
                 paths.append(path)
             self.permissions.access_clear_bulk(paths)
@@ -921,7 +938,7 @@ class ModularBackend(BaseBackend):
         logger.debug("get_block: %s", hash)
         block = self.store.block_get(binascii.unhexlify(hash))
         if not block:
-            raise NameError('Block does not exist')
+            raise ItemNotExists('Block does not exist')
         return block
     
     @backend_method(autocommit=0)
@@ -968,20 +985,18 @@ class ModularBackend(BaseBackend):
         path = '/'.join((account, container))
         node = self.node.node_lookup(path)
         if node is None:
-            raise NameError('Container does not exist')
+            raise ItemNotExists('Container does not exist')
         return path, node
     
     def _lookup_object(self, account, container, name):
         path = '/'.join((account, container, name))
         node = self.node.node_lookup(path)
         if node is None:
-            raise NameError('Object does not exist')
+            raise ItemNotExists('Object does not exist')
         return path, node
     
     def _lookup_objects(self, paths):
-       nodes = self.node.node_lookup_bulk(paths)
-        if nodes is None:
-            raise NameError('Object does not exist')
+        nodes = self.node.node_lookup_bulk(paths)
         return paths, nodes
     
     def _get_properties(self, node, until=None):
@@ -992,7 +1007,7 @@ class ModularBackend(BaseBackend):
         if props is None and until is not None:
             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
         if props is None:
-            raise NameError('Path does not exist')
+            raise ItemNotExists('Path does not exist')
         return props
     
     def _get_statistics(self, node, until=None):
@@ -1010,31 +1025,19 @@ class ModularBackend(BaseBackend):
         if version is None:
             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
             if props is None:
-                raise NameError('Object does not exist')
+                raise ItemNotExists('Object does not exist')
         else:
             try:
                 version = int(version)
             except ValueError:
-                raise IndexError('Version does not exist')
+                raise VersionNotExists('Version does not exist')
             props = self.node.version_get_properties(version)
             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
-                raise IndexError('Version does not exist')
+                raise VersionNotExists('Version does not exist')
         return props
 
-    def _get_versions(self, nodes, version=None):
-        if version is None:
-            props = self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
-            if not props:
-                raise NameError('Object does not exist')
-        else:
-            try:
-                version = int(version)
-            except ValueError:
-                raise IndexError('Version does not exist')
-            props = self.node.version_get_properties(version)
-            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
-                raise IndexError('Version does not exist')
-        return props
+    def _get_versions(self, nodes):
+        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
     
     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
         """Create a new version of the node."""
@@ -1119,15 +1122,15 @@ class ModularBackend(BaseBackend):
     # Reporting functions.
     
     def _report_size_change(self, user, account, size, details={}):
-        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
         account_node = self._lookup_account(account, True)[1]
         total = self._get_statistics(account_node)[1]
         details.update({'user': user, 'total': total})
+        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
     
     def _report_object_change(self, user, account, path, details={}):
-        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
         details.update({'user': user})
+        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
     
     def _report_sharing_change(self, user, account, path, details={}):