Revision 8861126f
b/logic/dispatcher.py | ||
---|---|---|
1 | 1 |
#!/usr/bin/env python |
2 | 2 |
# |
3 |
# Copyright (c) 2010 Greek Research and Technology Network
|
|
3 |
# Copyright (c) 201! Greek Research and Technology Network
|
|
4 | 4 |
# |
5 |
"""Receive Ganeti events over RabbitMQ, update VM state in DB.
|
|
5 |
"""Connect to a queue
|
|
6 | 6 |
|
7 | 7 |
This daemon receives job notifications from ganeti-amqpd |
8 | 8 |
and updates VM state in the DB accordingly. |
... | ... | |
21 | 21 |
|
22 | 22 |
from amqplib import client_0_8 as amqp |
23 | 23 |
|
24 |
import daemon |
|
25 |
from signal import signal, SIGINT, SIGTERM, SIGKILL |
|
24 |
from signal import signal, SIGINT, SIGTERM |
|
26 | 25 |
|
27 | 26 |
import logging |
28 | 27 |
import time |
... | ... | |
30 | 29 |
|
31 | 30 |
from synnefo.logic import dispatcher_callbacks |
32 | 31 |
|
32 |
#List of worker ids |
|
33 |
global children |
|
34 |
|
|
33 | 35 |
class Dispatcher: |
34 | 36 |
|
35 | 37 |
logger = None |
... | ... | |
56 | 58 |
[self.chan.basic_cancel(clienttag) for clienttag in self.clienttags] |
57 | 59 |
self.chan.close() |
58 | 60 |
self.chan.connection.close() |
61 |
sys.exit() |
|
59 | 62 |
|
60 | 63 |
def _init(self): |
61 | 64 |
conn = None |
... | ... | |
101 | 104 |
self.clienttags.append(tag) |
102 | 105 |
|
103 | 106 |
def exit_handler(signum, frame): |
104 |
global handler_logger |
|
105 |
|
|
106 |
handler_logger.info("Caught fatal signal %d, will raise SystemExit", signum) |
|
107 |
print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(),signum) |
|
107 | 108 |
raise SystemExit |
108 | 109 |
|
109 |
def child(cmdline): |
|
110 |
global logger |
|
110 |
def parent_handler(signum, frame): |
|
111 |
global children |
|
112 |
print "Caught signal %d, sending kill signal to children" % signum |
|
113 |
[os.kill(pid, SIGTERM) for pid in children] |
|
114 |
|
|
115 |
def child(cmdline, logger): |
|
111 | 116 |
#Cmd line argument parsing |
112 | 117 |
(opts, args) = parse_arguments(cmdline) |
113 |
|
|
114 |
# Initialize logger |
|
115 |
lvl = logging.DEBUG if opts.debug else logging.INFO |
|
116 |
logger = logging.getLogger("synnefo.dispatcher") |
|
117 |
logger.setLevel(lvl) |
|
118 |
formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s", |
|
119 |
"%Y-%m-%d %H:%M:%S") |
|
120 |
handler = logging.FileHandler(opts.log_file) |
|
121 |
handler.setFormatter(formatter) |
|
122 |
logger.addHandler(handler) |
|
123 |
|
|
124 | 118 |
d = Dispatcher(debug = opts.debug, logger = logger) |
125 | 119 |
|
126 | 120 |
d.wait() |
... | ... | |
138 | 132 |
settings.DISPATCHER_LOG_FILE) |
139 | 133 |
parser.add_option("-c", "--cleanup-queues", action="store_true", default=False, dest="cleanup_queues", |
140 | 134 |
help="Remove from RabbitMQ all queues declared in settings.py (DANGEROUS!)") |
135 |
parser.add_option("-w", "--workers", default=1, dest="workers", |
|
136 |
help="Number of workers to spawn") |
|
141 | 137 |
|
142 | 138 |
return parser.parse_args(args) |
143 | 139 |
|
... | ... | |
169 | 165 |
chan.queue_delete(queue=queue) |
170 | 166 |
except amqp.exceptions.AMQPChannelException as e: |
171 | 167 |
print e.amqp_reply_code, " ", e.amqp_reply_text |
168 |
chan.close() |
|
169 |
chan.connection.close() |
|
172 | 170 |
|
173 | 171 |
def main(): |
172 |
global children, logger |
|
174 | 173 |
(opts, args) = parse_arguments(sys.argv[1:]) |
175 | 174 |
|
175 |
# Initialize logger |
|
176 |
lvl = logging.DEBUG if opts.debug else logging.INFO |
|
177 |
logger = logging.getLogger("synnefo.dispatcher") |
|
178 |
logger.setLevel(lvl) |
|
179 |
formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s", |
|
180 |
"%Y-%m-%d %H:%M:%S") |
|
181 |
handler = logging.FileHandler(opts.log_file) |
|
182 |
handler.setFormatter(formatter) |
|
183 |
logger.addHandler(handler) |
|
184 |
|
|
185 |
#Special case for the clean up queues action |
|
176 | 186 |
if opts.cleanup_queues: |
177 | 187 |
cleanup_queues() |
178 | 188 |
return |
179 | 189 |
|
180 |
#newpid = os.fork()
|
|
181 |
#if newpid == 0:
|
|
182 |
child(sys.argv[1:]) |
|
183 |
#else:
|
|
184 |
# pids = (os.getpid(), newpid)
|
|
185 |
# print "parent: %d, child: %d" % pids
|
|
186 |
|
|
187 |
# Become a daemon:
|
|
188 |
# Redirect stdout and stderr to handler.stream to catch
|
|
189 |
# early errors in the daemonization process [e.g., pidfile creation]
|
|
190 |
# which will otherwise go to /dev/null.
|
|
191 |
#daemon_context = daemon.DaemonContext(
|
|
192 |
# umask=022,
|
|
193 |
# stdout=handler.stream,
|
|
194 |
# stderr=handler.stream,
|
|
195 |
# files_preserve=[handler.stream])
|
|
196 |
#daemon_context.open()
|
|
197 |
#logger.info("Became a daemon")
|
|
198 |
|
|
190 |
#Fork workers
|
|
191 |
children = []
|
|
192 |
|
|
193 |
i = 0
|
|
194 |
while i < opts.workers:
|
|
195 |
newpid = os.fork()
|
|
196 |
|
|
197 |
if newpid == 0:
|
|
198 |
signal(SIGINT, exit_handler)
|
|
199 |
signal(SIGTERM, exit_handler)
|
|
200 |
#child(sys.argv[1:], logger)
|
|
201 |
time.sleep(5)
|
|
202 |
sys.exit(0)
|
|
203 |
else:
|
|
204 |
pids = (os.getpid(), newpid)
|
|
205 |
logger.debug("%d, forked child: %d" % pids)
|
|
206 |
children.append(pids[1])
|
|
207 |
i += 1
|
|
208 |
|
|
199 | 209 |
# Catch signals to ensure graceful shutdown |
200 |
#signal(SIGINT, exit_handler) |
|
201 |
#signal(SIGTERM, exit_handler) |
|
202 |
#signal(SIGKILL, exit_handler) |
|
210 |
signal(SIGINT, parent_handler) |
|
211 |
signal(SIGTERM, parent_handler) |
|
212 |
|
|
213 |
try: |
|
214 |
os.wait() |
|
215 |
except Exception : |
|
216 |
pass |
|
203 | 217 |
|
204 | 218 |
if __name__ == "__main__": |
205 | 219 |
logging.basicConfig(level=logging.DEBUG) |
Also available in: Unified diff