Different queue message types use different keys.
author: Antony Chazapis <chazapis@gmail.com>
Tue, 3 Apr 2012 15:45:56 +0000 (18:45 +0300)
committer: Antony Chazapis <chazapis@gmail.com>
Tue, 3 Apr 2012 15:45:56 +0000 (18:45 +0300)
snf-pithos-app/README
snf-pithos-backend/pithos/backends/lib/rabbitmq/queue.py
snf-pithos-backend/pithos/backends/modular.py

index b506e8b..42d3a50 100644 (file)
@@ -45,7 +45,7 @@ PITHOS_UPDATE_MD5                True
 
 To update checksums asynchronously, enable the queue, install snf-pithos-tools and use ``pithos-dispatcher``::
 
 
 To update checksums asynchronously, enable the queue, install snf-pithos-tools and use ``pithos-dispatcher``::
 
-    pithos-dispatcher --exchange=pithos --callback=pithos.api.dispatch.update_md5
+    pithos-dispatcher --exchange=pithos --key=pithos.object --callback=pithos.api.dispatch.update_md5
 
 Administrator functions
 -----------------------
 
 Administrator functions
 -----------------------
index 747e476..c304a6e 100644 (file)
@@ -36,18 +36,17 @@ from synnefo.lib.queue import exchange_connect, exchange_send, exchange_close, R
 
 class Queue(object):
     """Queue.
 
 class Queue(object):
     """Queue.
-       Required constructor parameters: exchange, message_key, client_id.
+       Required constructor parameters: exchange, client_id.
     """
     
     def __init__(self, **params):
         exchange = params['exchange']
         self.conn = exchange_connect(exchange)
     """
     
     def __init__(self, **params):
         exchange = params['exchange']
         self.conn = exchange_connect(exchange)
-        self.message_key = params['message_key']
         self.client_id = params['client_id']
     
         self.client_id = params['client_id']
     
-    def send(self, user, resource, value, details):
+    def send(self, message_key, user, resource, value, details):
         body = Receipt(self.client_id, user, resource, value, details).format()
         body = Receipt(self.client_id, user, resource, value, details).format()
-        exchange_send(self.conn, self.message_key, body)
+        exchange_send(self.conn, message_key, body)
     
     def close(self):
         exchange_close(self.conn)
     
     def close(self):
         exchange_close(self.conn)
index 6f6b345..576ec80 100644 (file)
@@ -77,7 +77,7 @@ DEFAULT_BLOCK_PATH = 'data/'
 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
 
 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
 
-QUEUE_MESSAGE_KEY = 'pithos'
+QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
 QUEUE_CLIENT_ID = 'pithos'
 
 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
 QUEUE_CLIENT_ID = 'pithos'
 
 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
@@ -157,7 +157,6 @@ class ModularBackend(BaseBackend):
         if queue_module and queue_connection:
             self.queue_module = load_module(queue_module)
             params = {'exchange': queue_connection,
         if queue_module and queue_connection:
             self.queue_module = load_module(queue_module)
             params = {'exchange': queue_connection,
-                      'message_key': QUEUE_MESSAGE_KEY,
                       'client_id': QUEUE_CLIENT_ID}
             self.queue = self.queue_module.Queue(**params)
         else:
                       'client_id': QUEUE_CLIENT_ID}
             self.queue = self.queue_module.Queue(**params)
         else:
@@ -1011,12 +1010,12 @@ class ModularBackend(BaseBackend):
         account_node = self._lookup_account(account, True)[1]
         total = self._get_statistics(account_node)[1]
         details.update({'user': user, 'total': total})
         account_node = self._lookup_account(account, True)[1]
         total = self._get_statistics(account_node)[1]
         details.update({'user': user, 'total': total})
-        self.messages.append((account, 'diskspace', size, details))
+        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, 'diskspace', size, details))
     
     def _report_object_change(self, user, account, path, details={}):
         logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
         details.update({'user': user})
     
     def _report_object_change(self, user, account, path, details={}):
         logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
         details.update({'user': user})
-        self.messages.append((account, 'object', path, details))
+        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, 'object', path, details))
     
     # Policy functions.
     
     
     # Policy functions.