Revision 8d9a3fbd

b/snf-pithos-app/README
45 45

  
46 46
To update checksums asynchronously, enable the queue, install snf-pithos-tools and use ``pithos-dispatcher``::
47 47

  
48
    pithos-dispatcher --exchange=pithos --callback=pithos.api.dispatch.update_md5
48
    pithos-dispatcher --exchange=pithos --key=pithos.object --callback=pithos.api.dispatch.update_md5
49 49

  
50 50
Administrator functions
51 51
-----------------------
b/snf-pithos-backend/pithos/backends/lib/rabbitmq/queue.py
36 36

  
37 37
class Queue(object):
38 38
    """Queue.
39
       Required constructor parameters: exchange, message_key, client_id.
39
       Required constructor parameters: exchange, client_id.
40 40
    """
41 41
    
42 42
    def __init__(self, **params):
43 43
        exchange = params['exchange']
44 44
        self.conn = exchange_connect(exchange)
45
        self.message_key = params['message_key']
46 45
        self.client_id = params['client_id']
47 46
    
48
    def send(self, user, resource, value, details):
47
    def send(self, message_key, user, resource, value, details):
49 48
        body = Receipt(self.client_id, user, resource, value, details).format()
50
        exchange_send(self.conn, self.message_key, body)
49
        exchange_send(self.conn, message_key, body)
51 50
    
52 51
    def close(self):
53 52
        exchange_close(self.conn)
b/snf-pithos-backend/pithos/backends/modular.py
77 77
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
78 78
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
79 79

  
80
QUEUE_MESSAGE_KEY = 'pithos'
80
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
81 81
QUEUE_CLIENT_ID = 'pithos'
82 82

  
83 83
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
......
157 157
        if queue_module and queue_connection:
158 158
            self.queue_module = load_module(queue_module)
159 159
            params = {'exchange': queue_connection,
160
                      'message_key': QUEUE_MESSAGE_KEY,
161 160
                      'client_id': QUEUE_CLIENT_ID}
162 161
            self.queue = self.queue_module.Queue(**params)
163 162
        else:
......
1011 1010
        account_node = self._lookup_account(account, True)[1]
1012 1011
        total = self._get_statistics(account_node)[1]
1013 1012
        details.update({'user': user, 'total': total})
1014
        self.messages.append((account, 'diskspace', size, details))
1013
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, 'diskspace', size, details))
1015 1014
    
1016 1015
    def _report_object_change(self, user, account, path, details={}):
1017 1016
        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1018 1017
        details.update({'user': user})
1019
        self.messages.append((account, 'object', path, details))
1018
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, 'object', path, details))
1020 1019
    
1021 1020
    # Policy functions.
1022 1021
    

Also available in: Unified diff