24 |
24 |
from signal import signal, SIGINT, SIGTERM
|
25 |
25 |
|
26 |
26 |
import logging
|
|
27 |
import logging.config
|
27 |
28 |
import time
|
28 |
29 |
import socket
|
|
30 |
from daemon import pidfile, daemon
|
29 |
31 |
|
30 |
32 |
from synnefo.logic import dispatcher_callbacks
|
31 |
33 |
|
... | ... | |
37 |
39 |
debug = False
|
38 |
40 |
clienttags = []
|
39 |
41 |
|
40 |
|
def __init__(self, debug = False, logger = None):
|
41 |
|
self.logger = logger
|
|
42 |
def __init__(self, debug = False):
|
|
43 |
# Initialize logger
|
|
44 |
logging.config.fileConfig("/Volumes/Files/Developer/grnet/synnefo/logging.conf")
|
|
45 |
self.logger = logging.getLogger("synnefo.dispatcher")
|
|
46 |
|
42 |
47 |
self.debug = debug
|
43 |
48 |
self._init()
|
44 |
49 |
|
|
50 |
|
45 |
51 |
def wait(self):
|
46 |
52 |
while True:
|
47 |
53 |
try:
|
... | ... | |
58 |
64 |
[self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
|
59 |
65 |
self.chan.connection.close()
|
60 |
66 |
self.chan.close()
|
61 |
|
sys.exit()
|
62 |
67 |
|
63 |
68 |
def _init(self):
|
|
69 |
self.logger.info("Initializing")
|
|
70 |
|
64 |
71 |
# Connect to RabbitMQ
|
65 |
72 |
conn = None
|
66 |
73 |
while conn == None:
|
... | ... | |
123 |
130 |
[os.kill(pid, SIGTERM) for pid in children]
|
124 |
131 |
|
125 |
132 |
|
126 |
|
def child(cmdline, logger):
|
|
133 |
def child(cmdline):
|
127 |
134 |
"""The context of the child process"""
|
128 |
135 |
|
129 |
136 |
# Cmd line argument parsing
|
130 |
137 |
(opts, args) = parse_arguments(cmdline)
|
131 |
|
disp = Dispatcher(debug = opts.debug, logger = logger)
|
|
138 |
disp = Dispatcher(debug = opts.debug)
|
132 |
139 |
|
133 |
140 |
# Start the event loop
|
134 |
141 |
disp.wait()
|
... | ... | |
146 |
153 |
settings.DISPATCHER_LOG_FILE)
|
147 |
154 |
parser.add_option("-c", "--cleanup-queues", action="store_true",
|
148 |
155 |
default=False, dest="cleanup_queues",
|
149 |
|
help="Remove all queues declared in settings.py (DANGEROUS!)")
|
|
156 |
help="Remove all declared queues (DANGEROUS!)")
|
150 |
157 |
parser.add_option("-w", "--workers", default=2, dest="workers",
|
151 |
158 |
help="Number of workers to spawn", type="int")
|
152 |
159 |
|
... | ... | |
185 |
192 |
chan.connection.close()
|
186 |
193 |
|
187 |
194 |
|
188 |
|
def debug_mode(logger):
|
189 |
|
disp = Dispatcher(debug = True, logger = logger)
|
|
195 |
def debug_mode():
|
|
196 |
disp = Dispatcher(debug = True)
|
190 |
197 |
signal(SIGINT, _exit_handler)
|
191 |
198 |
signal(SIGTERM, _exit_handler)
|
192 |
199 |
|
... | ... | |
198 |
205 |
(opts, args) = parse_arguments(sys.argv[1:])
|
199 |
206 |
|
200 |
207 |
# Initialize logger
|
201 |
|
lvl = logging.DEBUG if opts.debug else logging.INFO
|
|
208 |
logging.config.fileConfig("logging.conf")
|
202 |
209 |
logger = logging.getLogger("synnefo.dispatcher")
|
203 |
|
logger.setLevel(lvl)
|
204 |
|
formatter = logging.Formatter(
|
205 |
|
"%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
|
206 |
|
"%Y-%m-%d %H:%M:%S")
|
207 |
|
handler = logging.FileHandler(opts.log_file)
|
208 |
|
handler.setFormatter(formatter)
|
209 |
|
logger.addHandler(handler)
|
210 |
210 |
|
211 |
211 |
# Special case for the clean up queues action
|
212 |
212 |
if opts.cleanup_queues:
|
... | ... | |
215 |
215 |
|
216 |
216 |
# Debug mode, process messages without spawning workers
|
217 |
217 |
if opts.debug:
|
218 |
|
debug_mode(logger = logger)
|
|
218 |
debug_mode()
|
219 |
219 |
return
|
220 |
220 |
|
|
221 |
# Create pidfile
|
|
222 |
pidf = pidfile.TimeoutPIDLockFile("/Volumes/Files/Developer/grnet/synnefo/dispatcher.pid", 10)
|
|
223 |
|
|
224 |
# Become a daemon
|
|
225 |
daemon_context = daemon.DaemonContext(
|
|
226 |
pidfile=pidf,
|
|
227 |
stdout=sys.stdout,
|
|
228 |
stderr=sys.stderr,
|
|
229 |
umask=022)
|
|
230 |
daemon_context.open()
|
|
231 |
logger.info("Became a daemon")
|
|
232 |
|
221 |
233 |
# Fork workers
|
222 |
234 |
children = []
|
223 |
235 |
|
... | ... | |
228 |
240 |
if newpid == 0:
|
229 |
241 |
signal(SIGINT, _exit_handler)
|
230 |
242 |
signal(SIGTERM, _exit_handler)
|
231 |
|
child(sys.argv[1:], logger)
|
232 |
|
sys.exit(0)
|
|
243 |
child(sys.argv[1:])
|
|
244 |
try:
|
|
245 |
sys.exit(0)
|
|
246 |
except Exception:
|
|
247 |
print "foo"
|
233 |
248 |
else:
|
234 |
249 |
pids = (os.getpid(), newpid)
|
235 |
250 |
logger.debug("%d, forked child: %d" % pids)
|
... | ... | |
240 |
255 |
signal(SIGINT, _parent_handler)
|
241 |
256 |
signal(SIGTERM, _parent_handler)
|
242 |
257 |
|
243 |
|
# Wait for all children process to die, one by one
|
|
258 |
# Wait for all children processes to die, one by one
|
244 |
259 |
for pid in children:
|
245 |
260 |
try:
|
246 |
261 |
os.waitpid(pid, 0)
|