Different queue message types use different keys.
[pithos] / snf-pithos-backend / pithos / backends / modular.py
index c90a6e1..576ec80 100644 (file)
@@ -77,8 +77,8 @@ DEFAULT_BLOCK_PATH = 'data/'
 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
 
-QUEUE_MESSAGE_KEY = '#'
-QUEUE_CLIENT_ID = 2 # Pithos.
+QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
+QUEUE_CLIENT_ID = 'pithos'
 
 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
 
@@ -101,8 +101,11 @@ def backend_method(func=None, autocommit=1):
     def fn(self, *args, **kw):
         self.wrapper.execute()
         try:
+            self.messages = []
             ret = func(self, *args, **kw)
             self.wrapper.commit()
+            for m in self.messages:
+                self.queue.send(*m)
             return ret
         except:
             self.wrapper.rollback()
@@ -154,7 +157,6 @@ class ModularBackend(BaseBackend):
         if queue_module and queue_connection:
             self.queue_module = load_module(queue_module)
             params = {'exchange': queue_connection,
-                      'message_key': QUEUE_MESSAGE_KEY,
                       'client_id': QUEUE_CLIENT_ID}
             self.queue = self.queue_module.Queue(**params)
         else:
@@ -381,7 +383,11 @@ class ModularBackend(BaseBackend):
         if user != account:
             raise NotAllowedError
         path, node = self._lookup_container(account, container)
-        self._put_metadata(user, node, domain, meta, replace)
+        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
+        if src_version_id is not None:
+            versioning = self._get_policy(node)['versioning']
+            if versioning != 'auto':
+                self.node.version_remove(src_version_id)
     
     @backend_method
     def get_container_policy(self, user, account, container):
@@ -632,7 +638,7 @@ class ModularBackend(BaseBackend):
         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
     
-    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, permissions, src_node=None, is_copy=False):
+    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
         if permissions is not None and user != account:
             raise NotAllowedError
         self._can_write(user, account, container, name)
@@ -645,6 +651,11 @@ class ModularBackend(BaseBackend):
         path, node = self._put_object_node(container_path, container_node, name)
         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
         
+        # Handle meta.
+        if src_version_id is None:
+            src_version_id = pre_version_id
+        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
+        
         # Check quota.
         del_size = self._apply_versioning(account, container, pre_version_id)
         size_delta = size - del_size
@@ -659,7 +670,9 @@ class ModularBackend(BaseBackend):
         
         if permissions is not None:
             self.permissions.access_set(path, permissions)
-        return pre_version_id, dest_version_id
+        
+        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
+        return dest_version_id
     
     @backend_method
     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
@@ -677,8 +690,7 @@ class ModularBackend(BaseBackend):
             raise ie
         
         hash = map.hash()
-        pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, permissions)
-        self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
+        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
         self.store.map_put(hash, map)
         return dest_version_id
     
@@ -706,8 +718,7 @@ class ModularBackend(BaseBackend):
         size = props[self.SIZE]
         
         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
-        pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, permissions, src_node=node, is_copy=is_copy)
-        self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
+        dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
         return dest_version_id
     
     @backend_method
@@ -762,6 +773,7 @@ class ModularBackend(BaseBackend):
         del_size = self._apply_versioning(account, container, src_version_id)
         if del_size:
             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
+        self._report_object_change(user, account, path, details={'action': 'object delete'})
         self.permissions.access_clear(path)
     
     @backend_method
@@ -998,7 +1010,12 @@ class ModularBackend(BaseBackend):
         account_node = self._lookup_account(account, True)[1]
         total = self._get_statistics(account_node)[1]
         details.update({'user': user, 'total': total})
-        self.queue.send(account, 'diskspace', size, details)
+        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, 'diskspace', size, details))
+    
+    def _report_object_change(self, user, account, path, details={}):
+        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
+        details.update({'user': user})
+        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, 'object', path, details))
     
     # Policy functions.
     
@@ -1061,7 +1078,7 @@ class ModularBackend(BaseBackend):
             if node is not None:
                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
             if props is not None:
-                if props[self.TYPE] in ('application/directory', 'application/folder'):
+                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
                 formatted.append((p, self.MATCH_EXACT))
         return formatted
@@ -1081,7 +1098,7 @@ class ModularBackend(BaseBackend):
                 if node is not None:
                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
                 if props is not None:
-                    if props[self.TYPE] in ('application/directory', 'application/folder'):
+                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
                         return p
         return None