2 |
2 |
#
|
3 |
3 |
# Copyright (c) 2010 Greek Research and Technology Network
|
4 |
4 |
#
|
5 |
|
""" A daemon to monitor the Ganeti job queue and emit job progress notifications over 0mq. """
|
|
5 |
"""Ganeti notification daemon for 0mqd
|
|
6 |
|
|
7 |
A daemon to monitor the Ganeti job queue and publish job progress
|
|
8 |
and Ganeti VM state notifications over a 0mq PUB endpoint.
|
|
9 |
|
|
10 |
"""
|
|
11 |
|
|
12 |
from django.core.management import setup_environ
|
6 |
13 |
|
7 |
|
import os
|
8 |
14 |
import sys
|
|
15 |
import os
|
|
16 |
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
|
|
17 |
sys.path.append(path)
|
|
18 |
import synnefo.settings as settings
|
|
19 |
|
|
20 |
setup_environ(settings)
|
|
21 |
|
9 |
22 |
import zmq
|
10 |
23 |
import time
|
11 |
24 |
import json
|
12 |
25 |
import logging
|
13 |
26 |
import pyinotify
|
|
27 |
import daemon
|
|
28 |
import daemon.pidlockfile
|
|
29 |
from signal import signal, SIGINT, SIGTERM
|
|
30 |
|
|
31 |
from threading import Thread, Event, currentThread
|
14 |
32 |
|
15 |
33 |
from ganeti import utils
|
16 |
34 |
from ganeti import jqueue
|
17 |
35 |
from ganeti import constants
|
18 |
36 |
from ganeti import serializer
|
19 |
37 |
|
20 |
|
GANETI_ZMQ_PUBLISHER = "tcp://*:5801" # FIXME: move to settings.py
|
|
38 |
|
|
39 |
class StoppableThread(Thread):
|
|
40 |
"""Thread class with a stop() method.
|
|
41 |
|
|
42 |
The thread needs to check regularly for the stopped() condition.
|
|
43 |
When it does, it exits, so that another thread may .join() it.
|
|
44 |
|
|
45 |
"""
|
|
46 |
def __init__(self, *args, **kwargs):
|
|
47 |
Thread.__init__(self, *args, **kwargs)
|
|
48 |
self._stop = Event()
|
|
49 |
|
|
50 |
def stop(self):
|
|
51 |
self._stop.set()
|
|
52 |
|
|
53 |
def stopped(self):
|
|
54 |
return self._stop.isSet()
|
|
55 |
|
|
56 |
|
|
57 |
class GanetiZMQThread(StoppableThread):
|
|
58 |
"""The 0mq processing thread: PULLs and then PUBlishes notifications.
|
|
59 |
|
|
60 |
This thread runs until stopped, receiving notifications over a
|
|
61 |
0mq PULL socket, and publishing them over a 0mq PUB socket.
|
|
62 |
|
|
63 |
The are currently two sources of notifications:
|
|
64 |
a. ganeti-0mqd itself, monitoring the Ganeti job queue
|
|
65 |
b. hooks running in the context of Ganeti
|
|
66 |
|
|
67 |
"""
|
|
68 |
def __init__(self, logger, puller, publisher):
|
|
69 |
StoppableThread.__init__(self)
|
|
70 |
self.logger = logger
|
|
71 |
self.puller = puller
|
|
72 |
self.publisher = publisher
|
|
73 |
|
|
74 |
def run(self):
|
|
75 |
self.logger.debug("0mq thread ready")
|
|
76 |
try:
|
|
77 |
while True:
|
|
78 |
# Pull
|
|
79 |
self.logger.debug("Waiting on the 0mq PULL socket")
|
|
80 |
data = self.puller.recv()
|
|
81 |
self.logger.debug("Received message on 0mq PULL socket")
|
|
82 |
if currentThread().stopped():
|
|
83 |
self.logger.debug("Thread has been stopped, leaving request loop")
|
|
84 |
return
|
|
85 |
try:
|
|
86 |
msg = json.loads(data)
|
|
87 |
if msg['type'] not in ('ganeti-op-status'):
|
|
88 |
self.logger.error("Not forwarding message of unknown type: %s", msg.dumps(data))
|
|
89 |
continue
|
|
90 |
except Exception, e:
|
|
91 |
self.logger.exception("Unexpected Exception decoding msg: %s", data)
|
|
92 |
continue
|
|
93 |
|
|
94 |
# Publish
|
|
95 |
self.logger.debug("PUBlishing msg: %s", json.dumps(msg))
|
|
96 |
self.publisher.send_json(msg)
|
|
97 |
|
|
98 |
except:
|
|
99 |
self.logger.exception("Caught exception, terminating")
|
|
100 |
os.kill(os.getpid(), SIGTERM)
|
21 |
101 |
|
22 |
102 |
|
23 |
103 |
class JobFileHandler(pyinotify.ProcessEvent):
|
24 |
|
def __init__(self, publisher):
|
|
104 |
def __init__(self, logger, pusher):
|
25 |
105 |
pyinotify.ProcessEvent.__init__(self)
|
26 |
|
self.publisher = publisher
|
|
106 |
self.logger = logger
|
|
107 |
self.pusher = pusher
|
27 |
108 |
|
28 |
109 |
def process_IN_CLOSE_WRITE(self, event):
|
29 |
110 |
jobfile = os.path.join(event.path, event.name)
|
30 |
111 |
if not event.name.startswith("job-"):
|
31 |
|
logging.debug("Not a job file: %s" % event.path)
|
|
112 |
self.logger.debug("Not a job file: %s" % event.path)
|
32 |
113 |
return
|
33 |
114 |
|
34 |
115 |
try:
|
... | ... | |
58 |
139 |
except IndexError:
|
59 |
140 |
logmsg = None
|
60 |
141 |
|
61 |
|
logging.debug("%d: %s(%s) %s %s" % (int(job.id), op.input.OP_ID, instances, op.status, logmsg))
|
62 |
|
if op.status in constants.JOBS_FINALIZED:
|
63 |
|
logging.info("%d: %s" % (int(job.id), op.status))
|
|
142 |
self.logger.debug("%d: %s(%s) %s %s",
|
|
143 |
int(job.id), op.input.OP_ID, instances, op.status, logmsg)
|
64 |
144 |
|
65 |
145 |
# Construct message
|
66 |
146 |
msg = {
|
... | ... | |
69 |
149 |
"operation": op.input.OP_ID,
|
70 |
150 |
"jobId": int(job.id),
|
71 |
151 |
"status": op.status,
|
72 |
|
"logmsg": logmsg
|
|
152 |
"logmsg": logmsg
|
73 |
153 |
}
|
74 |
154 |
if logmsg:
|
75 |
155 |
msg["message"] = logmsg
|
76 |
156 |
|
77 |
|
# Output as JSON
|
78 |
|
print json.dumps(msg)
|
79 |
|
|
80 |
|
self.publisher.send_json(msg)
|
|
157 |
# Push to the 0mq thread for PUBlication
|
|
158 |
self.logger.debug("PUSHing msg: %s", json.dumps(msg))
|
|
159 |
self.pusher.send_json(msg)
|
|
160 |
|
|
161 |
|
|
162 |
handler_logger = None
|
|
163 |
def fatal_signal_handler(signum, frame):
|
|
164 |
global handler_logger
|
81 |
165 |
|
|
166 |
handler_logger.info("Caught fatal signal %d, will raise SystemExit",
|
|
167 |
signum)
|
|
168 |
raise SystemExit
|
|
169 |
|
|
170 |
def parse_arguments(args):
|
|
171 |
from optparse import OptionParser
|
|
172 |
|
|
173 |
parser = OptionParser()
|
|
174 |
parser.add_option("-d", "--debug", action="store_true", dest="debug",
|
|
175 |
help="Enable debugging information")
|
|
176 |
parser.add_option("-l", "--log", dest="log_file",
|
|
177 |
default=settings.GANETI_0MQD_LOG_FILE,
|
|
178 |
metavar="FILE",
|
|
179 |
help="Write log to FILE instead of %s" %
|
|
180 |
settings.GANETI_0MQD_LOG_FILE),
|
|
181 |
parser.add_option('--pid-file', dest="pid_file",
|
|
182 |
default=settings.GANETI_0MQD_PID_FILE,
|
|
183 |
metavar='PIDFILE',
|
|
184 |
help="Save PID to file (default: %s)" %
|
|
185 |
settings.GANETI_0MQD_PID_FILE)
|
|
186 |
parser.add_option("-p", "--pull-port", dest="pull_port",
|
|
187 |
default=settings.GANETI_0MQD_PULL_PORT, type="int", metavar="PULL_PORT",
|
|
188 |
help="The TCP port number to use for the 0mq PULL endpoint")
|
|
189 |
parser.add_option("-P", "--pub-port", dest="pub_port",
|
|
190 |
default=settings.GANETI_0MQD_PUB_PORT, type="int", metavar="PUB_PORT",
|
|
191 |
help="The TCP port number to use for the 0mq PUB endpoint")
|
|
192 |
|
|
193 |
return parser.parse_args(args)
|
82 |
194 |
|
83 |
195 |
def main():
|
|
196 |
global handler_logger
|
|
197 |
|
|
198 |
(opts, args) = parse_arguments(sys.argv[1:])
|
|
199 |
|
|
200 |
# The 0mq endpoints to use for receiving and publishing notifications.
|
|
201 |
GANETI_0MQD_PUB_ENDPOINT = "tcp://*:%d" % int(opts.pub_port)
|
|
202 |
GANETI_0MQD_PULL_ENDPOINT = "tcp://*:%d" % int(opts.pull_port)
|
|
203 |
GANETI_0MQD_INPROC_ENDPOINT = "inproc://ganeti-0mqd"
|
|
204 |
|
|
205 |
# Create pidfile
|
|
206 |
pidf = daemon.pidlockfile.TimeoutPIDLockFile(
|
|
207 |
opts.pid_file, 10)
|
|
208 |
|
|
209 |
# Initialize logger
|
|
210 |
lvl = logging.DEBUG if opts.debug else logging.INFO
|
|
211 |
logger = logging.getLogger("ganeti-0mqd")
|
|
212 |
logger.setLevel(lvl)
|
|
213 |
formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
|
|
214 |
"%Y-%m-%d %H:%M:%S")
|
|
215 |
handler = logging.FileHandler(opts.log_file)
|
|
216 |
handler.setFormatter(formatter)
|
|
217 |
logger.addHandler(handler)
|
|
218 |
handler_logger = logger
|
|
219 |
|
|
220 |
# Become a daemon:
|
|
221 |
# Redirect stdout and stderr to handler.stream to catch
|
|
222 |
# early errors in the daemonization process [e.g., pidfile creation]
|
|
223 |
# which will otherwise go to /dev/null.
|
|
224 |
daemon_context = daemon.DaemonContext(
|
|
225 |
pidfile=pidf,
|
|
226 |
umask=022,
|
|
227 |
stdout=handler.stream,
|
|
228 |
stderr=handler.stream,
|
|
229 |
files_preserve=[handler.stream])
|
|
230 |
daemon_context.open()
|
|
231 |
logger.info("Became a daemon")
|
|
232 |
|
|
233 |
# Catch signals to ensure graceful shutdown
|
|
234 |
signal(SIGINT, fatal_signal_handler)
|
|
235 |
signal(SIGTERM, fatal_signal_handler)
|
|
236 |
|
|
237 |
# Create 0mq sockets: One for the PUBlisher, one for the PULLer,
|
|
238 |
# one inproc PUSHer for inter-thread communication.
|
84 |
239 |
zmqc = zmq.Context()
|
85 |
|
publisher = zmqc.socket(zmq.PUB)
|
86 |
|
publisher.bind(GANETI_ZMQ_PUBLISHER)
|
87 |
|
logging.info("Now publishing on %s" % GANETI_ZMQ_PUBLISHER)
|
|
240 |
puller = zmqc.socket(zmq.PULL)
|
|
241 |
puller.bind(GANETI_0MQD_PULL_ENDPOINT)
|
|
242 |
puller.bind(GANETI_0MQD_INPROC_ENDPOINT)
|
88 |
243 |
|
|
244 |
publisher = zmqc.socket(zmq.PUB)
|
|
245 |
publisher.bind(GANETI_0MQD_PUB_ENDPOINT)
|
|
246 |
|
|
247 |
pusher = zmqc.socket(zmq.PUSH)
|
|
248 |
pusher.connect(GANETI_0MQD_INPROC_ENDPOINT)
|
|
249 |
logger.info("PUSHing to %s", GANETI_0MQD_INPROC_ENDPOINT)
|
|
250 |
logger.info("PULLing from (%s, %s)",
|
|
251 |
GANETI_0MQD_PULL_ENDPOINT, GANETI_0MQD_INPROC_ENDPOINT)
|
|
252 |
logger.info("PUBlishing on %s", GANETI_0MQD_PUB_ENDPOINT)
|
|
253 |
|
|
254 |
# Use a separate thread for 0mq processing,
|
|
255 |
# needed because the Python runtime interacts badly with 0mq's blocking semantics.
|
|
256 |
zmqt = GanetiZMQThread(logger, puller, publisher)
|
|
257 |
zmqt.start()
|
|
258 |
|
|
259 |
# Monitor the Ganeti job queue, create and push notifications
|
89 |
260 |
wm = pyinotify.WatchManager()
|
90 |
261 |
mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
|
91 |
|
handler = JobFileHandler(publisher)
|
|
262 |
handler = JobFileHandler(logger, pusher)
|
92 |
263 |
notifier = pyinotify.Notifier(wm, handler)
|
93 |
|
wm.add_watch(constants.QUEUE_DIR, mask)
|
94 |
264 |
|
95 |
|
logging.info("Now watching %s" % constants.QUEUE_DIR)
|
|
265 |
try:
|
|
266 |
# Fail if adding the inotify() watch fails for any reason
|
|
267 |
res = wm.add_watch(constants.QUEUE_DIR, mask)
|
|
268 |
if res[constants.QUEUE_DIR] < 0:
|
|
269 |
raise Exception("pyinotify add_watch returned negative watch descriptor")
|
|
270 |
|
|
271 |
logger.info("Now watching %s" % constants.QUEUE_DIR)
|
96 |
272 |
|
97 |
|
while True: # loop forever
|
98 |
|
try:
|
99 |
|
# process the queue of events as explained above
|
|
273 |
while True: # loop forever
|
|
274 |
# process the queue of events as explained above
|
100 |
275 |
notifier.process_events()
|
101 |
276 |
if notifier.check_events():
|
102 |
277 |
# read notified events and enqeue them
|
103 |
278 |
notifier.read_events()
|
104 |
|
except KeyboardInterrupt:
|
105 |
|
# destroy the inotify's instance on this interrupt (stop monitoring)
|
106 |
|
notifier.stop()
|
107 |
|
break
|
|
279 |
except SystemExit:
|
|
280 |
logger.info("SystemExit")
|
|
281 |
except:
|
|
282 |
logger.exception("Caught exception, terminating")
|
|
283 |
finally:
|
|
284 |
# destroy the inotify's instance on this interrupt (stop monitoring)
|
|
285 |
notifier.stop()
|
|
286 |
# mark the 0mq thread as stopped, wake it up so that it notices
|
|
287 |
zmqt.stop()
|
|
288 |
pusher.send_json({'type': 'null'})
|
|
289 |
raise
|
108 |
290 |
|
109 |
291 |
|
110 |
292 |
if __name__ == "__main__":
|
111 |
|
logging.basicConfig(level=logging.DEBUG)
|
112 |
293 |
sys.exit(main())
|
113 |
294 |
|
114 |
295 |
# vim: set ts=4 sts=4 sw=4 et ai :
|