Revision 2cd99e7a logic/dispatcher.py
b/logic/dispatcher.py | ||
---|---|---|
2 | 2 |
# |
3 | 3 |
# Copyright (c) 2011 Greek Research and Technology Network |
4 | 4 |
# |
5 |
"""Connect to a queue |
|
6 |
|
|
7 |
This daemon receives job notifications from a number of queues |
|
5 |
""" Message queue setup and dispatch |
|
8 | 6 |
|
7 |
This program sets up connections to the queues configured in settings.py |
|
8 |
and implements the message wait and dispatch loops. Actual messages are |
|
9 |
handled in the dispatched functions. |
|
9 | 10 |
|
10 | 11 |
""" |
11 | 12 |
|
... | ... | |
20 | 21 |
setup_environ(settings) |
21 | 22 |
|
22 | 23 |
from amqplib import client_0_8 as amqp |
23 |
|
|
24 | 24 |
from signal import signal, SIGINT, SIGTERM |
25 | 25 |
|
26 | 26 |
import logging |
... | ... | |
29 | 29 |
|
30 | 30 |
from synnefo.logic import dispatcher_callbacks |
31 | 31 |
|
32 |
# List of worker ids |
|
33 |
global children |
|
34 | 32 |
|
35 | 33 |
class Dispatcher: |
36 | 34 |
|
... | ... | |
53 | 51 |
except socket.error: |
54 | 52 |
self.logger.error("Server went away, reconnecting...") |
55 | 53 |
self._init() |
56 |
pass |
|
57 | 54 |
|
58 | 55 |
[self.chan.basic_cancel(clienttag) for clienttag in self.clienttags] |
59 | 56 |
self.chan.close() |
... | ... | |
73 | 70 |
virtual_host=settings.RABBIT_VHOST) |
74 | 71 |
except socket.error: |
75 | 72 |
time.sleep(1) |
76 |
pass |
|
77 | 73 |
|
78 | 74 |
self.logger.info("Connection succesful, opening channel") |
79 | 75 |
self.chan = conn.channel() |
... | ... | |
98 | 94 |
# Bind queues to handler methods |
99 | 95 |
for binding in bindings: |
100 | 96 |
try: |
101 |
cb = getattr(dispatcher_callbacks, binding[3])
|
|
97 |
callback = getattr(dispatcher_callbacks, binding[3])
|
|
102 | 98 |
except AttributeError: |
103 | 99 |
self.logger.error("Cannot find callback %s" % binding[3]) |
104 | 100 |
continue |
105 | 101 |
|
106 | 102 |
self.chan.queue_bind(queue=binding[0], exchange=binding[1], |
107 | 103 |
routing_key=binding[2]) |
108 |
tag = self.chan.basic_consume(queue=binding[0], callback=cb)
|
|
104 |
tag = self.chan.basic_consume(queue=binding[0], callback=callback)
|
|
109 | 105 |
self.logger.debug("Binding %s(%s) to queue %s with handler %s" % |
110 | 106 |
(binding[1], binding[2], binding[0], binding[3])) |
111 | 107 |
self.clienttags.append(tag) |
... | ... | |
113 | 109 |
|
114 | 110 |
def _exit_handler(signum, frame): |
115 | 111 |
""""Catch exit signal in children processes.""" |
116 |
print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(),signum) |
|
112 |
print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
|
|
117 | 113 |
raise SystemExit |
118 | 114 |
|
119 | 115 |
|
... | ... | |
129 | 125 |
|
130 | 126 |
# Cmd line argument parsing |
131 | 127 |
(opts, args) = parse_arguments(cmdline) |
132 |
d = Dispatcher(debug = opts.debug, logger = logger) |
|
128 |
disp = Dispatcher(debug = opts.debug, logger = logger)
|
|
133 | 129 |
|
134 | 130 |
# Start the event loop |
135 |
d.wait() |
|
131 |
disp.wait()
|
|
136 | 132 |
|
137 | 133 |
|
138 | 134 |
def parse_arguments(args): |
... | ... | |
140 | 136 |
|
141 | 137 |
parser = OptionParser() |
142 | 138 |
parser.add_option("-d", "--debug", action="store_true", default=False, |
143 |
dest="debug", |
|
144 |
help="Enable debug mode") |
|
139 |
dest="debug", help="Enable debug mode") |
|
145 | 140 |
parser.add_option("-l", "--log", dest="log_file", |
146 |
default=settings.DISPATCHER_LOG_FILE, |
|
147 |
metavar="FILE", |
|
148 |
help="Write log to FILE instead of %s" % |
|
149 |
settings.DISPATCHER_LOG_FILE) |
|
141 |
default=settings.DISPATCHER_LOG_FILE, metavar="FILE", |
|
142 |
help="Write log to FILE instead of %s" % |
|
143 |
settings.DISPATCHER_LOG_FILE) |
|
150 | 144 |
parser.add_option("-c", "--cleanup-queues", action="store_true", |
151 | 145 |
default=False, dest="cleanup_queues", |
152 |
help="Remove all queues declared in settings.py (DANGEROUS!)") |
|
146 |
help="Remove all queues declared in settings.py (DANGEROUS!)")
|
|
153 | 147 |
parser.add_option("-w", "--workers", default=2, dest="workers", |
154 |
help="Number of workers to spawn", type="int") |
|
148 |
help="Number of workers to spawn", type="int")
|
|
155 | 149 |
|
156 | 150 |
return parser.parse_args(args) |
157 | 151 |
|
... | ... | |
197 | 191 |
logger = logging.getLogger("synnefo.dispatcher") |
198 | 192 |
logger.setLevel(lvl) |
199 | 193 |
formatter = logging.Formatter( |
200 |
"%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
|
|
201 |
"%Y-%m-%d %H:%M:%S")
|
|
194 |
"%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s", |
|
195 |
"%Y-%m-%d %H:%M:%S") |
|
202 | 196 |
handler = logging.FileHandler(opts.log_file) |
203 | 197 |
handler.setFormatter(formatter) |
204 | 198 |
logger.addHandler(handler) |
... | ... | |
231 | 225 |
signal(SIGTERM, _parent_handler) |
232 | 226 |
|
233 | 227 |
# Wait for all children process to die, one by one |
234 |
for c in children:
|
|
228 |
for pid in children:
|
|
235 | 229 |
try: |
236 |
os.wait()
|
|
230 |
os.waitpid(pid)
|
|
237 | 231 |
except Exception: |
238 | 232 |
pass |
239 | 233 |
|
Also available in: Unified diff