Revision 8d9a3fbd
b/snf-pithos-app/README | ||
---|---|---|
45 | 45 |
|
46 | 46 |
To update checksums asynchronously, enable the queue, install snf-pithos-tools and use ``pithos-dispatcher``:: |
47 | 47 |
|
48 |
pithos-dispatcher --exchange=pithos --callback=pithos.api.dispatch.update_md5 |
|
48 |
pithos-dispatcher --exchange=pithos --key=pithos.object --callback=pithos.api.dispatch.update_md5
|
|
49 | 49 |
|
50 | 50 |
Administrator functions |
51 | 51 |
----------------------- |
b/snf-pithos-backend/pithos/backends/lib/rabbitmq/queue.py | ||
---|---|---|
36 | 36 |
|
37 | 37 |
class Queue(object): |
38 | 38 |
"""Queue. |
39 |
Required constructor parameters: exchange, message_key, client_id.
|
|
39 |
Required constructor parameters: exchange, client_id. |
|
40 | 40 |
""" |
41 | 41 |
|
42 | 42 |
def __init__(self, **params): |
43 | 43 |
exchange = params['exchange'] |
44 | 44 |
self.conn = exchange_connect(exchange) |
45 |
self.message_key = params['message_key'] |
|
46 | 45 |
self.client_id = params['client_id'] |
47 | 46 |
|
48 |
def send(self, user, resource, value, details): |
|
47 |
def send(self, message_key, user, resource, value, details):
|
|
49 | 48 |
body = Receipt(self.client_id, user, resource, value, details).format() |
50 |
exchange_send(self.conn, self.message_key, body)
|
|
49 |
exchange_send(self.conn, message_key, body) |
|
51 | 50 |
|
52 | 51 |
def close(self): |
53 | 52 |
exchange_close(self.conn) |
b/snf-pithos-backend/pithos/backends/modular.py | ||
---|---|---|
77 | 77 |
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq' |
78 | 78 |
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos' |
79 | 79 |
|
80 |
QUEUE_MESSAGE_KEY = 'pithos'
|
|
80 |
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
|
|
81 | 81 |
QUEUE_CLIENT_ID = 'pithos' |
82 | 82 |
|
83 | 83 |
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3) |
... | ... | |
157 | 157 |
if queue_module and queue_connection: |
158 | 158 |
self.queue_module = load_module(queue_module) |
159 | 159 |
params = {'exchange': queue_connection, |
160 |
'message_key': QUEUE_MESSAGE_KEY, |
|
161 | 160 |
'client_id': QUEUE_CLIENT_ID} |
162 | 161 |
self.queue = self.queue_module.Queue(**params) |
163 | 162 |
else: |
... | ... | |
1011 | 1010 |
account_node = self._lookup_account(account, True)[1] |
1012 | 1011 |
total = self._get_statistics(account_node)[1] |
1013 | 1012 |
details.update({'user': user, 'total': total}) |
1014 |
self.messages.append((account, 'diskspace', size, details)) |
|
1013 |
self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, 'diskspace', size, details))
|
|
1015 | 1014 |
|
1016 | 1015 |
def _report_object_change(self, user, account, path, details={}): |
1017 | 1016 |
logger.debug("_report_object_change: %s %s %s %s", user, account, path, details) |
1018 | 1017 |
details.update({'user': user}) |
1019 |
self.messages.append((account, 'object', path, details)) |
|
1018 |
self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, 'object', path, details))
|
|
1020 | 1019 |
|
1021 | 1020 |
# Policy functions. |
1022 | 1021 |
|
Also available in: Unified diff