class Queue(object):
    """Publish receipts to a message exchange.

    Required constructor parameters: exchange, client_id.
    The message key is not stored on the instance; it is supplied with
    each send() call.
    """

    def __init__(self, **params):
        """Connect to the exchange and remember the client id.

        Raises KeyError when 'exchange' or 'client_id' is not supplied.
        """
        # Read both required parameters before connecting, so a missing
        # key cannot leave an already-opened connection behind.
        exchange = params['exchange']
        client_id = params['client_id']
        self.conn = exchange_connect(exchange)
        self.client_id = client_id

    def send(self, message_key, user, resource, value, details):
        """Format a Receipt for the given values and send it under message_key."""
        body = Receipt(self.client_id, user, resource, value, details).format()
        exchange_send(self.conn, message_key, body)

    def close(self):
        """Close the exchange connection opened by the constructor."""
        exchange_close(self.conn)
# Sample queue settings, kept commented out.
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'

# Template for per-message keys; the '%s' slot is filled with the message
# kind, e.g. QUEUE_MESSAGE_KEY_PREFIX % ('object',) gives 'pithos.object'.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'

# Cluster identifiers, numbered 0, 1, 2 in this order.
CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED = range(3)
if queue_module and queue_connection:
self.queue_module = load_module(queue_module)
params = {'exchange': queue_connection,
- 'message_key': QUEUE_MESSAGE_KEY,
'client_id': QUEUE_CLIENT_ID}
self.queue = self.queue_module.Queue(**params)
else:
account_node = self._lookup_account(account, True)[1]
total = self._get_statistics(account_node)[1]
details.update({'user': user, 'total': total})
- self.messages.append((account, 'diskspace', size, details))
+ self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, 'diskspace', size, details))
def _report_object_change(self, user, account, path, details=None):
    """Record an 'object' change message in self.messages.

    Appends the tuple (message_key, account, 'object', path, details),
    where message_key is QUEUE_MESSAGE_KEY_PREFIX filled with 'object'
    and details has the acting user stored under the 'user' key.

    A dict passed as details is updated in place; when details is
    omitted, a new empty dict is created for this call.
    """
    # Use a fresh dict per call: a shared `details={}` default would be
    # mutated by update() below and aliased by every queued message.
    if details is None:
        details = {}
    logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
    details.update({'user': user})
    message_key = QUEUE_MESSAGE_KEY_PREFIX % ('object',)
    self.messages.append((message_key, account, 'object', path, details))
# Policy functions.