Revision 7ca9e930
b/README.develop | ||
---|---|---|
182 | 182 |
username in settings.py. Several functional conventions within the system |
183 | 183 |
require this variable to include a dash at its end (e.g. snf-) |
184 | 184 |
|
185 |
8. Specify a new queue to receive ganeti events, based on the specified |
|
186 |
BACKEND_PREFIX_ID. Something like the following at the end of |
|
187 |
settings.py should do the trick. |
|
188 |
|
|
189 |
DB_HANDLER_KEY = 'ganeti.%s.#' % BACKEND_PREFIX_ID.split('-')[0] |
|
190 |
BINDINGS[0] = ("events-%s" % BACKEND_PREFIX_ID.split('-')[0], EXCHANGE_GANETI, DB_HANDLER_KEY, 'update_db') |
|
191 |
QUEUES += ("events-%s" % BACKEND_PREFIX_ID.split('-')[0], ) |
|
185 |
8. Fix the AMQP-specific settings based on the selected BACKEND_PREFIX_ID. |
|
186 |
The fix_amqp_settings() function is called once at the end of |
|
187 |
settings.py.dist, you must call it again if you change BACKEND_PREFIX_ID |
|
188 |
at some later point. |
|
192 | 189 |
|
193 | 190 |
9. Start the system |
194 | 191 |
$ ./bin/python logic/dispatcher.py # DB synch daemon |
b/ganeti/ganeti-eventd.py | ||
---|---|---|
110 | 110 |
msg["message"] = logmsg |
111 | 111 |
|
112 | 112 |
instance = instances.split('-')[0] |
113 |
routekey = "ganeti.%s.event.%s" % (instance,op.status)
|
|
113 |
routekey = "ganeti.%s.event.op" % instance
|
|
114 | 114 |
|
115 | 115 |
self.logger.debug("Delivering msg: %s (key=%s)", |
116 | 116 |
json.dumps(msg), routekey) |
b/ganeti/hooks.py | ||
---|---|---|
20 | 20 |
|
21 | 21 |
import synnefo.settings as settings |
22 | 22 |
|
23 |
|
|
23 | 24 |
def ganeti_net_status(logger, environ): |
24 | 25 |
"""Produce notifications of type 'Ganeti-net-status' |
25 | 26 |
|
... | ... | |
152 | 153 |
|
153 | 154 |
self.publish_msgs(notifs) |
154 | 155 |
|
155 |
print >> sys.stderr, "post_start_hook: ", notifs |
|
156 |
|
|
157 |
return 1 |
|
156 |
return 0 |
|
158 | 157 |
|
159 | 158 |
|
160 | 159 |
class PostStopHook(GanetiHook): |
b/logic/dispatcher_callbacks.py | ||
---|---|---|
10 | 10 |
|
11 | 11 |
def update_db(message): |
12 | 12 |
"""Process the status of a VM based on a ganeti status message""" |
13 |
_logger.debug("Processing msg: %s" % message.body)
|
|
13 |
_logger.debug("Processing msg: %s", message.body)
|
|
14 | 14 |
try: |
15 | 15 |
msg = json.loads(message.body) |
16 | 16 |
|
17 | 17 |
if msg["type"] != "ganeti-op-status": |
18 |
_logger.error("Message is of uknown type %s." % (msg["type"],))
|
|
18 |
_logger.error("Message is of uknown type %s.", msg["type"])
|
|
19 | 19 |
return |
20 | 20 |
|
21 | 21 |
vmid = utils.id_from_instance_name(msg["instance"]) |
22 | 22 |
vm = VirtualMachine.objects.get(id=vmid) |
23 | 23 |
|
24 | 24 |
backend.process_backend_msg(vm, msg["jobId"], msg["operation"], msg["status"], msg["logmsg"]) |
25 |
_logger.debug("Done processing msg for vm %s." % (msg["instance"]))
|
|
25 |
_logger.debug("Done processing msg for vm %s.", msg["instance"])
|
|
26 | 26 |
message.channel.basic_ack(message.delivery_tag) |
27 | 27 |
except KeyError: |
28 |
_logger.error("Malformed incoming JSON, missing attributes: " + message.body)
|
|
28 |
_logger.error("Malformed incoming JSON, missing attributes: %s", message.body)
|
|
29 | 29 |
except VirtualMachine.InvalidBackendIdError: |
30 |
_logger.debug("Ignoring msg for unknown instance %s." % (msg["instance"],))
|
|
30 |
_logger.debug("Ignoring msg for unknown instance %s.", msg["instance"])
|
|
31 | 31 |
except VirtualMachine.DoesNotExist: |
32 |
_logger.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
|
|
32 |
_logger.error("VM for instance %s with id %d not found in DB.", msg["instance"], vmid)
|
|
33 | 33 |
except Exception as e: |
34 | 34 |
_logger.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info()))) |
35 | 35 |
|
36 |
|
|
37 |
def update_net(message): |
|
38 |
"""Process a network status update notification from Ganeti""" |
|
39 |
_logger.debug("MINE msg: %s", message.body) |
|
40 |
message.channel.basic_ack(message.delivery_tag) |
|
41 |
|
|
42 |
|
|
36 | 43 |
def send_email(message): |
37 | 44 |
_logger.debug("Request to send email message") |
38 | 45 |
message.channel.basic_ack(message.delivery_tag) |
... | ... | |
46 | 53 |
def dummy_proc(message): |
47 | 54 |
try: |
48 | 55 |
msg = json.loads(message.body) |
49 |
_logger.debug("Msg (exchange:%s) " % (msg, ))
|
|
56 |
_logger.debug("Msg (exchange:%s) ", msg)
|
|
50 | 57 |
message.channel.basic_ack(message.delivery_tag) |
51 | 58 |
except Exception as e: |
52 | 59 |
_logger.error("Could not receive message %s" % e.message) |
b/settings.py.dist | ||
---|---|---|
167 | 167 |
GANETI_EVENTD_LOG_FILE = "/var/log/synnefo/ganeti-eventd.log" |
168 | 168 |
GANETI_EVENTD_PID_FILE = "/var/run/synnefo/ganeti-eventd.pid" |
169 | 169 |
|
170 |
#Rabbit work queue end point
|
|
170 |
# Rabbit work queue endpoint
|
|
171 | 171 |
RABBIT_HOST = "62.217.120.67:5672" |
172 | 172 |
RABBIT_USERNAME = "okeanos" |
173 | 173 |
RABBIT_PASSWORD = "0k3@n0s" |
174 | 174 |
RABBIT_VHOST = "/" |
175 | 175 |
|
176 |
# Queues, exchanges and bindings |
|
176 |
# |
|
177 |
# Queues, exchanges and bindings for AMQP |
|
178 |
# |
|
177 | 179 |
EXCHANGE_GANETI = "ganeti" # Messages from Ganeti |
178 | 180 |
EXCHANGE_CRON = "cron" # Messages from periodically triggered tasks |
179 | 181 |
EXCHANGE_API = "api" # Messages from the Rest API |
... | ... | |
192 | 194 |
(QUEUE_DEBUG, EXCHANGE_API, '#', 'dummy_proc'), |
193 | 195 |
] |
194 | 196 |
|
195 |
#Prefix-based naming for db routing key, only |
|
196 |
DB_HANDLER_KEY = 'ganeti.%s.#' % BACKEND_PREFIX_ID.split('-')[0] |
|
197 |
|
|
198 | 197 |
BINDINGS = [ |
199 |
# Queue # Exchange # RouteKey #Handler |
|
200 |
(QUEUE_GANETI_EVENTS, EXCHANGE_GANETI, 'ganeti.*', 'update_db'), |
|
201 |
(QUEUE_CRON_CREDITS, EXCHANGE_CRON, '*.credits.*', 'update_credits'), |
|
202 |
(QUEUE_EMAIL, EXCHANGE_API, '*.email.*', 'send_email'), |
|
203 |
(QUEUE_EMAIL, EXCHANGE_CRON, '*.email.*', 'send_email'), |
|
198 |
# Queue # Exchange # RouteKey #Handler |
|
199 |
(QUEUE_GANETI_EVENTS, EXCHANGE_GANETI, 'ganeti.*.event.op', 'update_db'), |
|
200 |
(QUEUE_GANETI_EVENTS, EXCHANGE_GANETI, 'ganeti.*.event.net', 'update_net'), |
|
201 |
(QUEUE_CRON_CREDITS, EXCHANGE_CRON, '*.credits.*', 'update_credits'), |
|
202 |
(QUEUE_EMAIL, EXCHANGE_API, '*.email.*', 'send_email'), |
|
203 |
(QUEUE_EMAIL, EXCHANGE_CRON, '*.email.*', 'send_email'), |
|
204 | 204 |
] |
205 | 205 |
|
206 |
def fix_amqp_settings(backend_prefix): |
|
207 |
"""Configure AMQP-specific settings |
|
208 |
|
|
209 |
Configure AMQP-specific settings based on backend_prefix. |
|
210 |
This function *must* be called later in settings.py, with |
|
211 |
BACKEND_PREFIX_ID as argument. |
|
212 |
|
|
213 |
""" |
|
214 |
global DB_HANDLER_KEY_OP, DB_HANDLER_KEY_NET, BINDINGS, QUEUES |
|
215 |
|
|
216 |
prefix = backend_prefix.split('-')[0] |
|
217 |
DB_HANDLER_KEY_OP ='ganeti.%s.event.op' % prefix # notifications of type "ganeti-op-status" |
|
218 |
DB_HANDLER_KEY_NET ='ganeti.%s.event.net' % prefix # notifications of type "ganeti-net-status" |
|
219 |
BINDINGS[0] = ("events-%s-op" % prefix, EXCHANGE_GANETI, DB_HANDLER_KEY_OP, 'update_db') |
|
220 |
BINDINGS[1] = ("events-%s-net" % prefix, EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net') |
|
221 |
QUEUES += ("events-%s-op" % prefix, "events-%s-net" % prefix) |
|
222 |
|
|
223 |
# Fix up the AMQP-specific settings based on BACKEND_PREFIX_ID |
|
224 |
# Make sure to call it again, if you modify it at some later point |
|
225 |
fix_amqp_settings(BACKEND_PREFIX_ID) |
|
226 |
|
|
206 | 227 |
# Logic dispatcher settings |
207 | 228 |
DISPATCHER_LOG_FILE = "/var/log/synnefo/dispatcher.log" |
208 | 229 |
|
Also available in: Unified diff