Revision 4cfccdd2

b/snf-pithos-app/README
38 38
PITHOS_BACKEND_BLOCK_PATH        /tmp/pithos-data/                                 Map and block storage path
39 39
PITHOS_BACKEND_BLOCK_UMASK       0o022                                             Map and block storage umask
40 40
PITHOS_BACKEND_QUEUE_MODULE      None                                              Use ``pithos.backends.lib.rabbitmq`` to enable
41
PITHOS_BACKEND_QUEUE_CONNECTION  None                                              Format like ``rabbitmq://guest:guest@localhost:5672/pithos``
41
PITHOS_BACKEND_QUEUE_HOSTS       None                                              Format like [``amqp://guest:guest@localhost:5672``
42
PITHOS_BACKEND_QUEUE_EXCHANGE    pithos
42 43
PITHOS_BACKEND_QUOTA             50 GB (50 * 1024 ** 3)                            Default user quota
43 44
PITHOS_BACKEND_VERSIONING        auto                                              Default versioning policy for containers
44 45
PITHOS_UPDATE_MD5                True                                              Update object checksums when using hashmaps
b/snf-pithos-app/conf/20-snf-pithos-app-settings.conf
20 20
#PITHOS_BACKEND_QUEUE_MODULE = None
21 21
# Example: 'pithos.backends.lib.rabbitmq'
22 22
#
23
#PITHOS_BACKEND_QUEUE_CONNECTION = None
24
# Example: 'rabbitmq://guest:guest@localhost:5672/pithos'
23
#PITHOS_BACKEND_QUEUE_HOSTS = None
24
# Example: ['ammq://guest:guest@localhost:5672/']
25
#
26
#PITHOS_BACKEND_QUEUE_EXHANGE = 'pithos'
25 27

  
26 28
# Default setting for new accounts.
27 29
#PITHOS_BACKEND_QUOTA = 50 * 1024 * 1024 * 1024
b/snf-pithos-app/pithos/api/dispatch.py
1 1
from pithos.api.settings import (BACKEND_DB_MODULE, BACKEND_DB_CONNECTION,
2 2
                                 BACKEND_BLOCK_MODULE, BACKEND_BLOCK_PATH,
3 3
                                 BACKEND_BLOCK_UMASK,
4
                                 BACKEND_QUEUE_MODULE, BACKEND_QUEUE_CONNECTION,
4
                                 BACKEND_QUEUE_MODULE, BACKEND_QUEUE_HOSTS,
5
                                 BACKEND_QUEUE_EXCHANGE,
5 6
                                 BACKEND_QUOTA, BACKEND_VERSIONING)
6 7
from pithos.backends import connect_backend
7 8
from pithos.api.util import hashmap_md5
......
25 26
                              block_path=BACKEND_BLOCK_PATH,
26 27
                              block_umask=BACKEND_BLOCK_UMASK,
27 28
                              queue_module=BACKEND_QUEUE_MODULE,
28
                              queue_connection=BACKEND_QUEUE_CONNECTION)
29
                              queue_hosts=BACKEND_QUEUE_HOSTS,
30
                              queue_exchange=BACKEND_QUEUE_EXHANGE)
29 31
    backend.default_policy['quota'] = BACKEND_QUOTA
30 32
    backend.default_policy['versioning'] = BACKEND_VERSIONING
31 33

  
b/snf-pithos-app/pithos/api/management/commands/storagequota.py
38 38
from pithos.api.settings import (BACKEND_DB_MODULE, BACKEND_DB_CONNECTION,
39 39
                                 BACKEND_BLOCK_MODULE, BACKEND_BLOCK_PATH,
40 40
                                 BACKEND_BLOCK_UMASK,
41
                                 BACKEND_QUEUE_MODULE, BACKEND_QUEUE_CONNECTION,
41
                                 BACKEND_QUEUE_MODULE, BACKEND_QUEUE_HOSTS,
42
                                 BACKEND_QUEUE_EXCHANGE,
42 43
                                 BACKEND_QUOTA, BACKEND_VERSIONING)
43 44
from pithos.backends import connect_backend
44 45

  
......
72 73
                                  block_path=BACKEND_BLOCK_PATH,
73 74
                                  block_umask=BACKEND_BLOCK_UMASK,
74 75
                                  queue_module=BACKEND_QUEUE_MODULE,
75
                                  queue_connection=BACKEND_QUEUE_CONNECTION)
76
                                  queue_connection=BACKEND_QUEUE_HOSTS,
77
                                  queue_exchange=BACKEND_QUEUE_EXCHANGE)
76 78
        backend.default_policy['quota'] = BACKEND_QUOTA
77 79
        backend.default_policy['versioning'] = BACKEND_VERSIONING
78 80
        if quota is not None:
b/snf-pithos-app/pithos/api/settings.py
37 37
# Queue for billing.
38 38
BACKEND_QUEUE_MODULE = getattr(settings, 'PITHOS_BACKEND_QUEUE_MODULE',
39 39
                               None)  # Example: 'pithos.backends.lib.rabbitmq'
40
BACKEND_QUEUE_CONNECTION = getattr(settings, 'PITHOS_BACKEND_QUEUE_CONNECTION', None)  # Example: 'rabbitmq://guest:guest@localhost:5672/pithos'
40
BACKEND_QUEUE_HOSTS = getattr(settings, 'PITHOS_BACKEND_QUEUE_HOSTS', None) # Example: "['amqp://guest:guest@localhost:5672']"
41
BACKEND_QUEUE_EXCHANGE = getattr(settings, 'PITHOS_BACKEND_QUEUE_EXCHANGE', 'pithos')
41 42

  
42 43
# Default setting for new accounts.
43 44
BACKEND_QUOTA = getattr(
b/snf-pithos-app/pithos/api/util.py
59 59
from pithos.api.settings import (BACKEND_DB_MODULE, BACKEND_DB_CONNECTION,
60 60
                                 BACKEND_BLOCK_MODULE, BACKEND_BLOCK_PATH,
61 61
                                 BACKEND_BLOCK_UMASK,
62
                                 BACKEND_QUEUE_MODULE, BACKEND_QUEUE_CONNECTION,
62
                                 BACKEND_QUEUE_MODULE, BACKEND_QUEUE_HOSTS,
63
                                 BACKEND_QUEUE_EXCHANGE,
63 64
                                 BACKEND_QUOTA, BACKEND_VERSIONING,
64 65
                                 AUTHENTICATION_URL, AUTHENTICATION_USERS,
65 66
                                 SERVICE_TOKEN, COOKIE_NAME)
b/snf-pithos-backend/pithos/backends/lib/rabbitmq/queue.py
31 31
# interpreted as representing official policies, either expressed
32 32
# or implied, of GRNET S.A.
33 33

  
34
from synnefo.lib.queue import exchange_connect, exchange_send, exchange_close, Receipt
34
import json
35 35

  
36
from synnefo.lib.amqp import AMQPClient
37
from synnefo.lib.queue import Receipt
36 38

  
37 39
class Queue(object):
38 40
    """Queue.
39
       Required constructor parameters: exchange, client_id.
41
       Required constructor parameters: hosts, exchange, client_id.
40 42
    """
41 43

  
42 44
    def __init__(self, **params):
43
        exchange = params['exchange']
44
        self.conn = exchange_connect(exchange)
45
        hosts = params['hosts']
46
        self.exchange = params['exchange']
45 47
        self.client_id = params['client_id']
46 48

  
47
    def send(self, message_key, user, instance, resource, value, details):
48
        body = Receipt(
49
            self.client_id, user, instance, resource, value, details).format()
50
        exchange_send(self.conn, message_key, body)
49
        self.client = AMQPClient(hosts=hosts)
50
        self.client.connect()
51

  
52
        self.client.exchange_declare(exchange=self.exchange,
53
                                     type='topic')
51 54

  
55
    def send(self, message_key, user, instance, resource, value, details):
56
        body = Receipt(self.client_id, user, instance, resource, value, details).format()
57
        self.client.basic_publish(exchange=self.exchange,
58
                                  routing_key=message_key,
59
                                  body=json.dumps(body))
60
    
52 61
    def close(self):
53
        exchange_close(self.conn)
62
        self.client.close()
b/snf-pithos-backend/pithos/backends/modular.py
79 79
DEFAULT_BLOCK_PATH = 'data/'
80 80
DEFAULT_BLOCK_UMASK = 0o022
81 81
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
82
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
82
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
83
#DEFAULT_QUEUE_EXCHANGE = 'pithos'
83 84

  
84 85
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
85 86
QUEUE_CLIENT_ID = 'pithos'
......
127 128

  
128 129
    def __init__(self, db_module=None, db_connection=None,
129 130
                 block_module=None, block_path=None, block_umask=None,
130
                 queue_module=None, queue_connection=None):
131
                 queue_module=None, queue_hosts=None,
132
                 queue_exchange=None):
131 133
        db_module = db_module or DEFAULT_DB_MODULE
132 134
        db_connection = db_connection or DEFAULT_DB_CONNECTION
133 135
        block_module = block_module or DEFAULT_BLOCK_MODULE
134 136
        block_path = block_path or DEFAULT_BLOCK_PATH
135 137
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
136 138
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
137
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
138

  
139
        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
140
        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
141
		
139 142
        self.hash_algorithm = 'sha256'
140 143
        self.block_size = 4 * 1024 * 1024  # 4MB
141 144

  
......
165 168
                  'umask': block_umask}
166 169
        self.store = self.block_module.Store(**params)
167 170

  
168
        if queue_module and queue_connection:
171
        if queue_module and queue_hosts:
169 172
            self.queue_module = load_module(queue_module)
170
            params = {'exchange': queue_connection,
173
            params = {'hosts': queue_hosts,
174
            		  'exchange': queue_exchange,
171 175
                      'client_id': QUEUE_CLIENT_ID}
172 176
            self.queue = self.queue_module.Queue(**params)
173 177
        else:
......
1221 1225
        details.update({'user': user})
1222 1226
        logger.debug("_report_object_change: %s %s %s %s", user,
1223 1227
                     account, path, details)
1224
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % (
1225
            'object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1228
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
1229
        					  account, QUEUE_INSTANCE_ID, 'object', path, details))
1226 1230

  
1227 1231
    def _report_sharing_change(self, user, account, path, details={}):
1228 1232
        logger.debug("_report_permissions_change: %s %s %s %s",
1229 1233
                     user, account, path, details)
1230 1234
        details.update({'user': user})
1231
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account,
1232
                             QUEUE_INSTANCE_ID, 'sharing', path, details))
1235
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
1236
        					  account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1233 1237

  
1234 1238
    # Policy functions.
1235 1239

  
b/snf-pithos-tools/pithos/tools/dispatcher.py
35 35

  
36 36
import sys
37 37
import logging
38
import json
38 39

  
39
from synnefo.lib.queue import (exchange_connect, exchange_close,
40
                               exchange_send, exchange_route, queue_callback, queue_start)
40
from synnefo.lib.amqp import AMQPClient
41 41

  
42 42
from optparse import OptionParser
43 43

  
......
94 94
        level=logging.DEBUG if DEBUG else logging.INFO)
95 95
    logger = logging.getLogger('dispatcher')
96 96

  
97
    exchange = 'rabbitmq://%s:%s@%s:%s/%s' % (
98
        opts.user, opts.password, opts.host, opts.port, opts.exchange)
99
    connection = exchange_connect(exchange)
97
    host =  'amqp://%s:%s@%s:%s' % (opts.user, opts.password, opts.host, opts.port)
98
    queue = opts.queue
99
    key = opts.key
100
    exchange = opts.exchange
101
    
102
    client = AMQPClient(hosts=[host])
103
    client.connect()
104

  
100 105
    if opts.test:
101
        exchange_send(connection, opts.key, {"test": "0123456789"})
102
        exchange_close(connection)
106
        client.exchange_declare(exchange=exchange,
107
                                type='topic')
108
        client.basic_publish(exchange=exchange,
109
                             routing_key=key,
110
                             body= json.dumps({"test": "0123456789"}))
111
        client.close()
103 112
        sys.exit()
104 113

  
105 114
    callback = None
......
110 119
            cb_module = sys.modules[cb[0]]
111 120
            callback = getattr(cb_module, cb[1])
112 121

  
113
    def handle_message(msg):
114
        print msg
122
    def handle_message(client, msg):
115 123
        logger.debug('%s', msg)
116 124
        if callback:
117 125
            callback(msg)
126
        client.basic_ack(msg)
127

  
128
    client.queue_declare(queue=queue)
129
    client.queue_bind(queue=queue,
130
                      exchange=exchange,
131
                      routing_key=key)
132

  
133
    client.basic_consume(queue=queue, callback=handle_message)
118 134

  
119
    exchange_route(connection, opts.key, opts.queue)
120
    queue_callback(connection, opts.queue, handle_message)
121 135
    try:
122
        queue_start(connection)
136
        while True:
137
            client.basic_wait()
123 138
    except KeyboardInterrupt:
124 139
        pass
140
    finally:
141
        client.close()
125 142

  
126 143

  
127 144
if __name__ == '__main__':

Also available in: Unified diff