Revision 031d3f3a ganeti/ganeti-eventd.py
b/ganeti/ganeti-eventd.py | ||
---|---|---|
2 | 2 |
# |
3 | 3 |
# Copyright (c) 2010 Greek Research and Technology Network |
4 | 4 |
# |
5 |
"""Ganeti notification daemon with ampq
|
|
5 |
"""Ganeti notification daemon with amqp
|
|
6 | 6 |
|
7 | 7 |
A daemon to monitor the Ganeti job queue and publish job progress |
8 | 8 |
and Ganeti VM state notifications over a 0mq PUB endpoint. |
... | ... | |
27 | 27 |
import daemon.pidlockfile |
28 | 28 |
from signal import signal, SIGINT, SIGTERM |
29 | 29 |
|
30 |
from carrot.connection import BrokerConnection |
|
31 |
from carrot.messaging import Publisher |
|
30 |
from amqplib import client_0_8 as amqp |
|
32 | 31 |
|
33 | 32 |
from threading import Thread, Event, currentThread |
34 | 33 |
|
... | ... | |
38 | 37 |
from ganeti import serializer |
39 | 38 |
|
40 | 39 |
class JobFileHandler(pyinotify.ProcessEvent): |
41 |
def __init__(self, logger, amqpd):
|
|
40 |
def __init__(self, logger, chan):
|
|
42 | 41 |
pyinotify.ProcessEvent.__init__(self) |
43 | 42 |
self.logger = logger |
44 |
self.amqpd = amqpd
|
|
43 |
self.chan = chan
|
|
45 | 44 |
|
46 | 45 |
def process_IN_CLOSE_WRITE(self, event): |
47 | 46 |
jobfile = os.path.join(event.path, event.name) |
... | ... | |
92 | 91 |
msg["message"] = logmsg |
93 | 92 |
|
94 | 93 |
self.logger.debug("PUSHing msg: %s", json.dumps(msg)) |
95 |
amqpd.send(json.dumps(msg)) |
|
96 |
|
|
94 |
msg = amqp.Message(json.dumps(msg)) |
|
95 |
msg.properties["delivery_mode"] = 2 #Persistent |
|
96 |
self.chan.basic_publish(msg,exchange="ganeti",routing_key="eventd") |
|
97 | 97 |
|
98 | 98 |
handler_logger = None |
99 | 99 |
def fatal_signal_handler(signum, frame): |
... | ... | |
159 | 159 |
# signal(SIGTERM, fatal_signal_handler) |
160 | 160 |
|
161 | 161 |
#Init connection to RabbitMQ |
162 |
conn = BrokerConnection(hostname=settings.RABBIT_HOST, |
|
163 |
port=settings.RABBIT_PORT, |
|
162 |
conn = amqp.Connection( host=settings.RABBIT_HOST, |
|
164 | 163 |
userid=settings.RABBIT_USERNAME, |
165 | 164 |
password=settings.RABBIT_PASSWORD, |
166 | 165 |
virtual_host=settings.RABBIT_VHOST) |
167 |
publisher = Publisher(connection=conn, exchange="ganeti", |
|
168 |
routing_key="importer") |
|
169 |
|
|
170 |
|
|
166 |
chan = conn.channel() |
|
167 |
|
|
171 | 168 |
# Monitor the Ganeti job queue, create and push notifications |
172 | 169 |
wm = pyinotify.WatchManager() |
173 | 170 |
mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"] |
174 |
handler = JobFileHandler(logger, publisher)
|
|
171 |
handler = JobFileHandler(logger, chan)
|
|
175 | 172 |
notifier = pyinotify.Notifier(wm, handler) |
176 | 173 |
|
177 | 174 |
try: |
... | ... | |
195 | 192 |
finally: |
196 | 193 |
# destroy the inotify's instance on this interrupt (stop monitoring) |
197 | 194 |
notifier.stop() |
198 |
# mark the 0mq thread as stopped, wake it up so that it notices |
|
195 |
#Close the amqp connection |
|
196 |
chan.close() |
|
197 |
conn.close() |
|
199 | 198 |
raise |
200 | 199 |
|
201 | 200 |
if __name__ == "__main__": |
Also available in: Unified diff