root / ganeti / ganeti-0mqd.py @ dac67c0a
History | View | Annotate | Download (9.7 kB)
1 |
#!/usr/bin/env python
|
---|---|
2 |
#
|
3 |
# Copyright (c) 2010 Greek Research and Technology Network
|
4 |
#
|
5 |
"""Ganeti notification daemon for 0mq
|
6 |
|
7 |
A daemon to monitor the Ganeti job queue and publish job progress
|
8 |
and Ganeti VM state notifications over a 0mq PUB endpoint.
|
9 |
|
10 |
"""
|
11 |
|
12 |
from django.core.management import setup_environ |
13 |
|
14 |
import sys |
15 |
import os |
16 |
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
|
17 |
sys.path.append(path) |
18 |
import synnefo.settings as settings |
19 |
|
20 |
setup_environ(settings) |
21 |
|
22 |
import zmq |
23 |
import time |
24 |
import json |
25 |
import logging |
26 |
import pyinotify |
27 |
import daemon |
28 |
import daemon.pidlockfile |
29 |
from signal import signal, SIGINT, SIGTERM |
30 |
|
31 |
from threading import Thread, Event, currentThread |
32 |
|
33 |
from ganeti import utils |
34 |
from ganeti import jqueue |
35 |
from ganeti import constants |
36 |
from ganeti import serializer |
37 |
|
38 |
|
39 |
class StoppableThread(Thread):
    """Thread class with a stop() method.

    The thread needs to check regularly for the stopped() condition.
    When it does, it exits, so that another thread may .join() it.

    stop() only records the request; it never interrupts the thread.

    """
    def __init__(self, *args, **kwargs):
        Thread.__init__(self, *args, **kwargs)
        # Deliberately not named `_stop`: several Python 3 releases of
        # threading.Thread define an internal _stop() method that join()
        # relies on, and shadowing it with an Event breaks join().
        self._stop_event = Event()

    def stop(self):
        """Flag the thread as stopped; the thread must poll stopped()."""
        self._stop_event.set()

    def stopped(self):
        """Return True once stop() has been called, False otherwise."""
        return self._stop_event.is_set()
55 |
|
56 |
|
57 |
class GanetiZMQThread(StoppableThread):
    """The 0mq processing thread: PULLs and then PUBlishes notifications.

    This thread runs until stopped, receiving notifications over a
    0mq PULL socket, and publishing them over a 0mq PUB socket.

    There are currently two sources of notifications:
      a. ganeti-0mqd itself, monitoring the Ganeti job queue
      b. hooks running in the context of Ganeti

    Because recv() blocks, a stop() request is only noticed after the next
    message arrives; whoever stops the thread must also send it a message.

    """
    # Only messages whose 'type' is listed here are forwarded.
    # This must stay a tuple: a bare string would turn the membership test
    # into a substring match.
    _FORWARDED_TYPES = ('ganeti-op-status',)

    def __init__(self, logger, puller, publisher):
        """Remember the logger and the two sockets used by run().

        logger    -- object with the logging.Logger debug/error/exception API
        puller    -- socket whose recv() returns JSON-encoded messages
        publisher -- socket whose send_json() publishes decoded messages

        """
        StoppableThread.__init__(self)
        self.logger = logger
        self.puller = puller
        self.publisher = publisher

    def run(self):
        """Forward messages from the puller to the publisher until stopped.

        Undecodable messages and messages of an unknown type are logged and
        dropped. Any other failure is logged and the whole process is sent
        SIGTERM.

        """
        self.logger.debug("0mq thread ready")
        try:
            while True:
                # Pull
                self.logger.debug("Waiting on the 0mq PULL socket")
                data = self.puller.recv()
                self.logger.debug("Received message on 0mq PULL socket")
                if self.stopped():
                    self.logger.debug("Thread has been stopped, leaving request loop")
                    return

                try:
                    msg = json.loads(data)
                    msg_type = msg['type']
                except Exception:
                    # Covers malformed JSON as well as payloads that are not
                    # a mapping or lack a 'type' key.
                    self.logger.exception("Unexpected Exception decoding msg: %s", data)
                    continue

                if msg_type not in self._FORWARDED_TYPES:
                    self.logger.error("Not forwarding message of unknown type: %s", json.dumps(msg))
                    continue

                # Publish
                self.logger.debug("PUBlishing msg: %s", json.dumps(msg))
                self.publisher.send_json(msg)

        except:
            # Deliberately catch everything: a dead 0mq thread must take
            # the whole process down with it via SIGTERM.
            self.logger.exception("Caught exception, terminating")
            os.kill(os.getpid(), SIGTERM)
101 |
|
102 |
|
103 |
class JobFileHandler(pyinotify.ProcessEvent):
    """inotify event handler that turns job file updates into notifications.

    For every closed-after-write file whose name starts with "job-", the
    file is parsed and one "ganeti-op-status" message per opcode is sent
    through the pusher.

    """
    def __init__(self, logger, pusher):
        """Remember the logger and the socket used to emit messages.

        logger -- object with the logging.Logger debug API
        pusher -- socket whose send_json() accepts the message dict

        """
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.pusher = pusher

    def process_IN_CLOSE_WRITE(self, event):
        """Handle an IN_CLOSE_WRITE event for a file in the watched directory.

        Files not named "job-*" and files that cannot be read are skipped.

        """
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            # Log the file itself, not just the directory it lives in.
            self.logger.debug("Not a job file: %s", jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # Best effort: skip the unreadable file, but leave a trace.
            # NOTE(review): presumably the file can vanish between the
            # event and the read -- confirm.
            self.logger.debug("Could not read job file %s, skipping",
                              jobfile, exc_info=True)
            return

        data = serializer.LoadJson(data)
        # NOTE(review): the first argument to Restore() is presumably the
        # owning queue, which is not needed here -- confirm against Ganeti.
        job = jqueue._QueuedJob.Restore(None, data)

        for op in job.ops:
            # An opcode may name several instances, a single one, or none.
            # When both attributes exist, instance_name takes precedence.
            instances = ""
            try:
                instances = " ".join(op.input.instances)
            except AttributeError:
                pass

            try:
                instances = op.input.instance_name
            except AttributeError:
                pass

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            self.logger.debug("%d: %s(%s) %s %s",
                              int(job.id), op.input.OP_ID, instances,
                              op.status, logmsg)

            # Construct message
            msg = {
                "type": "ganeti-op-status",
                "instance": instances,
                "operation": op.input.OP_ID,
                "jobId": int(job.id),
                "status": op.status,
                "logmsg": logmsg,
            }
            # "message" duplicates "logmsg", but only when there is one.
            if logmsg:
                msg["message"] = logmsg

            # Push to the 0mq thread for PUBlication
            self.logger.debug("PUSHing msg: %s", json.dumps(msg))
            self.pusher.send_json(msg)
160 |
|
161 |
|
162 |
# Logger used by fatal_signal_handler(). It starts out unset and is
# assigned by main() once logging has been configured.
handler_logger = None


def fatal_signal_handler(signum, frame):
    """Signal handler: log the signal number, then raise SystemExit.

    signum -- number of the signal being handled
    frame  -- current stack frame (unused)

    """
    # Reading the module-level logger needs no `global` declaration.
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit()
169 |
|
170 |
def parse_arguments(args):
    """Parse the daemon's command line.

    args -- list of argument strings, without the program name

    Returns the (options, leftover_args) pair produced by optparse.
    Defaults for the log file, PID file and both ports come from the
    GANETI_0MQD_* values in settings.

    """
    from optparse import OptionParser

    parser = OptionParser()
    # Explicit default so that opts.debug is always a real boolean.
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      default=False,
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default=settings.GANETI_0MQD_LOG_FILE,
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                           settings.GANETI_0MQD_LOG_FILE)
    parser.add_option('--pid-file', dest="pid_file",
                      default=settings.GANETI_0MQD_PID_FILE,
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                           settings.GANETI_0MQD_PID_FILE)
    parser.add_option("-p", "--pull-port", dest="pull_port",
                      default=settings.GANETI_0MQD_PULL_PORT, type="int",
                      metavar="PULL_PORT",
                      help="The TCP port number to use for the 0mq PULL endpoint")
    parser.add_option("-P", "--pub-port", dest="pub_port",
                      default=settings.GANETI_0MQD_PUB_PORT, type="int",
                      metavar="PUB_PORT",
                      help="The TCP port number to use for the 0mq PUB endpoint")

    return parser.parse_args(args)
194 |
|
195 |
def main():
    """Run the daemon: watch the Ganeti job queue and publish over 0mq.

    Steps, in order: parse the command line, set up file logging,
    daemonize, install SIGINT/SIGTERM handlers, create the 0mq sockets,
    start the 0mq thread, then process inotify events for
    constants.QUEUE_DIR forever.

    This function does not return normally: the event loop only ends with
    an exception (SystemExit on a fatal signal), which is re-raised after
    the notifier and the 0mq thread have been asked to stop.

    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # The 0mq endpoints to use for receiving and publishing notifications.
    pub_endpoint = "tcp://*:%d" % int(opts.pub_port)
    pull_endpoint = "tcp://*:%d" % int(opts.pull_port)
    inproc_endpoint = "inproc://ganeti-0mqd"

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti-0mqd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    log_handler = logging.FileHandler(opts.log_file)
    log_handler.setFormatter(formatter)
    logger.addHandler(log_handler)
    # Make the logger available to fatal_signal_handler()
    handler_logger = logger

    # Become a daemon:
    # Redirect stdout and stderr to the log stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=0o022,  # 0o prefix: valid octal on Python 2.6+ and 3
        stdout=log_handler.stream,
        stderr=log_handler.stream,
        files_preserve=[log_handler.stream])
    daemon_context.open()
    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Create 0mq sockets: One for the PUBlisher, one for the PULLer,
    # one inproc PUSHer for inter-thread communication.
    zmqc = zmq.Context()
    puller = zmqc.socket(zmq.PULL)
    puller.bind(pull_endpoint)
    puller.bind(inproc_endpoint)

    publisher = zmqc.socket(zmq.PUB)
    publisher.bind(pub_endpoint)

    pusher = zmqc.socket(zmq.PUSH)
    pusher.connect(inproc_endpoint)
    logger.info("PUSHing to %s", inproc_endpoint)
    logger.info("PULLing from (%s, %s)", pull_endpoint, inproc_endpoint)
    logger.info("PUBlishing on %s", pub_endpoint)

    # Use a separate thread for 0mq processing,
    # needed because the Python runtime interacts badly with 0mq's
    # blocking semantics.
    zmqt = GanetiZMQThread(logger, puller, publisher)
    zmqt.start()

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
    job_handler = JobFileHandler(logger, pusher)
    notifier = pyinotify.Notifier(wm, job_handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative watch descriptor")

        logger.info("Now watching %s", constants.QUEUE_DIR)

        while True:    # loop forever
            # dispatch the events queued so far to the job file handler
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
        raise
    except:
        logger.exception("Caught exception, terminating")
        raise
    finally:
        # Re-raising happens in the except clauses above, not here: a bare
        # `raise` in `finally` depends on Python 2's lingering exception
        # state and fails on Python 3.
        # destroy the inotify's instance on this interrupt (stop monitoring)
        notifier.stop()
        # mark the 0mq thread as stopped, wake it up so that it notices
        zmqt.stop()
        pusher.send_json({'type': 'null'})
290 |
|
291 |
|
292 |
if __name__ == "__main__": |
293 |
sys.exit(main()) |
294 |
|
295 |
# vim: set ts=4 sts=4 sw=4 et ai :
|