2 |
2 |
#
|
3 |
3 |
# Copyright (c) 2010 Greek Research and Technology Network
|
4 |
4 |
#
|
5 |
|
"""Ganeti notification daemon with ampq
|
|
5 |
"""Ganeti notification daemon with amqp
|
6 |
6 |
|
7 |
7 |
A daemon to monitor the Ganeti job queue and publish job progress
|
8 |
8 |
and Ganeti VM state notifications over a 0mq PUB endpoint.
|
... | ... | |
27 |
27 |
import daemon.pidlockfile
|
28 |
28 |
from signal import signal, SIGINT, SIGTERM
|
29 |
29 |
|
30 |
|
from carrot.connection import BrokerConnection
|
31 |
|
from carrot.messaging import Publisher
|
|
30 |
from amqplib import client_0_8 as amqp
|
32 |
31 |
|
33 |
32 |
from threading import Thread, Event, currentThread
|
34 |
33 |
|
... | ... | |
38 |
37 |
from ganeti import serializer
|
39 |
38 |
|
40 |
39 |
class JobFileHandler(pyinotify.ProcessEvent):
|
41 |
|
def __init__(self, logger, amqpd):
|
|
40 |
def __init__(self, logger, chan):
|
42 |
41 |
pyinotify.ProcessEvent.__init__(self)
|
43 |
42 |
self.logger = logger
|
44 |
|
self.amqpd = amqpd
|
|
43 |
self.chan = chan
|
45 |
44 |
|
46 |
45 |
def process_IN_CLOSE_WRITE(self, event):
|
47 |
46 |
jobfile = os.path.join(event.path, event.name)
|
... | ... | |
92 |
91 |
msg["message"] = logmsg
|
93 |
92 |
|
94 |
93 |
self.logger.debug("PUSHing msg: %s", json.dumps(msg))
|
95 |
|
amqpd.send(json.dumps(msg))
|
96 |
|
|
|
94 |
msg = amqp.Message(json.dumps(msg))
|
|
95 |
msg.properties["delivery_mode"] = 2 #Persistent
|
|
96 |
self.chan.basic_publish(msg,exchange="ganeti",routing_key="eventd")
|
97 |
97 |
|
98 |
98 |
handler_logger = None
|
99 |
99 |
def fatal_signal_handler(signum, frame):
|
... | ... | |
159 |
159 |
# signal(SIGTERM, fatal_signal_handler)
|
160 |
160 |
|
161 |
161 |
#Init connection to RabbitMQ
|
162 |
|
conn = BrokerConnection(hostname=settings.RABBIT_HOST,
|
163 |
|
port=settings.RABBIT_PORT,
|
|
162 |
conn = amqp.Connection( host=settings.RABBIT_HOST,
|
164 |
163 |
userid=settings.RABBIT_USERNAME,
|
165 |
164 |
password=settings.RABBIT_PASSWORD,
|
166 |
165 |
virtual_host=settings.RABBIT_VHOST)
|
167 |
|
publisher = Publisher(connection=conn, exchange="ganeti",
|
168 |
|
routing_key="importer")
|
169 |
|
|
170 |
|
|
|
166 |
chan = conn.channel()
|
|
167 |
|
171 |
168 |
# Monitor the Ganeti job queue, create and push notifications
|
172 |
169 |
wm = pyinotify.WatchManager()
|
173 |
170 |
mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
|
174 |
|
handler = JobFileHandler(logger, publisher)
|
|
171 |
handler = JobFileHandler(logger, chan)
|
175 |
172 |
notifier = pyinotify.Notifier(wm, handler)
|
176 |
173 |
|
177 |
174 |
try:
|
... | ... | |
195 |
192 |
finally:
|
196 |
193 |
# destroy the inotify's instance on this interrupt (stop monitoring)
|
197 |
194 |
notifier.stop()
|
198 |
|
# mark the 0mq thread as stopped, wake it up so that it notices
|
|
195 |
#Close the amqp connection
|
|
196 |
chan.close()
|
|
197 |
conn.close()
|
199 |
198 |
raise
|
200 |
199 |
|
201 |
200 |
if __name__ == "__main__":
|