Report object changes to the queue.
authorAntony Chazapis <chazapis@gmail.com>
Mon, 2 Apr 2012 22:17:21 +0000 (01:17 +0300)
committerAntony Chazapis <chazapis@gmail.com>
Mon, 2 Apr 2012 22:17:21 +0000 (01:17 +0300)
Refs #1792

snf-pithos-backend/pithos/backends/modular.py

index 82cc20f..6f6b345 100644 (file)
@@ -77,8 +77,8 @@ DEFAULT_BLOCK_PATH = 'data/'
 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
 
-QUEUE_MESSAGE_KEY = '#'
-QUEUE_CLIENT_ID = 2 # Pithos.
+QUEUE_MESSAGE_KEY = 'pithos'
+QUEUE_CLIENT_ID = 'pithos'
 
 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
 
@@ -101,8 +101,11 @@ def backend_method(func=None, autocommit=1):
     def fn(self, *args, **kw):
         self.wrapper.execute()
         try:
+            self.messages = []
             ret = func(self, *args, **kw)
             self.wrapper.commit()
+            for m in self.messages:
+                self.queue.send(*m)
             return ret
         except:
             self.wrapper.rollback()
@@ -668,6 +671,8 @@ class ModularBackend(BaseBackend):
         
         if permissions is not None:
             self.permissions.access_set(path, permissions)
+        
+        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
         return dest_version_id
     
     @backend_method
@@ -769,6 +774,7 @@ class ModularBackend(BaseBackend):
         del_size = self._apply_versioning(account, container, src_version_id)
         if del_size:
             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
+        self._report_object_change(user, account, path, details={'action': 'object delete'})
         self.permissions.access_clear(path)
     
     @backend_method
@@ -1005,7 +1011,12 @@ class ModularBackend(BaseBackend):
         account_node = self._lookup_account(account, True)[1]
         total = self._get_statistics(account_node)[1]
         details.update({'user': user, 'total': total})
-        self.queue.send(account, 'diskspace', size, details)
+        self.messages.append((account, 'diskspace', size, details))
+    
+    def _report_object_change(self, user, account, path, details={}):
+        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
+        details.update({'user': user})
+        self.messages.append((account, 'object', path, details))
     
     # Policy functions.