root / daemons / import-export @ 043f2292
History | View | Annotate | Download (20.7 kB)
1 |
#!/usr/bin/python |
---|---|
2 |
# |
3 |
|
4 |
# Copyright (C) 2010 Google Inc. |
5 |
# |
6 |
# This program is free software; you can redistribute it and/or modify |
7 |
# it under the terms of the GNU General Public License as published by |
8 |
# the Free Software Foundation; either version 2 of the License, or |
9 |
# (at your option) any later version. |
10 |
# |
11 |
# This program is distributed in the hope that it will be useful, but |
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of |
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
14 |
# General Public License for more details. |
15 |
# |
16 |
# You should have received a copy of the GNU General Public License |
17 |
# along with this program; if not, write to the Free Software |
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
19 |
# 02110-1301, USA. |
20 |
|
21 |
|
22 |
"""Import/export daemon. |
23 |
|
24 |
""" |
25 |
|
26 |
# pylint: disable-msg=C0103 |
27 |
# C0103: Invalid name import-export |
28 |
|
29 |
import errno |
30 |
import logging |
31 |
import optparse |
32 |
import os |
33 |
import re |
34 |
import select |
35 |
import signal |
36 |
import socket |
37 |
import subprocess |
38 |
import sys |
39 |
import time |
40 |
from cStringIO import StringIO |
41 |
|
42 |
from ganeti import constants |
43 |
from ganeti import cli |
44 |
from ganeti import utils |
45 |
from ganeti import serializer |
46 |
from ganeti import objects |
47 |
from ganeti import locking |
48 |
|
49 |
|
50 |
#: Used to recognize point at which socat(1) starts to listen on its socket. |
51 |
#: The local address is required for the remote peer to connect (in particular |
52 |
#: the port number). |
53 |
LISTENING_RE = re.compile(r"^listening on\s+" |
54 |
r"AF=(?P<family>\d+)\s+" |
55 |
r"(?P<address>.+):(?P<port>\d+)$", re.I) |
56 |
|
57 |
#: Used to recognize point at which socat(1) is sending data over the wire |
58 |
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$", |
59 |
re.I) |
60 |
|
61 |
SOCAT_LOG_DEBUG = "D" |
62 |
SOCAT_LOG_INFO = "I" |
63 |
SOCAT_LOG_NOTICE = "N" |
64 |
SOCAT_LOG_WARNING = "W" |
65 |
SOCAT_LOG_ERROR = "E" |
66 |
SOCAT_LOG_FATAL = "F" |
67 |
|
68 |
SOCAT_LOG_IGNORE = frozenset([ |
69 |
SOCAT_LOG_DEBUG, |
70 |
SOCAT_LOG_INFO, |
71 |
SOCAT_LOG_NOTICE, |
72 |
]) |
73 |
|
74 |
#: Socat buffer size: at most this many bytes are transferred per step |
75 |
SOCAT_BUFSIZE = 1024 * 1024 |
76 |
|
77 |
#: How many lines to keep in the status file |
78 |
MAX_RECENT_OUTPUT_LINES = 20 |
79 |
|
80 |
#: Don't update status file more than once every 5 seconds (unless forced) |
81 |
MIN_UPDATE_INTERVAL = 5.0 |
82 |
|
83 |
#: Give child process up to 5 seconds to exit after sending a signal |
84 |
CHILD_LINGER_TIMEOUT = 5.0 |
85 |
|
86 |
#: How long to wait for a connection to be established |
87 |
DEFAULT_CONNECT_TIMEOUT = 60 |
88 |
|
89 |
# Common options for socat |
90 |
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] |
91 |
SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"] |
92 |
|
93 |
|
94 |
# Global variable for options |
95 |
options = None |
96 |
|
97 |
|
98 |
class Error(Exception): |
99 |
"""Generic exception""" |
100 |
|
101 |
|
102 |
def SetupLogging(): |
103 |
"""Configures the logging module. |
104 |
|
105 |
""" |
106 |
formatter = logging.Formatter("%(asctime)s: %(message)s") |
107 |
|
108 |
stderr_handler = logging.StreamHandler() |
109 |
stderr_handler.setFormatter(formatter) |
110 |
stderr_handler.setLevel(logging.NOTSET) |
111 |
|
112 |
root_logger = logging.getLogger("") |
113 |
root_logger.addHandler(stderr_handler) |
114 |
|
115 |
if options.debug: |
116 |
root_logger.setLevel(logging.NOTSET) |
117 |
elif options.verbose: |
118 |
root_logger.setLevel(logging.INFO) |
119 |
else: |
120 |
root_logger.setLevel(logging.ERROR) |
121 |
|
122 |
# Create special logger for child process output |
123 |
child_logger = logging.Logger("child output") |
124 |
child_logger.addHandler(stderr_handler) |
125 |
child_logger.setLevel(logging.NOTSET) |
126 |
|
127 |
return child_logger |
128 |
|
129 |
|
130 |
def _VerifyListening(family, address, port): |
131 |
"""Verify address given as listening address by socat. |
132 |
|
133 |
""" |
134 |
# TODO: Implement IPv6 support |
135 |
if family != socket.AF_INET: |
136 |
raise Error("Address family %r not supported" % family) |
137 |
|
138 |
try: |
139 |
packed_address = socket.inet_pton(family, address) |
140 |
except socket.error: |
141 |
raise Error("Invalid address %r for family %s" % (address, family)) |
142 |
|
143 |
return (socket.inet_ntop(family, packed_address), port) |
144 |
|
145 |
|
146 |
class StatusFile: |
147 |
"""Status file manager. |
148 |
|
149 |
""" |
150 |
def __init__(self, path): |
151 |
"""Initializes class. |
152 |
|
153 |
""" |
154 |
self._path = path |
155 |
self._data = objects.ImportExportStatus(ctime=time.time(), |
156 |
mtime=None, |
157 |
recent_output=[]) |
158 |
|
159 |
def AddRecentOutput(self, line): |
160 |
"""Adds a new line of recent output. |
161 |
|
162 |
""" |
163 |
self._data.recent_output.append(line) |
164 |
|
165 |
# Remove old lines |
166 |
del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES] |
167 |
|
168 |
def SetListenPort(self, port): |
169 |
"""Sets the port the daemon is listening on. |
170 |
|
171 |
@type port: int |
172 |
@param port: TCP/UDP port |
173 |
|
174 |
""" |
175 |
assert isinstance(port, (int, long)) and 0 < port < 2**16 |
176 |
self._data.listen_port = port |
177 |
|
178 |
def GetListenPort(self): |
179 |
"""Returns the port the daemon is listening on. |
180 |
|
181 |
""" |
182 |
return self._data.listen_port |
183 |
|
184 |
def SetConnected(self): |
185 |
"""Sets the connected flag. |
186 |
|
187 |
""" |
188 |
self._data.connected = True |
189 |
|
190 |
def GetConnected(self): |
191 |
"""Determines whether the daemon is connected. |
192 |
|
193 |
""" |
194 |
return self._data.connected |
195 |
|
196 |
def SetExitStatus(self, exit_status, error_message): |
197 |
"""Sets the exit status and an error message. |
198 |
|
199 |
""" |
200 |
# Require error message when status isn't 0 |
201 |
assert exit_status == 0 or error_message |
202 |
|
203 |
self._data.exit_status = exit_status |
204 |
self._data.error_message = error_message |
205 |
|
206 |
def ExitStatusIsSuccess(self): |
207 |
"""Returns whether the exit status means "success". |
208 |
|
209 |
""" |
210 |
return not bool(self._data.error_message) |
211 |
|
212 |
def Update(self, force): |
213 |
"""Updates the status file. |
214 |
|
215 |
@type force: bool |
216 |
@param force: Write status file in any case, not only when minimum interval |
217 |
is expired |
218 |
|
219 |
""" |
220 |
if not (force or |
221 |
self._data.mtime is None or |
222 |
time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)): |
223 |
return |
224 |
|
225 |
logging.debug("Updating status file %s", self._path) |
226 |
|
227 |
self._data.mtime = time.time() |
228 |
utils.WriteFile(self._path, |
229 |
data=serializer.DumpJson(self._data.ToDict(), indent=True), |
230 |
mode=0400) |
231 |
|
232 |
|
233 |
def _ProcessSocatOutput(status_file, level, msg): |
234 |
"""Interprets socat log output. |
235 |
|
236 |
""" |
237 |
if level == SOCAT_LOG_NOTICE: |
238 |
if status_file.GetListenPort() is None: |
239 |
# TODO: Maybe implement timeout to not listen forever |
240 |
m = LISTENING_RE.match(msg) |
241 |
if m: |
242 |
(_, port) = _VerifyListening(int(m.group("family")), m.group("address"), |
243 |
int(m.group("port"))) |
244 |
|
245 |
status_file.SetListenPort(port) |
246 |
return True |
247 |
|
248 |
if not status_file.GetConnected(): |
249 |
m = TRANSFER_LOOP_RE.match(msg) |
250 |
if m: |
251 |
status_file.SetConnected() |
252 |
return True |
253 |
|
254 |
return False |
255 |
|
256 |
|
257 |
def ProcessOutput(line, status_file, logger, socat): |
258 |
"""Takes care of child process output. |
259 |
|
260 |
@param status_file: Status file manager |
261 |
@param logger: Child output logger |
262 |
@type socat: bool |
263 |
@param socat: Whether it's a socat output line |
264 |
@type line: string |
265 |
@param line: Child output line |
266 |
|
267 |
""" |
268 |
force_update = False |
269 |
forward_line = line |
270 |
|
271 |
if socat: |
272 |
level = None |
273 |
parts = line.split(None, 4) |
274 |
|
275 |
if len(parts) == 5: |
276 |
(_, _, _, level, msg) = parts |
277 |
|
278 |
force_update = _ProcessSocatOutput(status_file, level, msg) |
279 |
|
280 |
if options.debug or (level and level not in SOCAT_LOG_IGNORE): |
281 |
forward_line = "socat: %s %s" % (level, msg) |
282 |
else: |
283 |
forward_line = None |
284 |
else: |
285 |
forward_line = "socat: %s" % line |
286 |
|
287 |
if forward_line: |
288 |
logger.info(forward_line) |
289 |
status_file.AddRecentOutput(forward_line) |
290 |
|
291 |
status_file.Update(force_update) |
292 |
|
293 |
|
294 |
def GetBashCommand(cmd): |
295 |
"""Prepares a command to be run in Bash. |
296 |
|
297 |
""" |
298 |
return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd] |
299 |
|
300 |
|
301 |
def GetSocatCommand(mode): |
302 |
"""Returns the socat command. |
303 |
|
304 |
""" |
305 |
common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [ |
306 |
"key=%s" % options.key, |
307 |
"cert=%s" % options.cert, |
308 |
"cafile=%s" % options.ca, |
309 |
] |
310 |
|
311 |
if options.bind is not None: |
312 |
common_addr_opts.append("bind=%s" % options.bind) |
313 |
|
314 |
if mode == constants.IEM_IMPORT: |
315 |
if options.port is None: |
316 |
port = 0 |
317 |
else: |
318 |
port = options.port |
319 |
|
320 |
addr1 = [ |
321 |
"OPENSSL-LISTEN:%s" % port, |
322 |
"reuseaddr", |
323 |
|
324 |
# Retry to listen if connection wasn't established successfully, up to |
325 |
# 100 times a second. Note that this still leaves room for DoS attacks. |
326 |
"forever", |
327 |
"intervall=0.01", |
328 |
] + common_addr_opts |
329 |
addr2 = ["stdout"] |
330 |
|
331 |
elif mode == constants.IEM_EXPORT: |
332 |
addr1 = ["stdin"] |
333 |
addr2 = [ |
334 |
"OPENSSL:%s:%s" % (options.host, options.port), |
335 |
|
336 |
# How long to wait per connection attempt |
337 |
"connect-timeout=%s" % options.connect_timeout, |
338 |
|
339 |
# Retry a few times before giving up to connect (once per second) |
340 |
"retry=%s" % options.connect_retries, |
341 |
"intervall=1", |
342 |
] + common_addr_opts |
343 |
|
344 |
else: |
345 |
raise Error("Invalid mode") |
346 |
|
347 |
for i in [addr1, addr2]: |
348 |
for value in i: |
349 |
if "," in value: |
350 |
raise Error("Comma not allowed in socat option value: %r" % value) |
351 |
|
352 |
return [ |
353 |
constants.SOCAT_PATH, |
354 |
|
355 |
# Log to stderr |
356 |
"-ls", |
357 |
|
358 |
# Log level |
359 |
"-d", "-d", |
360 |
|
361 |
# Buffer size |
362 |
"-b%s" % SOCAT_BUFSIZE, |
363 |
|
364 |
# Unidirectional mode, the first address is only used for reading, and the |
365 |
# second address is only used for writing |
366 |
"-u", |
367 |
|
368 |
",".join(addr1), ",".join(addr2) |
369 |
] |
370 |
|
371 |
|
372 |
def GetTransportCommand(mode, socat_stderr_fd): |
373 |
"""Returns the command for the transport part of the daemon. |
374 |
|
375 |
@param mode: Daemon mode (import or export) |
376 |
@type socat_stderr_fd: int |
377 |
@param socat_stderr_fd: File descriptor socat should write its stderr to |
378 |
|
379 |
""" |
380 |
socat_cmd = ("%s 2>&%d" % |
381 |
(utils.ShellQuoteArgs(GetSocatCommand(mode)), |
382 |
socat_stderr_fd)) |
383 |
|
384 |
# TODO: Make compression configurable |
385 |
|
386 |
if mode == constants.IEM_IMPORT: |
387 |
transport_cmd = "%s | gunzip -c" % socat_cmd |
388 |
elif mode == constants.IEM_EXPORT: |
389 |
transport_cmd = "gzip -c | %s" % socat_cmd |
390 |
else: |
391 |
raise Error("Invalid mode") |
392 |
|
393 |
# TODO: Use "dd" to measure processed data (allows to give an ETA) |
394 |
|
395 |
# TODO: Run transport as separate user |
396 |
# The transport uses its own shell to simplify running it as a separate user |
397 |
# in the future. |
398 |
return GetBashCommand(transport_cmd) |
399 |
|
400 |
|
401 |
def GetCommand(mode, socat_stderr_fd): |
402 |
"""Returns the complete child process command. |
403 |
|
404 |
""" |
405 |
buf = StringIO() |
406 |
|
407 |
if options.cmd_prefix: |
408 |
buf.write(options.cmd_prefix) |
409 |
buf.write(" ") |
410 |
|
411 |
buf.write(utils.ShellQuoteArgs(GetTransportCommand(mode, socat_stderr_fd))) |
412 |
|
413 |
if options.cmd_suffix: |
414 |
buf.write(" ") |
415 |
buf.write(options.cmd_suffix) |
416 |
|
417 |
return GetBashCommand(buf.getvalue()) |
418 |
|
419 |
|
420 |
def ProcessChildIO(child, socat_stderr_read_fd, status_file, child_logger, |
421 |
signal_notify, signal_handler, mode): |
422 |
"""Handles the child processes' output. |
423 |
|
424 |
""" |
425 |
assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \ |
426 |
"Other signals are not handled in this function" |
427 |
|
428 |
# Buffer size 0 is important, otherwise .read() with a specified length |
429 |
# might buffer data while poll(2) won't mark its file descriptor as |
430 |
# readable again. |
431 |
socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0) |
432 |
|
433 |
script_stderr_lines = utils.LineSplitter(ProcessOutput, status_file, |
434 |
child_logger, False) |
435 |
try: |
436 |
socat_stderr_lines = utils.LineSplitter(ProcessOutput, status_file, |
437 |
child_logger, True) |
438 |
try: |
439 |
fdmap = { |
440 |
child.stderr.fileno(): (child.stderr, script_stderr_lines), |
441 |
socat_stderr_read.fileno(): (socat_stderr_read, socat_stderr_lines), |
442 |
signal_notify.fileno(): (signal_notify, None), |
443 |
} |
444 |
|
445 |
poller = select.poll() |
446 |
for fd in fdmap: |
447 |
utils.SetNonblockFlag(fd, True) |
448 |
poller.register(fd, select.POLLIN) |
449 |
|
450 |
if options.connect_timeout and mode == constants.IEM_IMPORT: |
451 |
listen_timeout = locking.RunningTimeout(options.connect_timeout, True) |
452 |
else: |
453 |
listen_timeout = None |
454 |
|
455 |
exit_timeout = None |
456 |
|
457 |
while True: |
458 |
# Break out of loop if only signal notify FD is left |
459 |
if len(fdmap) == 1 and signal_notify.fileno() in fdmap: |
460 |
break |
461 |
|
462 |
timeout = None |
463 |
|
464 |
if listen_timeout and not exit_timeout: |
465 |
if status_file.GetConnected(): |
466 |
listen_timeout = None |
467 |
elif listen_timeout.Remaining() < 0: |
468 |
logging.info("Child process didn't establish connection in time") |
469 |
child.Kill(signal.SIGTERM) |
470 |
exit_timeout = \ |
471 |
locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True) |
472 |
# Next block will calculate timeout |
473 |
else: |
474 |
# Not yet connected, check again in a second |
475 |
timeout = 1000 |
476 |
|
477 |
if exit_timeout: |
478 |
timeout = exit_timeout.Remaining() * 1000 |
479 |
if timeout < 0: |
480 |
logging.info("Child process didn't exit in time") |
481 |
break |
482 |
|
483 |
for fd, event in utils.RetryOnSignal(poller.poll, timeout): |
484 |
if event & (select.POLLIN | event & select.POLLPRI): |
485 |
(from_, to) = fdmap[fd] |
486 |
|
487 |
# Read up to 1 KB of data |
488 |
data = from_.read(1024) |
489 |
if data: |
490 |
if to: |
491 |
to.write(data) |
492 |
elif fd == signal_notify.fileno(): |
493 |
# Signal handling |
494 |
if signal_handler.called: |
495 |
signal_handler.Clear() |
496 |
if exit_timeout: |
497 |
logging.info("Child process still has about %0.2f seconds" |
498 |
" to exit", exit_timeout.Remaining()) |
499 |
else: |
500 |
logging.info("Giving child process %0.2f seconds to exit", |
501 |
CHILD_LINGER_TIMEOUT) |
502 |
exit_timeout = \ |
503 |
locking.RunningTimeout(CHILD_LINGER_TIMEOUT, True) |
504 |
else: |
505 |
poller.unregister(fd) |
506 |
del fdmap[fd] |
507 |
|
508 |
elif event & (select.POLLNVAL | select.POLLHUP | |
509 |
select.POLLERR): |
510 |
poller.unregister(fd) |
511 |
del fdmap[fd] |
512 |
|
513 |
script_stderr_lines.flush() |
514 |
socat_stderr_lines.flush() |
515 |
|
516 |
# If there was a timeout calculator, we were waiting for the child to |
517 |
# finish, e.g. due to a signal |
518 |
return not bool(exit_timeout) |
519 |
finally: |
520 |
socat_stderr_lines.close() |
521 |
finally: |
522 |
script_stderr_lines.close() |
523 |
|
524 |
|
525 |
def ParseOptions(): |
526 |
"""Parses the options passed to the program. |
527 |
|
528 |
@return: Arguments to program |
529 |
|
530 |
""" |
531 |
global options # pylint: disable-msg=W0603 |
532 |
|
533 |
parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" % |
534 |
(constants.IEM_IMPORT, |
535 |
constants.IEM_EXPORT))) |
536 |
parser.add_option(cli.DEBUG_OPT) |
537 |
parser.add_option(cli.VERBOSE_OPT) |
538 |
parser.add_option("--key", dest="key", action="store", type="string", |
539 |
help="RSA key file") |
540 |
parser.add_option("--cert", dest="cert", action="store", type="string", |
541 |
help="X509 certificate file") |
542 |
parser.add_option("--ca", dest="ca", action="store", type="string", |
543 |
help="X509 CA file") |
544 |
parser.add_option("--bind", dest="bind", action="store", type="string", |
545 |
help="Bind address") |
546 |
parser.add_option("--host", dest="host", action="store", type="string", |
547 |
help="Remote hostname") |
548 |
parser.add_option("--port", dest="port", action="store", type="int", |
549 |
help="Remote port") |
550 |
parser.add_option("--connect-retries", dest="connect_retries", action="store", |
551 |
type="int", default=0, |
552 |
help=("How many times the connection should be retried" |
553 |
" (export only)")) |
554 |
parser.add_option("--connect-timeout", dest="connect_timeout", action="store", |
555 |
type="int", default=DEFAULT_CONNECT_TIMEOUT, |
556 |
help="Timeout for connection to be established (seconds)") |
557 |
parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store", |
558 |
type="string", help="Command prefix") |
559 |
parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store", |
560 |
type="string", help="Command suffix") |
561 |
|
562 |
(options, args) = parser.parse_args() |
563 |
|
564 |
if len(args) != 2: |
565 |
# Won't return |
566 |
parser.error("Expected exactly two arguments") |
567 |
|
568 |
(status_file_path, mode) = args |
569 |
|
570 |
if mode not in (constants.IEM_IMPORT, |
571 |
constants.IEM_EXPORT): |
572 |
# Won't return |
573 |
parser.error("Invalid mode: %s" % mode) |
574 |
|
575 |
return (status_file_path, mode) |
576 |
|
577 |
|
578 |
class ChildProcess(subprocess.Popen): |
579 |
def __init__(self, cmd, noclose_fds): |
580 |
"""Initializes this class. |
581 |
|
582 |
""" |
583 |
self._noclose_fds = noclose_fds |
584 |
|
585 |
# Not using close_fds because doing so would also close the socat stderr |
586 |
# pipe, which we still need. |
587 |
subprocess.Popen.__init__(self, cmd, shell=False, close_fds=False, |
588 |
stderr=subprocess.PIPE, stdout=None, stdin=None, |
589 |
preexec_fn=self._ChildPreexec) |
590 |
self._SetProcessGroup() |
591 |
|
592 |
def _ChildPreexec(self): |
593 |
"""Called before child executable is execve'd. |
594 |
|
595 |
""" |
596 |
# Move to separate process group. By sending a signal to its process group |
597 |
# we can kill the child process and all grandchildren. |
598 |
os.setpgid(0, 0) |
599 |
|
600 |
# Close almost all file descriptors |
601 |
utils.CloseFDs(noclose_fds=self._noclose_fds) |
602 |
|
603 |
def _SetProcessGroup(self): |
604 |
"""Sets the child's process group. |
605 |
|
606 |
""" |
607 |
assert self.pid, "Can't be called in child process" |
608 |
|
609 |
# Avoid race condition by setting child's process group (as good as |
610 |
# possible in Python) before sending signals to child. For an |
611 |
# explanation, see preexec function for child. |
612 |
try: |
613 |
os.setpgid(self.pid, self.pid) |
614 |
except EnvironmentError, err: |
615 |
# If the child process was faster we receive EPERM or EACCES |
616 |
if err.errno not in (errno.EPERM, errno.EACCES): |
617 |
raise |
618 |
|
619 |
def Kill(self, signum): |
620 |
"""Sends signal to child process. |
621 |
|
622 |
""" |
623 |
logging.info("Sending signal %s to child process", signum) |
624 |
os.killpg(self.pid, signum) |
625 |
|
626 |
def ForceQuit(self): |
627 |
"""Ensure child process is no longer running. |
628 |
|
629 |
""" |
630 |
# Final check if child process is still alive |
631 |
if utils.RetryOnSignal(self.poll) is None: |
632 |
logging.error("Child process still alive, sending SIGKILL") |
633 |
self.Kill(signal.SIGKILL) |
634 |
utils.RetryOnSignal(self.wait) |
635 |
|
636 |
|
637 |
def main(): |
638 |
"""Main function. |
639 |
|
640 |
""" |
641 |
# Option parsing |
642 |
(status_file_path, mode) = ParseOptions() |
643 |
|
644 |
# Configure logging |
645 |
child_logger = SetupLogging() |
646 |
|
647 |
status_file = StatusFile(status_file_path) |
648 |
try: |
649 |
try: |
650 |
# Pipe to receive socat's stderr output |
651 |
(socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe() |
652 |
|
653 |
# Get child process command |
654 |
cmd = GetCommand(mode, socat_stderr_write_fd) |
655 |
|
656 |
logging.debug("Starting command %r", cmd) |
657 |
|
658 |
# Start child process |
659 |
child = ChildProcess(cmd, [socat_stderr_write_fd]) |
660 |
try: |
661 |
def _ForwardSignal(signum, _): |
662 |
"""Forwards signals to child process. |
663 |
|
664 |
""" |
665 |
child.Kill(signum) |
666 |
|
667 |
signal_wakeup = utils.SignalWakeupFd() |
668 |
try: |
669 |
# TODO: There is a race condition between starting the child and |
670 |
# handling the signals here. While there might be a way to work around |
671 |
# it by registering the handlers before starting the child and |
672 |
# deferring sent signals until the child is available, doing so can be |
673 |
# complicated. |
674 |
signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT], |
675 |
handler_fn=_ForwardSignal, |
676 |
wakeup=signal_wakeup) |
677 |
try: |
678 |
# Close child's side |
679 |
utils.RetryOnSignal(os.close, socat_stderr_write_fd) |
680 |
|
681 |
if ProcessChildIO(child, socat_stderr_read_fd, status_file, |
682 |
child_logger, signal_wakeup, signal_handler, |
683 |
mode): |
684 |
# The child closed all its file descriptors and there was no |
685 |
# signal |
686 |
# TODO: Implement timeout instead of waiting indefinitely |
687 |
utils.RetryOnSignal(child.wait) |
688 |
finally: |
689 |
signal_handler.Reset() |
690 |
finally: |
691 |
signal_wakeup.Reset() |
692 |
finally: |
693 |
child.ForceQuit() |
694 |
|
695 |
if child.returncode == 0: |
696 |
errmsg = None |
697 |
elif child.returncode < 0: |
698 |
errmsg = "Exited due to signal %s" % (-child.returncode, ) |
699 |
else: |
700 |
errmsg = "Exited with status %s" % (child.returncode, ) |
701 |
|
702 |
status_file.SetExitStatus(child.returncode, errmsg) |
703 |
except Exception, err: # pylint: disable-msg=W0703 |
704 |
logging.exception("Unhandled error occurred") |
705 |
status_file.SetExitStatus(constants.EXIT_FAILURE, |
706 |
"Unhandled error occurred: %s" % (err, )) |
707 |
|
708 |
if status_file.ExitStatusIsSuccess(): |
709 |
sys.exit(constants.EXIT_SUCCESS) |
710 |
|
711 |
sys.exit(constants.EXIT_FAILURE) |
712 |
finally: |
713 |
status_file.Update(True) |
714 |
|
715 |
|
716 |
if __name__ == "__main__": |
717 |
main() |