root / lib / daemon.py @ 8ee9a4f0
History | View | Annotate | Download (27.6 kB)
1 |
#
|
---|---|
2 |
#
|
3 |
|
4 |
# Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
|
5 |
#
|
6 |
# This program is free software; you can redistribute it and/or modify
|
7 |
# it under the terms of the GNU General Public License as published by
|
8 |
# the Free Software Foundation; either version 2 of the License, or
|
9 |
# (at your option) any later version.
|
10 |
#
|
11 |
# This program is distributed in the hope that it will be useful, but
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 |
# General Public License for more details.
|
15 |
#
|
16 |
# You should have received a copy of the GNU General Public License
|
17 |
# along with this program; if not, write to the Free Software
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 |
# 02110-1301, USA.
|
20 |
|
21 |
|
22 |
"""Module with helper classes and functions for daemons"""
|
23 |
|
24 |
|
25 |
import asyncore |
26 |
import asynchat |
27 |
import collections |
28 |
import os |
29 |
import signal |
30 |
import logging |
31 |
import sched |
32 |
import time |
33 |
import socket |
34 |
import select |
35 |
import sys |
36 |
|
37 |
from ganeti import utils |
38 |
from ganeti import constants |
39 |
from ganeti import errors |
40 |
from ganeti import netutils |
41 |
from ganeti import ssconf |
42 |
from ganeti import runtime |
43 |
from ganeti import compat |
44 |
|
45 |
|
46 |
class SchedulerBreakout(Exception): |
47 |
"""Exception used to get out of the scheduler loop
|
48 |
|
49 |
"""
|
50 |
|
51 |
|
52 |
def AsyncoreDelayFunction(timeout): |
53 |
"""Asyncore-compatible scheduler delay function.
|
54 |
|
55 |
This is a delay function for sched that, rather than actually sleeping,
|
56 |
executes asyncore events happening in the meantime.
|
57 |
|
58 |
After an event has occurred, rather than returning, it raises a
|
59 |
SchedulerBreakout exception, which will force the current scheduler.run()
|
60 |
invocation to terminate, so that we can also check for signals. The main loop
|
61 |
will then call the scheduler run again, which will allow it to actually
|
62 |
process any due events.
|
63 |
|
64 |
This is needed because scheduler.run() doesn't support a count=..., as
|
65 |
asyncore loop, and the scheduler module documents throwing exceptions from
|
66 |
inside the delay function as an allowed usage model.
|
67 |
|
68 |
"""
|
69 |
asyncore.loop(timeout=timeout, count=1, use_poll=True) |
70 |
raise SchedulerBreakout()
|
71 |
|
72 |
|
73 |
class AsyncoreScheduler(sched.scheduler): |
74 |
"""Event scheduler integrated with asyncore
|
75 |
|
76 |
"""
|
77 |
def __init__(self, timefunc): |
78 |
"""Initializes this class.
|
79 |
|
80 |
"""
|
81 |
sched.scheduler.__init__(self, timefunc, self._LimitedDelay) |
82 |
self._max_delay = None |
83 |
|
84 |
def run(self, max_delay=None): # pylint: disable=W0221 |
85 |
"""Run any pending events.
|
86 |
|
87 |
@type max_delay: None or number
|
88 |
@param max_delay: Maximum delay (useful if caller has timeouts running)
|
89 |
|
90 |
"""
|
91 |
assert self._max_delay is None |
92 |
|
93 |
# The delay function used by the scheduler can't be different on each run,
|
94 |
# hence an instance variable must be used.
|
95 |
if max_delay is None: |
96 |
self._max_delay = None |
97 |
else:
|
98 |
self._max_delay = utils.RunningTimeout(max_delay, False) |
99 |
|
100 |
try:
|
101 |
return sched.scheduler.run(self) |
102 |
finally:
|
103 |
self._max_delay = None |
104 |
|
105 |
def _LimitedDelay(self, duration): |
106 |
"""Custom delay function for C{sched.scheduler}.
|
107 |
|
108 |
"""
|
109 |
if self._max_delay is None: |
110 |
timeout = duration |
111 |
else:
|
112 |
timeout = min(duration, self._max_delay.Remaining()) |
113 |
|
114 |
return AsyncoreDelayFunction(timeout)
|
115 |
|
116 |
|
117 |
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher): |
118 |
"""Base Ganeti Asyncore Dispacher
|
119 |
|
120 |
"""
|
121 |
# this method is overriding an asyncore.dispatcher method
|
122 |
def handle_error(self): |
123 |
"""Log an error in handling any request, and proceed.
|
124 |
|
125 |
"""
|
126 |
logging.exception("Error while handling asyncore request")
|
127 |
|
128 |
# this method is overriding an asyncore.dispatcher method
|
129 |
def writable(self): |
130 |
"""Most of the time we don't want to check for writability.
|
131 |
|
132 |
"""
|
133 |
return False |
134 |
|
135 |
|
136 |
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher): |
137 |
"""A stream server to use with asyncore.
|
138 |
|
139 |
Each request is accepted, and then dispatched to a separate asyncore
|
140 |
dispatcher to handle.
|
141 |
|
142 |
"""
|
143 |
|
144 |
_REQUEST_QUEUE_SIZE = 5
|
145 |
|
146 |
def __init__(self, family, address): |
147 |
"""Constructor for AsyncUnixStreamSocket
|
148 |
|
149 |
@type family: integer
|
150 |
@param family: socket family (one of socket.AF_*)
|
151 |
@type address: address family dependent
|
152 |
@param address: address to bind the socket to
|
153 |
|
154 |
"""
|
155 |
GanetiBaseAsyncoreDispatcher.__init__(self)
|
156 |
self.family = family
|
157 |
self.create_socket(self.family, socket.SOCK_STREAM) |
158 |
self.set_reuse_addr()
|
159 |
self.bind(address)
|
160 |
self.listen(self._REQUEST_QUEUE_SIZE) |
161 |
|
162 |
# this method is overriding an asyncore.dispatcher method
|
163 |
def handle_accept(self): |
164 |
"""Accept a new client connection.
|
165 |
|
166 |
Creates a new instance of the handler class, which will use asyncore to
|
167 |
serve the client.
|
168 |
|
169 |
"""
|
170 |
accept_result = utils.IgnoreSignals(self.accept)
|
171 |
if accept_result is not None: |
172 |
connected_socket, client_address = accept_result |
173 |
if self.family == socket.AF_UNIX: |
174 |
# override the client address, as for unix sockets nothing meaningful
|
175 |
# is passed in from accept anyway
|
176 |
client_address = netutils.GetSocketCredentials(connected_socket) |
177 |
logging.info("Accepted connection from %s",
|
178 |
netutils.FormatAddress(client_address, family=self.family))
|
179 |
self.handle_connection(connected_socket, client_address)
|
180 |
|
181 |
def handle_connection(self, connected_socket, client_address): |
182 |
"""Handle an already accepted connection.
|
183 |
|
184 |
"""
|
185 |
raise NotImplementedError |
186 |
|
187 |
|
188 |
class AsyncTerminatedMessageStream(asynchat.async_chat): |
189 |
"""A terminator separated message stream asyncore module.
|
190 |
|
191 |
Handles a stream connection receiving messages terminated by a defined
|
192 |
separator. For each complete message handle_message is called.
|
193 |
|
194 |
"""
|
195 |
def __init__(self, connected_socket, peer_address, terminator, family, |
196 |
unhandled_limit): |
197 |
"""AsyncTerminatedMessageStream constructor.
|
198 |
|
199 |
@type connected_socket: socket.socket
|
200 |
@param connected_socket: connected stream socket to receive messages from
|
201 |
@param peer_address: family-specific peer address
|
202 |
@type terminator: string
|
203 |
@param terminator: terminator separating messages in the stream
|
204 |
@type family: integer
|
205 |
@param family: socket family
|
206 |
@type unhandled_limit: integer or None
|
207 |
@param unhandled_limit: maximum unanswered messages
|
208 |
|
209 |
"""
|
210 |
# python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
|
211 |
# using a positional argument rather than a keyword one.
|
212 |
asynchat.async_chat.__init__(self, connected_socket)
|
213 |
self.connected_socket = connected_socket
|
214 |
# on python 2.4 there is no "family" attribute for the socket class
|
215 |
# FIXME: when we move to python 2.5 or above remove the family parameter
|
216 |
#self.family = self.connected_socket.family
|
217 |
self.family = family
|
218 |
self.peer_address = peer_address
|
219 |
self.terminator = terminator
|
220 |
self.unhandled_limit = unhandled_limit
|
221 |
self.set_terminator(terminator)
|
222 |
self.ibuffer = []
|
223 |
self.receive_count = 0 |
224 |
self.send_count = 0 |
225 |
self.oqueue = collections.deque()
|
226 |
self.iqueue = collections.deque()
|
227 |
|
228 |
# this method is overriding an asynchat.async_chat method
|
229 |
def collect_incoming_data(self, data): |
230 |
self.ibuffer.append(data)
|
231 |
|
232 |
def _can_handle_message(self): |
233 |
return (self.unhandled_limit is None or |
234 |
(self.receive_count < self.send_count + self.unhandled_limit) and |
235 |
not self.iqueue) |
236 |
|
237 |
# this method is overriding an asynchat.async_chat method
|
238 |
def found_terminator(self): |
239 |
message = "".join(self.ibuffer) |
240 |
self.ibuffer = []
|
241 |
message_id = self.receive_count
|
242 |
# We need to increase the receive_count after checking if the message can
|
243 |
# be handled, but before calling handle_message
|
244 |
can_handle = self._can_handle_message()
|
245 |
self.receive_count += 1 |
246 |
if can_handle:
|
247 |
self.handle_message(message, message_id)
|
248 |
else:
|
249 |
self.iqueue.append((message, message_id))
|
250 |
|
251 |
def handle_message(self, message, message_id): |
252 |
"""Handle a terminated message.
|
253 |
|
254 |
@type message: string
|
255 |
@param message: message to handle
|
256 |
@type message_id: integer
|
257 |
@param message_id: stream's message sequence number
|
258 |
|
259 |
"""
|
260 |
pass
|
261 |
# TODO: move this method to raise NotImplementedError
|
262 |
# raise NotImplementedError
|
263 |
|
264 |
def send_message(self, message): |
265 |
"""Send a message to the remote peer. This function is thread-safe.
|
266 |
|
267 |
@type message: string
|
268 |
@param message: message to send, without the terminator
|
269 |
|
270 |
@warning: If calling this function from a thread different than the one
|
271 |
performing the main asyncore loop, remember that you have to wake that one
|
272 |
up.
|
273 |
|
274 |
"""
|
275 |
# If we just append the message we received to the output queue, this
|
276 |
# function can be safely called by multiple threads at the same time, and
|
277 |
# we don't need locking, since deques are thread safe. handle_write in the
|
278 |
# asyncore thread will handle the next input message if there are any
|
279 |
# enqueued.
|
280 |
self.oqueue.append(message)
|
281 |
|
282 |
# this method is overriding an asyncore.dispatcher method
|
283 |
def readable(self): |
284 |
# read from the socket if we can handle the next requests
|
285 |
return self._can_handle_message() and asynchat.async_chat.readable(self) |
286 |
|
287 |
# this method is overriding an asyncore.dispatcher method
|
288 |
def writable(self): |
289 |
# the output queue may become full just after we called writable. This only
|
290 |
# works if we know we'll have something else waking us up from the select,
|
291 |
# in such case, anyway.
|
292 |
return asynchat.async_chat.writable(self) or self.oqueue |
293 |
|
294 |
# this method is overriding an asyncore.dispatcher method
|
295 |
def handle_write(self): |
296 |
if self.oqueue: |
297 |
# if we have data in the output queue, then send_message was called.
|
298 |
# this means we can process one more message from the input queue, if
|
299 |
# there are any.
|
300 |
data = self.oqueue.popleft()
|
301 |
self.push(data + self.terminator) |
302 |
self.send_count += 1 |
303 |
if self.iqueue: |
304 |
self.handle_message(*self.iqueue.popleft()) |
305 |
self.initiate_send()
|
306 |
|
307 |
def close_log(self): |
308 |
logging.info("Closing connection from %s",
|
309 |
netutils.FormatAddress(self.peer_address, family=self.family)) |
310 |
self.close()
|
311 |
|
312 |
# this method is overriding an asyncore.dispatcher method
|
313 |
def handle_expt(self): |
314 |
self.close_log()
|
315 |
|
316 |
# this method is overriding an asyncore.dispatcher method
|
317 |
def handle_error(self): |
318 |
"""Log an error in handling any request, and proceed.
|
319 |
|
320 |
"""
|
321 |
logging.exception("Error while handling asyncore request")
|
322 |
self.close_log()
|
323 |
|
324 |
|
325 |
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher): |
326 |
"""An improved asyncore udp socket.
|
327 |
|
328 |
"""
|
329 |
def __init__(self, family): |
330 |
"""Constructor for AsyncUDPSocket
|
331 |
|
332 |
"""
|
333 |
GanetiBaseAsyncoreDispatcher.__init__(self)
|
334 |
self._out_queue = []
|
335 |
self._family = family
|
336 |
self.create_socket(family, socket.SOCK_DGRAM)
|
337 |
|
338 |
# this method is overriding an asyncore.dispatcher method
|
339 |
def handle_connect(self): |
340 |
# Python thinks that the first udp message from a source qualifies as a
|
341 |
# "connect" and further ones are part of the same connection. We beg to
|
342 |
# differ and treat all messages equally.
|
343 |
pass
|
344 |
|
345 |
# this method is overriding an asyncore.dispatcher method
|
346 |
def handle_read(self): |
347 |
recv_result = utils.IgnoreSignals(self.recvfrom,
|
348 |
constants.MAX_UDP_DATA_SIZE) |
349 |
if recv_result is not None: |
350 |
payload, address = recv_result |
351 |
if self._family == socket.AF_INET6: |
352 |
# we ignore 'flow info' and 'scope id' as we don't need them
|
353 |
ip, port, _, _ = address |
354 |
else:
|
355 |
ip, port = address |
356 |
|
357 |
self.handle_datagram(payload, ip, port)
|
358 |
|
359 |
def handle_datagram(self, payload, ip, port): |
360 |
"""Handle an already read udp datagram
|
361 |
|
362 |
"""
|
363 |
raise NotImplementedError |
364 |
|
365 |
# this method is overriding an asyncore.dispatcher method
|
366 |
def writable(self): |
367 |
# We should check whether we can write to the socket only if we have
|
368 |
# something scheduled to be written
|
369 |
return bool(self._out_queue) |
370 |
|
371 |
# this method is overriding an asyncore.dispatcher method
|
372 |
def handle_write(self): |
373 |
if not self._out_queue: |
374 |
logging.error("handle_write called with empty output queue")
|
375 |
return
|
376 |
(ip, port, payload) = self._out_queue[0] |
377 |
utils.IgnoreSignals(self.sendto, payload, 0, (ip, port)) |
378 |
self._out_queue.pop(0) |
379 |
|
380 |
def enqueue_send(self, ip, port, payload): |
381 |
"""Enqueue a datagram to be sent when possible
|
382 |
|
383 |
"""
|
384 |
if len(payload) > constants.MAX_UDP_DATA_SIZE: |
385 |
raise errors.UdpDataSizeError("Packet too big: %s > %s" % (len(payload), |
386 |
constants.MAX_UDP_DATA_SIZE)) |
387 |
self._out_queue.append((ip, port, payload))
|
388 |
|
389 |
def process_next_packet(self, timeout=0): |
390 |
"""Process the next datagram, waiting for it if necessary.
|
391 |
|
392 |
@type timeout: float
|
393 |
@param timeout: how long to wait for data
|
394 |
@rtype: boolean
|
395 |
@return: True if some data has been handled, False otherwise
|
396 |
|
397 |
"""
|
398 |
result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
|
399 |
if result is not None and result & select.POLLIN: |
400 |
self.handle_read()
|
401 |
return True |
402 |
else:
|
403 |
return False |
404 |
|
405 |
|
406 |
class AsyncAwaker(GanetiBaseAsyncoreDispatcher): |
407 |
"""A way to notify the asyncore loop that something is going on.
|
408 |
|
409 |
If an asyncore daemon is multithreaded when a thread tries to push some data
|
410 |
to a socket, the main loop handling asynchronous requests might be sleeping
|
411 |
waiting on a select(). To avoid this it can create an instance of the
|
412 |
AsyncAwaker, which other threads can use to wake it up.
|
413 |
|
414 |
"""
|
415 |
def __init__(self, signal_fn=None): |
416 |
"""Constructor for AsyncAwaker
|
417 |
|
418 |
@type signal_fn: function
|
419 |
@param signal_fn: function to call when awaken
|
420 |
|
421 |
"""
|
422 |
GanetiBaseAsyncoreDispatcher.__init__(self)
|
423 |
assert signal_fn is None or callable(signal_fn) |
424 |
(self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX, |
425 |
socket.SOCK_STREAM) |
426 |
self.in_socket.setblocking(0) |
427 |
self.in_socket.shutdown(socket.SHUT_WR)
|
428 |
self.out_socket.shutdown(socket.SHUT_RD)
|
429 |
self.set_socket(self.in_socket) |
430 |
self.need_signal = True |
431 |
self.signal_fn = signal_fn
|
432 |
self.connected = True |
433 |
|
434 |
# this method is overriding an asyncore.dispatcher method
|
435 |
def handle_read(self): |
436 |
utils.IgnoreSignals(self.recv, 4096) |
437 |
if self.signal_fn: |
438 |
self.signal_fn()
|
439 |
self.need_signal = True |
440 |
|
441 |
# this method is overriding an asyncore.dispatcher method
|
442 |
def close(self): |
443 |
asyncore.dispatcher.close(self)
|
444 |
self.out_socket.close()
|
445 |
|
446 |
def signal(self): |
447 |
"""Signal the asyncore main loop.
|
448 |
|
449 |
Any data we send here will be ignored, but it will cause the select() call
|
450 |
to return.
|
451 |
|
452 |
"""
|
453 |
# Yes, there is a race condition here. No, we don't care, at worst we're
|
454 |
# sending more than one wakeup token, which doesn't harm at all.
|
455 |
if self.need_signal: |
456 |
self.need_signal = False |
457 |
self.out_socket.send(chr(0)) |
458 |
|
459 |
|
460 |
class _ShutdownCheck: |
461 |
"""Logic for L{Mainloop} shutdown.
|
462 |
|
463 |
"""
|
464 |
def __init__(self, fn): |
465 |
"""Initializes this class.
|
466 |
|
467 |
@type fn: callable
|
468 |
@param fn: Function returning C{None} if mainloop can be stopped or a
|
469 |
duration in seconds after which the function should be called again
|
470 |
@see: L{Mainloop.Run}
|
471 |
|
472 |
"""
|
473 |
assert callable(fn) |
474 |
|
475 |
self._fn = fn
|
476 |
self._defer = None |
477 |
|
478 |
def CanShutdown(self): |
479 |
"""Checks whether mainloop can be stopped.
|
480 |
|
481 |
@rtype: bool
|
482 |
|
483 |
"""
|
484 |
if self._defer and self._defer.Remaining() > 0: |
485 |
# A deferred check has already been scheduled
|
486 |
return False |
487 |
|
488 |
# Ask mainloop driver whether we can stop or should check again
|
489 |
timeout = self._fn()
|
490 |
|
491 |
if timeout is None: |
492 |
# Yes, can stop mainloop
|
493 |
return True |
494 |
|
495 |
# Schedule another check in the future
|
496 |
self._defer = utils.RunningTimeout(timeout, True) |
497 |
|
498 |
return False |
499 |
|
500 |
|
501 |
class Mainloop(object): |
502 |
"""Generic mainloop for daemons
|
503 |
|
504 |
@ivar scheduler: A sched.scheduler object, which can be used to register
|
505 |
timed events
|
506 |
|
507 |
"""
|
508 |
_SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)
|
509 |
|
510 |
def __init__(self): |
511 |
"""Constructs a new Mainloop instance.
|
512 |
|
513 |
"""
|
514 |
self._signal_wait = []
|
515 |
self.scheduler = AsyncoreScheduler(time.time)
|
516 |
|
517 |
# Resolve uid/gids used
|
518 |
runtime.GetEnts() |
519 |
|
520 |
@utils.SignalHandled([signal.SIGCHLD])
|
521 |
@utils.SignalHandled([signal.SIGTERM])
|
522 |
@utils.SignalHandled([signal.SIGINT])
|
523 |
def Run(self, shutdown_wait_fn=None, signal_handlers=None): |
524 |
"""Runs the mainloop.
|
525 |
|
526 |
@type shutdown_wait_fn: callable
|
527 |
@param shutdown_wait_fn: Function to check whether loop can be terminated;
|
528 |
B{important}: function must be idempotent and must return either None
|
529 |
for shutting down or a timeout for another call
|
530 |
@type signal_handlers: dict
|
531 |
@param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
|
532 |
|
533 |
"""
|
534 |
assert isinstance(signal_handlers, dict) and \ |
535 |
len(signal_handlers) > 0, \ |
536 |
"Broken SignalHandled decorator"
|
537 |
|
538 |
# Counter for received signals
|
539 |
shutdown_signals = 0
|
540 |
|
541 |
# Logic to wait for shutdown
|
542 |
shutdown_waiter = None
|
543 |
|
544 |
# Start actual main loop
|
545 |
while True: |
546 |
if shutdown_signals == 1 and shutdown_wait_fn is not None: |
547 |
if shutdown_waiter is None: |
548 |
shutdown_waiter = _ShutdownCheck(shutdown_wait_fn) |
549 |
|
550 |
# Let mainloop driver decide if we can already abort
|
551 |
if shutdown_waiter.CanShutdown():
|
552 |
break
|
553 |
|
554 |
# Re-evaluate in a second
|
555 |
timeout = 1.0
|
556 |
|
557 |
elif shutdown_signals >= 1: |
558 |
# Abort loop if more than one signal has been sent or no callback has
|
559 |
# been given
|
560 |
break
|
561 |
|
562 |
else:
|
563 |
# Wait forever on I/O events
|
564 |
timeout = None
|
565 |
|
566 |
if self.scheduler.empty(): |
567 |
asyncore.loop(count=1, timeout=timeout, use_poll=True) |
568 |
else:
|
569 |
try:
|
570 |
self.scheduler.run(max_delay=timeout)
|
571 |
except SchedulerBreakout:
|
572 |
pass
|
573 |
|
574 |
# Check whether a signal was raised
|
575 |
for (sig, handler) in signal_handlers.items(): |
576 |
if handler.called:
|
577 |
self._CallSignalWaiters(sig)
|
578 |
if sig in (signal.SIGTERM, signal.SIGINT): |
579 |
logging.info("Received signal %s asking for shutdown", sig)
|
580 |
shutdown_signals += 1
|
581 |
handler.Clear() |
582 |
|
583 |
def _CallSignalWaiters(self, signum): |
584 |
"""Calls all signal waiters for a certain signal.
|
585 |
|
586 |
@type signum: int
|
587 |
@param signum: Signal number
|
588 |
|
589 |
"""
|
590 |
for owner in self._signal_wait: |
591 |
owner.OnSignal(signum) |
592 |
|
593 |
def RegisterSignal(self, owner): |
594 |
"""Registers a receiver for signal notifications
|
595 |
|
596 |
The receiver must support a "OnSignal(self, signum)" function.
|
597 |
|
598 |
@type owner: instance
|
599 |
@param owner: Receiver
|
600 |
|
601 |
"""
|
602 |
self._signal_wait.append(owner)
|
603 |
|
604 |
|
605 |
def _VerifyDaemonUser(daemon_name): |
606 |
"""Verifies the process uid matches the configured uid.
|
607 |
|
608 |
This method verifies that a daemon is started as the user it is
|
609 |
intended to be run
|
610 |
|
611 |
@param daemon_name: The name of daemon to be started
|
612 |
@return: A tuple with the first item indicating success or not,
|
613 |
the second item current uid and third with expected uid
|
614 |
|
615 |
"""
|
616 |
getents = runtime.GetEnts() |
617 |
running_uid = os.getuid() |
618 |
daemon_uids = { |
619 |
constants.MASTERD: getents.masterd_uid, |
620 |
constants.RAPI: getents.rapi_uid, |
621 |
constants.NODED: getents.noded_uid, |
622 |
constants.CONFD: getents.confd_uid, |
623 |
} |
624 |
assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name |
625 |
|
626 |
return (daemon_uids[daemon_name] == running_uid, running_uid,
|
627 |
daemon_uids[daemon_name]) |
628 |
|
629 |
|
630 |
def _BeautifyError(err): |
631 |
"""Try to format an error better.
|
632 |
|
633 |
Since we're dealing with daemon startup errors, in many cases this
|
634 |
will be due to socket error and such, so we try to format these cases better.
|
635 |
|
636 |
@param err: an exception object
|
637 |
@rtype: string
|
638 |
@return: the formatted error description
|
639 |
|
640 |
"""
|
641 |
try:
|
642 |
if isinstance(err, socket.error): |
643 |
return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0]) |
644 |
elif isinstance(err, EnvironmentError): |
645 |
if err.filename is None: |
646 |
return "%s (errno=%s)" % (err.strerror, err.errno) |
647 |
else:
|
648 |
return "%s (file %s) (errno=%s)" % (err.strerror, err.filename, |
649 |
err.errno) |
650 |
else:
|
651 |
return str(err) |
652 |
except Exception: # pylint: disable=W0703 |
653 |
logging.exception("Error while handling existing error %s", err)
|
654 |
return "%s" % str(err) |
655 |
|
656 |
|
657 |
def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613 |
658 |
"""Handler for SIGHUP.
|
659 |
|
660 |
@param reopen_fn: List of callback functions for reopening log files
|
661 |
|
662 |
"""
|
663 |
logging.info("Reopening log files after receiving SIGHUP")
|
664 |
|
665 |
for fn in reopen_fn: |
666 |
if fn:
|
667 |
fn() |
668 |
|
669 |
|
670 |
def GenericMain(daemon_name, optionparser, |
671 |
check_fn, prepare_fn, exec_fn, |
672 |
multithreaded=False, console_logging=False, |
673 |
default_ssl_cert=None, default_ssl_key=None): |
674 |
"""Shared main function for daemons.
|
675 |
|
676 |
@type daemon_name: string
|
677 |
@param daemon_name: daemon name
|
678 |
@type optionparser: optparse.OptionParser
|
679 |
@param optionparser: initialized optionparser with daemon-specific options
|
680 |
(common -f -d options will be handled by this module)
|
681 |
@type check_fn: function which accepts (options, args)
|
682 |
@param check_fn: function that checks start conditions and exits if they're
|
683 |
not met
|
684 |
@type prepare_fn: function which accepts (options, args)
|
685 |
@param prepare_fn: function that is run before forking, or None;
|
686 |
it's result will be passed as the third parameter to exec_fn, or
|
687 |
if None was passed in, we will just pass None to exec_fn
|
688 |
@type exec_fn: function which accepts (options, args, prepare_results)
|
689 |
@param exec_fn: function that's executed with the daemon's pid file held, and
|
690 |
runs the daemon itself.
|
691 |
@type multithreaded: bool
|
692 |
@param multithreaded: Whether the daemon uses threads
|
693 |
@type console_logging: boolean
|
694 |
@param console_logging: if True, the daemon will fall back to the system
|
695 |
console if logging fails
|
696 |
@type default_ssl_cert: string
|
697 |
@param default_ssl_cert: Default SSL certificate path
|
698 |
@type default_ssl_key: string
|
699 |
@param default_ssl_key: Default SSL key path
|
700 |
|
701 |
"""
|
702 |
optionparser.add_option("-f", "--foreground", dest="fork", |
703 |
help="Don't detach from the current terminal",
|
704 |
default=True, action="store_false") |
705 |
optionparser.add_option("-d", "--debug", dest="debug", |
706 |
help="Enable some debug messages",
|
707 |
default=False, action="store_true") |
708 |
optionparser.add_option("--syslog", dest="syslog", |
709 |
help="Enable logging to syslog (except debug"
|
710 |
" messages); one of 'no', 'yes' or 'only' [%s]" %
|
711 |
constants.SYSLOG_USAGE, |
712 |
default=constants.SYSLOG_USAGE, |
713 |
choices=["no", "yes", "only"]) |
714 |
|
715 |
family = ssconf.SimpleStore().GetPrimaryIPFamily() |
716 |
# family will default to AF_INET if there is no ssconf file (e.g. when
|
717 |
# upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
|
718 |
# <= 2.2 can not be AF_INET6
|
719 |
if daemon_name in constants.DAEMONS_PORTS: |
720 |
default_bind_address = constants.IP4_ADDRESS_ANY |
721 |
if family == netutils.IP6Address.family:
|
722 |
default_bind_address = constants.IP6_ADDRESS_ANY |
723 |
|
724 |
default_port = netutils.GetDaemonPort(daemon_name) |
725 |
|
726 |
# For networked daemons we allow choosing the port and bind address
|
727 |
optionparser.add_option("-p", "--port", dest="port", |
728 |
help="Network port (default: %s)" % default_port,
|
729 |
default=default_port, type="int")
|
730 |
optionparser.add_option("-b", "--bind", dest="bind_address", |
731 |
help=("Bind address (default: '%s')" %
|
732 |
default_bind_address), |
733 |
default=default_bind_address, metavar="ADDRESS")
|
734 |
optionparser.add_option("-i", "--interface", dest="bind_interface", |
735 |
help=("Bind interface"), metavar="INTERFACE") |
736 |
|
737 |
if default_ssl_key is not None and default_ssl_cert is not None: |
738 |
optionparser.add_option("--no-ssl", dest="ssl", |
739 |
help="Do not secure HTTP protocol with SSL",
|
740 |
default=True, action="store_false") |
741 |
optionparser.add_option("-K", "--ssl-key", dest="ssl_key", |
742 |
help=("SSL key path (default: %s)" %
|
743 |
default_ssl_key), |
744 |
default=default_ssl_key, type="string",
|
745 |
metavar="SSL_KEY_PATH")
|
746 |
optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert", |
747 |
help=("SSL certificate path (default: %s)" %
|
748 |
default_ssl_cert), |
749 |
default=default_ssl_cert, type="string",
|
750 |
metavar="SSL_CERT_PATH")
|
751 |
|
752 |
# Disable the use of fork(2) if the daemon uses threads
|
753 |
if multithreaded:
|
754 |
utils.DisableFork() |
755 |
|
756 |
options, args = optionparser.parse_args() |
757 |
|
758 |
if getattr(options, "bind_interface", None) is not None: |
759 |
if options.bind_address != default_bind_address:
|
760 |
msg = ("Can't specify both, bind address (%s) and bind interface (%s)" %
|
761 |
(options.bind_address, options.bind_interface)) |
762 |
print >> sys.stderr, msg
|
763 |
sys.exit(constants.EXIT_FAILURE) |
764 |
interface_ip_addresses = \ |
765 |
netutils.GetInterfaceIpAddresses(options.bind_interface) |
766 |
if family == netutils.IP6Address.family:
|
767 |
if_addresses = interface_ip_addresses[constants.IP6_VERSION] |
768 |
else:
|
769 |
if_addresses = interface_ip_addresses[constants.IP4_VERSION] |
770 |
if len(if_addresses) < 1: |
771 |
msg = "Failed to find IP for interface %s" % options.bind_interace
|
772 |
print >> sys.stderr, msg
|
773 |
sys.exit(constants.EXIT_FAILURE) |
774 |
options.bind_address = if_addresses[0]
|
775 |
|
776 |
if getattr(options, "ssl", False): |
777 |
ssl_paths = { |
778 |
"certificate": options.ssl_cert,
|
779 |
"key": options.ssl_key,
|
780 |
} |
781 |
|
782 |
for name, path in ssl_paths.iteritems(): |
783 |
if not os.path.isfile(path): |
784 |
print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path) |
785 |
sys.exit(constants.EXIT_FAILURE) |
786 |
|
787 |
# TODO: By initiating http.HttpSslParams here we would only read the files
|
788 |
# once and have a proper validation (isfile returns False on directories)
|
789 |
# at the same time.
|
790 |
|
791 |
result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name) |
792 |
if not result: |
793 |
msg = ("%s started using wrong user ID (%d), expected %d" %
|
794 |
(daemon_name, running_uid, expected_uid)) |
795 |
print >> sys.stderr, msg
|
796 |
sys.exit(constants.EXIT_FAILURE) |
797 |
|
798 |
if check_fn is not None: |
799 |
check_fn(options, args) |
800 |
|
801 |
log_filename = constants.DAEMONS_LOGFILES[daemon_name] |
802 |
|
803 |
if options.fork:
|
804 |
utils.CloseFDs() |
805 |
(wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename) |
806 |
else:
|
807 |
(wpipe, stdio_reopen_fn) = (None, None) |
808 |
|
809 |
log_reopen_fn = \ |
810 |
utils.SetupLogging(log_filename, daemon_name, |
811 |
debug=options.debug, |
812 |
stderr_logging=not options.fork,
|
813 |
multithreaded=multithreaded, |
814 |
syslog=options.syslog, |
815 |
console_logging=console_logging) |
816 |
|
817 |
# Reopen log file(s) on SIGHUP
|
818 |
signal.signal(signal.SIGHUP, |
819 |
compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn])) |
820 |
|
821 |
try:
|
822 |
utils.WritePidFile(utils.DaemonPidFileName(daemon_name)) |
823 |
except errors.PidFileLockError, err:
|
824 |
print >> sys.stderr, "Error while locking PID file:\n%s" % err |
825 |
sys.exit(constants.EXIT_FAILURE) |
826 |
|
827 |
try:
|
828 |
try:
|
829 |
logging.info("%s daemon startup", daemon_name)
|
830 |
if callable(prepare_fn): |
831 |
prep_results = prepare_fn(options, args) |
832 |
else:
|
833 |
prep_results = None
|
834 |
except Exception, err: |
835 |
utils.WriteErrorToFD(wpipe, _BeautifyError(err)) |
836 |
raise
|
837 |
|
838 |
if wpipe is not None: |
839 |
# we're done with the preparation phase, we close the pipe to
|
840 |
# let the parent know it's safe to exit
|
841 |
os.close(wpipe) |
842 |
|
843 |
exec_fn(options, args, prep_results) |
844 |
finally:
|
845 |
utils.RemoveFile(utils.DaemonPidFileName(daemon_name)) |