Revision 4cfccdd2
b/snf-pithos-app/README | ||
---|---|---|
38 | 38 |
PITHOS_BACKEND_BLOCK_PATH /tmp/pithos-data/ Map and block storage path |
39 | 39 |
PITHOS_BACKEND_BLOCK_UMASK 0o022 Map and block storage umask |
40 | 40 |
PITHOS_BACKEND_QUEUE_MODULE None Use ``pithos.backends.lib.rabbitmq`` to enable |
41 |
PITHOS_BACKEND_QUEUE_CONNECTION None Format like ``rabbitmq://guest:guest@localhost:5672/pithos`` |
|
41 |
PITHOS_BACKEND_QUEUE_HOSTS None Format like [``amqp://guest:guest@localhost:5672`` |
|
42 |
PITHOS_BACKEND_QUEUE_EXCHANGE pithos |
|
42 | 43 |
PITHOS_BACKEND_QUOTA 50 GB (50 * 1024 ** 3) Default user quota |
43 | 44 |
PITHOS_BACKEND_VERSIONING auto Default versioning policy for containers |
44 | 45 |
PITHOS_UPDATE_MD5 True Update object checksums when using hashmaps |
b/snf-pithos-app/conf/20-snf-pithos-app-settings.conf | ||
---|---|---|
20 | 20 |
#PITHOS_BACKEND_QUEUE_MODULE = None |
21 | 21 |
# Example: 'pithos.backends.lib.rabbitmq' |
22 | 22 |
# |
23 |
#PITHOS_BACKEND_QUEUE_CONNECTION = None |
|
24 |
# Example: 'rabbitmq://guest:guest@localhost:5672/pithos' |
|
23 |
#PITHOS_BACKEND_QUEUE_HOSTS = None |
|
24 |
# Example: ['ammq://guest:guest@localhost:5672/'] |
|
25 |
# |
|
26 |
#PITHOS_BACKEND_QUEUE_EXHANGE = 'pithos' |
|
25 | 27 |
|
26 | 28 |
# Default setting for new accounts. |
27 | 29 |
#PITHOS_BACKEND_QUOTA = 50 * 1024 * 1024 * 1024 |
b/snf-pithos-app/pithos/api/dispatch.py | ||
---|---|---|
1 | 1 |
from pithos.api.settings import (BACKEND_DB_MODULE, BACKEND_DB_CONNECTION, |
2 | 2 |
BACKEND_BLOCK_MODULE, BACKEND_BLOCK_PATH, |
3 | 3 |
BACKEND_BLOCK_UMASK, |
4 |
BACKEND_QUEUE_MODULE, BACKEND_QUEUE_CONNECTION, |
|
4 |
BACKEND_QUEUE_MODULE, BACKEND_QUEUE_HOSTS, |
|
5 |
BACKEND_QUEUE_EXCHANGE, |
|
5 | 6 |
BACKEND_QUOTA, BACKEND_VERSIONING) |
6 | 7 |
from pithos.backends import connect_backend |
7 | 8 |
from pithos.api.util import hashmap_md5 |
... | ... | |
25 | 26 |
block_path=BACKEND_BLOCK_PATH, |
26 | 27 |
block_umask=BACKEND_BLOCK_UMASK, |
27 | 28 |
queue_module=BACKEND_QUEUE_MODULE, |
28 |
queue_connection=BACKEND_QUEUE_CONNECTION) |
|
29 |
queue_hosts=BACKEND_QUEUE_HOSTS, |
|
30 |
queue_exchange=BACKEND_QUEUE_EXHANGE) |
|
29 | 31 |
backend.default_policy['quota'] = BACKEND_QUOTA |
30 | 32 |
backend.default_policy['versioning'] = BACKEND_VERSIONING |
31 | 33 |
|
b/snf-pithos-app/pithos/api/management/commands/storagequota.py | ||
---|---|---|
38 | 38 |
from pithos.api.settings import (BACKEND_DB_MODULE, BACKEND_DB_CONNECTION, |
39 | 39 |
BACKEND_BLOCK_MODULE, BACKEND_BLOCK_PATH, |
40 | 40 |
BACKEND_BLOCK_UMASK, |
41 |
BACKEND_QUEUE_MODULE, BACKEND_QUEUE_CONNECTION, |
|
41 |
BACKEND_QUEUE_MODULE, BACKEND_QUEUE_HOSTS, |
|
42 |
BACKEND_QUEUE_EXCHANGE, |
|
42 | 43 |
BACKEND_QUOTA, BACKEND_VERSIONING) |
43 | 44 |
from pithos.backends import connect_backend |
44 | 45 |
|
... | ... | |
72 | 73 |
block_path=BACKEND_BLOCK_PATH, |
73 | 74 |
block_umask=BACKEND_BLOCK_UMASK, |
74 | 75 |
queue_module=BACKEND_QUEUE_MODULE, |
75 |
queue_connection=BACKEND_QUEUE_CONNECTION) |
|
76 |
queue_connection=BACKEND_QUEUE_HOSTS, |
|
77 |
queue_exchange=BACKEND_QUEUE_EXCHANGE) |
|
76 | 78 |
backend.default_policy['quota'] = BACKEND_QUOTA |
77 | 79 |
backend.default_policy['versioning'] = BACKEND_VERSIONING |
78 | 80 |
if quota is not None: |
b/snf-pithos-app/pithos/api/settings.py | ||
---|---|---|
37 | 37 |
# Queue for billing. |
38 | 38 |
BACKEND_QUEUE_MODULE = getattr(settings, 'PITHOS_BACKEND_QUEUE_MODULE', |
39 | 39 |
None) # Example: 'pithos.backends.lib.rabbitmq' |
40 |
BACKEND_QUEUE_CONNECTION = getattr(settings, 'PITHOS_BACKEND_QUEUE_CONNECTION', None) # Example: 'rabbitmq://guest:guest@localhost:5672/pithos' |
|
40 |
BACKEND_QUEUE_HOSTS = getattr(settings, 'PITHOS_BACKEND_QUEUE_HOSTS', None) # Example: "['amqp://guest:guest@localhost:5672']" |
|
41 |
BACKEND_QUEUE_EXCHANGE = getattr(settings, 'PITHOS_BACKEND_QUEUE_EXCHANGE', 'pithos') |
|
41 | 42 |
|
42 | 43 |
# Default setting for new accounts. |
43 | 44 |
BACKEND_QUOTA = getattr( |
b/snf-pithos-app/pithos/api/util.py | ||
---|---|---|
59 | 59 |
from pithos.api.settings import (BACKEND_DB_MODULE, BACKEND_DB_CONNECTION, |
60 | 60 |
BACKEND_BLOCK_MODULE, BACKEND_BLOCK_PATH, |
61 | 61 |
BACKEND_BLOCK_UMASK, |
62 |
BACKEND_QUEUE_MODULE, BACKEND_QUEUE_CONNECTION, |
|
62 |
BACKEND_QUEUE_MODULE, BACKEND_QUEUE_HOSTS, |
|
63 |
BACKEND_QUEUE_EXCHANGE, |
|
63 | 64 |
BACKEND_QUOTA, BACKEND_VERSIONING, |
64 | 65 |
AUTHENTICATION_URL, AUTHENTICATION_USERS, |
65 | 66 |
SERVICE_TOKEN, COOKIE_NAME) |
b/snf-pithos-backend/pithos/backends/lib/rabbitmq/queue.py | ||
---|---|---|
31 | 31 |
# interpreted as representing official policies, either expressed |
32 | 32 |
# or implied, of GRNET S.A. |
33 | 33 |
|
34 |
from synnefo.lib.queue import exchange_connect, exchange_send, exchange_close, Receipt
|
|
34 |
import json
|
|
35 | 35 |
|
36 |
from synnefo.lib.amqp import AMQPClient |
|
37 |
from synnefo.lib.queue import Receipt |
|
36 | 38 |
|
37 | 39 |
class Queue(object): |
38 | 40 |
"""Queue. |
39 |
Required constructor parameters: exchange, client_id. |
|
41 |
Required constructor parameters: hosts, exchange, client_id.
|
|
40 | 42 |
""" |
41 | 43 |
|
42 | 44 |
def __init__(self, **params): |
43 |
exchange = params['exchange']
|
|
44 |
self.conn = exchange_connect(exchange)
|
|
45 |
hosts = params['hosts']
|
|
46 |
self.exchange = params['exchange']
|
|
45 | 47 |
self.client_id = params['client_id'] |
46 | 48 |
|
47 |
def send(self, message_key, user, instance, resource, value, details): |
|
48 |
body = Receipt( |
|
49 |
self.client_id, user, instance, resource, value, details).format() |
|
50 |
exchange_send(self.conn, message_key, body) |
|
49 |
self.client = AMQPClient(hosts=hosts) |
|
50 |
self.client.connect() |
|
51 |
|
|
52 |
self.client.exchange_declare(exchange=self.exchange, |
|
53 |
type='topic') |
|
51 | 54 |
|
55 |
def send(self, message_key, user, instance, resource, value, details): |
|
56 |
body = Receipt(self.client_id, user, instance, resource, value, details).format() |
|
57 |
self.client.basic_publish(exchange=self.exchange, |
|
58 |
routing_key=message_key, |
|
59 |
body=json.dumps(body)) |
|
60 |
|
|
52 | 61 |
def close(self): |
53 |
exchange_close(self.conn) |
|
62 |
self.client.close() |
b/snf-pithos-backend/pithos/backends/modular.py | ||
---|---|---|
79 | 79 |
DEFAULT_BLOCK_PATH = 'data/' |
80 | 80 |
DEFAULT_BLOCK_UMASK = 0o022 |
81 | 81 |
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq' |
82 |
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos' |
|
82 |
#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]' |
|
83 |
#DEFAULT_QUEUE_EXCHANGE = 'pithos' |
|
83 | 84 |
|
84 | 85 |
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s' |
85 | 86 |
QUEUE_CLIENT_ID = 'pithos' |
... | ... | |
127 | 128 |
|
128 | 129 |
def __init__(self, db_module=None, db_connection=None, |
129 | 130 |
block_module=None, block_path=None, block_umask=None, |
130 |
queue_module=None, queue_connection=None): |
|
131 |
queue_module=None, queue_hosts=None, |
|
132 |
queue_exchange=None): |
|
131 | 133 |
db_module = db_module or DEFAULT_DB_MODULE |
132 | 134 |
db_connection = db_connection or DEFAULT_DB_CONNECTION |
133 | 135 |
block_module = block_module or DEFAULT_BLOCK_MODULE |
134 | 136 |
block_path = block_path or DEFAULT_BLOCK_PATH |
135 | 137 |
block_umask = block_umask or DEFAULT_BLOCK_UMASK |
136 | 138 |
#queue_module = queue_module or DEFAULT_QUEUE_MODULE |
137 |
#queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION |
|
138 |
|
|
139 |
#queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS |
|
140 |
#queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE |
|
141 |
|
|
139 | 142 |
self.hash_algorithm = 'sha256' |
140 | 143 |
self.block_size = 4 * 1024 * 1024 # 4MB |
141 | 144 |
|
... | ... | |
165 | 168 |
'umask': block_umask} |
166 | 169 |
self.store = self.block_module.Store(**params) |
167 | 170 |
|
168 |
if queue_module and queue_connection:
|
|
171 |
if queue_module and queue_hosts:
|
|
169 | 172 |
self.queue_module = load_module(queue_module) |
170 |
params = {'exchange': queue_connection, |
|
173 |
params = {'hosts': queue_hosts, |
|
174 |
'exchange': queue_exchange, |
|
171 | 175 |
'client_id': QUEUE_CLIENT_ID} |
172 | 176 |
self.queue = self.queue_module.Queue(**params) |
173 | 177 |
else: |
... | ... | |
1221 | 1225 |
details.update({'user': user}) |
1222 | 1226 |
logger.debug("_report_object_change: %s %s %s %s", user, |
1223 | 1227 |
account, path, details) |
1224 |
self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ( |
|
1225 |
'object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
|
|
1228 |
self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
|
|
1229 |
account, QUEUE_INSTANCE_ID, 'object', path, details))
|
|
1226 | 1230 |
|
1227 | 1231 |
def _report_sharing_change(self, user, account, path, details={}): |
1228 | 1232 |
logger.debug("_report_permissions_change: %s %s %s %s", |
1229 | 1233 |
user, account, path, details) |
1230 | 1234 |
details.update({'user': user}) |
1231 |
self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account,
|
|
1232 |
QUEUE_INSTANCE_ID, 'sharing', path, details))
|
|
1235 |
self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), |
|
1236 |
account, QUEUE_INSTANCE_ID, 'sharing', path, details))
|
|
1233 | 1237 |
|
1234 | 1238 |
# Policy functions. |
1235 | 1239 |
|
b/snf-pithos-tools/pithos/tools/dispatcher.py | ||
---|---|---|
35 | 35 |
|
36 | 36 |
import sys |
37 | 37 |
import logging |
38 |
import json |
|
38 | 39 |
|
39 |
from synnefo.lib.queue import (exchange_connect, exchange_close, |
|
40 |
exchange_send, exchange_route, queue_callback, queue_start) |
|
40 |
from synnefo.lib.amqp import AMQPClient |
|
41 | 41 |
|
42 | 42 |
from optparse import OptionParser |
43 | 43 |
|
... | ... | |
94 | 94 |
level=logging.DEBUG if DEBUG else logging.INFO) |
95 | 95 |
logger = logging.getLogger('dispatcher') |
96 | 96 |
|
97 |
exchange = 'rabbitmq://%s:%s@%s:%s/%s' % ( |
|
98 |
opts.user, opts.password, opts.host, opts.port, opts.exchange) |
|
99 |
connection = exchange_connect(exchange) |
|
97 |
host = 'amqp://%s:%s@%s:%s' % (opts.user, opts.password, opts.host, opts.port) |
|
98 |
queue = opts.queue |
|
99 |
key = opts.key |
|
100 |
exchange = opts.exchange |
|
101 |
|
|
102 |
client = AMQPClient(hosts=[host]) |
|
103 |
client.connect() |
|
104 |
|
|
100 | 105 |
if opts.test: |
101 |
exchange_send(connection, opts.key, {"test": "0123456789"}) |
|
102 |
exchange_close(connection) |
|
106 |
client.exchange_declare(exchange=exchange, |
|
107 |
type='topic') |
|
108 |
client.basic_publish(exchange=exchange, |
|
109 |
routing_key=key, |
|
110 |
body= json.dumps({"test": "0123456789"})) |
|
111 |
client.close() |
|
103 | 112 |
sys.exit() |
104 | 113 |
|
105 | 114 |
callback = None |
... | ... | |
110 | 119 |
cb_module = sys.modules[cb[0]] |
111 | 120 |
callback = getattr(cb_module, cb[1]) |
112 | 121 |
|
113 |
def handle_message(msg): |
|
114 |
print msg |
|
122 |
def handle_message(client, msg): |
|
115 | 123 |
logger.debug('%s', msg) |
116 | 124 |
if callback: |
117 | 125 |
callback(msg) |
126 |
client.basic_ack(msg) |
|
127 |
|
|
128 |
client.queue_declare(queue=queue) |
|
129 |
client.queue_bind(queue=queue, |
|
130 |
exchange=exchange, |
|
131 |
routing_key=key) |
|
132 |
|
|
133 |
client.basic_consume(queue=queue, callback=handle_message) |
|
118 | 134 |
|
119 |
exchange_route(connection, opts.key, opts.queue) |
|
120 |
queue_callback(connection, opts.queue, handle_message) |
|
121 | 135 |
try: |
122 |
queue_start(connection) |
|
136 |
while True: |
|
137 |
client.basic_wait() |
|
123 | 138 |
except KeyboardInterrupt: |
124 | 139 |
pass |
140 |
finally: |
|
141 |
client.close() |
|
125 | 142 |
|
126 | 143 |
|
127 | 144 |
if __name__ == '__main__': |
Also available in: Unified diff