--select-instances hbal manpage update
[ganeti-local] / lib / daemon.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import asynchat
27 import collections
28 import os
29 import signal
30 import logging
31 import sched
32 import time
33 import socket
34 import select
35 import sys
36
37 from ganeti import utils
38 from ganeti import constants
39 from ganeti import errors
40 from ganeti import netutils
41 from ganeti import ssconf
42 from ganeti import runtime
43 from ganeti import compat
44
45
46 class SchedulerBreakout(Exception):
47   """Exception used to get out of the scheduler loop
48
49   """
50
51
52 def AsyncoreDelayFunction(timeout):
53   """Asyncore-compatible scheduler delay function.
54
55   This is a delay function for sched that, rather than actually sleeping,
56   executes asyncore events happening in the meantime.
57
58   After an event has occurred, rather than returning, it raises a
59   SchedulerBreakout exception, which will force the current scheduler.run()
60   invocation to terminate, so that we can also check for signals. The main loop
61   will then call the scheduler run again, which will allow it to actually
62   process any due events.
63
64   This is needed because scheduler.run() doesn't support a count=..., as
65   asyncore loop, and the scheduler module documents throwing exceptions from
66   inside the delay function as an allowed usage model.
67
68   """
69   asyncore.loop(timeout=timeout, count=1, use_poll=True)
70   raise SchedulerBreakout()
71
72
73 class AsyncoreScheduler(sched.scheduler):
74   """Event scheduler integrated with asyncore
75
76   """
77   def __init__(self, timefunc):
78     sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
79
80
81 class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
82   """Base Ganeti Asyncore Dispacher
83
84   """
85   # this method is overriding an asyncore.dispatcher method
86   def handle_error(self):
87     """Log an error in handling any request, and proceed.
88
89     """
90     logging.exception("Error while handling asyncore request")
91
92   # this method is overriding an asyncore.dispatcher method
93   def writable(self):
94     """Most of the time we don't want to check for writability.
95
96     """
97     return False
98
99
100 class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
101   """A stream server to use with asyncore.
102
103   Each request is accepted, and then dispatched to a separate asyncore
104   dispatcher to handle.
105
106   """
107
108   _REQUEST_QUEUE_SIZE = 5
109
110   def __init__(self, family, address):
111     """Constructor for AsyncUnixStreamSocket
112
113     @type family: integer
114     @param family: socket family (one of socket.AF_*)
115     @type address: address family dependent
116     @param address: address to bind the socket to
117
118     """
119     GanetiBaseAsyncoreDispatcher.__init__(self)
120     self.family = family
121     self.create_socket(self.family, socket.SOCK_STREAM)
122     self.set_reuse_addr()
123     self.bind(address)
124     self.listen(self._REQUEST_QUEUE_SIZE)
125
126   # this method is overriding an asyncore.dispatcher method
127   def handle_accept(self):
128     """Accept a new client connection.
129
130     Creates a new instance of the handler class, which will use asyncore to
131     serve the client.
132
133     """
134     accept_result = utils.IgnoreSignals(self.accept)
135     if accept_result is not None:
136       connected_socket, client_address = accept_result
137       if self.family == socket.AF_UNIX:
138         # override the client address, as for unix sockets nothing meaningful
139         # is passed in from accept anyway
140         client_address = netutils.GetSocketCredentials(connected_socket)
141       logging.info("Accepted connection from %s",
142                    netutils.FormatAddress(client_address, family=self.family))
143       self.handle_connection(connected_socket, client_address)
144
145   def handle_connection(self, connected_socket, client_address):
146     """Handle an already accepted connection.
147
148     """
149     raise NotImplementedError
150
151
152 class AsyncTerminatedMessageStream(asynchat.async_chat):
153   """A terminator separated message stream asyncore module.
154
155   Handles a stream connection receiving messages terminated by a defined
156   separator. For each complete message handle_message is called.
157
158   """
159   def __init__(self, connected_socket, peer_address, terminator, family,
160                unhandled_limit):
161     """AsyncTerminatedMessageStream constructor.
162
163     @type connected_socket: socket.socket
164     @param connected_socket: connected stream socket to receive messages from
165     @param peer_address: family-specific peer address
166     @type terminator: string
167     @param terminator: terminator separating messages in the stream
168     @type family: integer
169     @param family: socket family
170     @type unhandled_limit: integer or None
171     @param unhandled_limit: maximum unanswered messages
172
173     """
174     # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
175     # using a positional argument rather than a keyword one.
176     asynchat.async_chat.__init__(self, connected_socket)
177     self.connected_socket = connected_socket
178     # on python 2.4 there is no "family" attribute for the socket class
179     # FIXME: when we move to python 2.5 or above remove the family parameter
180     #self.family = self.connected_socket.family
181     self.family = family
182     self.peer_address = peer_address
183     self.terminator = terminator
184     self.unhandled_limit = unhandled_limit
185     self.set_terminator(terminator)
186     self.ibuffer = []
187     self.receive_count = 0
188     self.send_count = 0
189     self.oqueue = collections.deque()
190     self.iqueue = collections.deque()
191
192   # this method is overriding an asynchat.async_chat method
193   def collect_incoming_data(self, data):
194     self.ibuffer.append(data)
195
196   def _can_handle_message(self):
197     return (self.unhandled_limit is None or
198             (self.receive_count < self.send_count + self.unhandled_limit) and
199              not self.iqueue)
200
201   # this method is overriding an asynchat.async_chat method
202   def found_terminator(self):
203     message = "".join(self.ibuffer)
204     self.ibuffer = []
205     message_id = self.receive_count
206     # We need to increase the receive_count after checking if the message can
207     # be handled, but before calling handle_message
208     can_handle = self._can_handle_message()
209     self.receive_count += 1
210     if can_handle:
211       self.handle_message(message, message_id)
212     else:
213       self.iqueue.append((message, message_id))
214
215   def handle_message(self, message, message_id):
216     """Handle a terminated message.
217
218     @type message: string
219     @param message: message to handle
220     @type message_id: integer
221     @param message_id: stream's message sequence number
222
223     """
224     pass
225     # TODO: move this method to raise NotImplementedError
226     # raise NotImplementedError
227
228   def send_message(self, message):
229     """Send a message to the remote peer. This function is thread-safe.
230
231     @type message: string
232     @param message: message to send, without the terminator
233
234     @warning: If calling this function from a thread different than the one
235     performing the main asyncore loop, remember that you have to wake that one
236     up.
237
238     """
239     # If we just append the message we received to the output queue, this
240     # function can be safely called by multiple threads at the same time, and
241     # we don't need locking, since deques are thread safe. handle_write in the
242     # asyncore thread will handle the next input message if there are any
243     # enqueued.
244     self.oqueue.append(message)
245
246   # this method is overriding an asyncore.dispatcher method
247   def readable(self):
248     # read from the socket if we can handle the next requests
249     return self._can_handle_message() and asynchat.async_chat.readable(self)
250
251   # this method is overriding an asyncore.dispatcher method
252   def writable(self):
253     # the output queue may become full just after we called writable. This only
254     # works if we know we'll have something else waking us up from the select,
255     # in such case, anyway.
256     return asynchat.async_chat.writable(self) or self.oqueue
257
258   # this method is overriding an asyncore.dispatcher method
259   def handle_write(self):
260     if self.oqueue:
261       # if we have data in the output queue, then send_message was called.
262       # this means we can process one more message from the input queue, if
263       # there are any.
264       data = self.oqueue.popleft()
265       self.push(data + self.terminator)
266       self.send_count += 1
267       if self.iqueue:
268         self.handle_message(*self.iqueue.popleft())
269     self.initiate_send()
270
271   def close_log(self):
272     logging.info("Closing connection from %s",
273                  netutils.FormatAddress(self.peer_address, family=self.family))
274     self.close()
275
276   # this method is overriding an asyncore.dispatcher method
277   def handle_expt(self):
278     self.close_log()
279
280   # this method is overriding an asyncore.dispatcher method
281   def handle_error(self):
282     """Log an error in handling any request, and proceed.
283
284     """
285     logging.exception("Error while handling asyncore request")
286     self.close_log()
287
288
289 class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
290   """An improved asyncore udp socket.
291
292   """
293   def __init__(self, family):
294     """Constructor for AsyncUDPSocket
295
296     """
297     GanetiBaseAsyncoreDispatcher.__init__(self)
298     self._out_queue = []
299     self._family = family
300     self.create_socket(family, socket.SOCK_DGRAM)
301
302   # this method is overriding an asyncore.dispatcher method
303   def handle_connect(self):
304     # Python thinks that the first udp message from a source qualifies as a
305     # "connect" and further ones are part of the same connection. We beg to
306     # differ and treat all messages equally.
307     pass
308
309   # this method is overriding an asyncore.dispatcher method
310   def handle_read(self):
311     recv_result = utils.IgnoreSignals(self.recvfrom,
312                                       constants.MAX_UDP_DATA_SIZE)
313     if recv_result is not None:
314       payload, address = recv_result
315       if self._family == socket.AF_INET6:
316         # we ignore 'flow info' and 'scope id' as we don't need them
317         ip, port, _, _ = address
318       else:
319         ip, port = address
320
321       self.handle_datagram(payload, ip, port)
322
323   def handle_datagram(self, payload, ip, port):
324     """Handle an already read udp datagram
325
326     """
327     raise NotImplementedError
328
329   # this method is overriding an asyncore.dispatcher method
330   def writable(self):
331     # We should check whether we can write to the socket only if we have
332     # something scheduled to be written
333     return bool(self._out_queue)
334
335   # this method is overriding an asyncore.dispatcher method
336   def handle_write(self):
337     if not self._out_queue:
338       logging.error("handle_write called with empty output queue")
339       return
340     (ip, port, payload) = self._out_queue[0]
341     utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
342     self._out_queue.pop(0)
343
344   def enqueue_send(self, ip, port, payload):
345     """Enqueue a datagram to be sent when possible
346
347     """
348     if len(payload) > constants.MAX_UDP_DATA_SIZE:
349       raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
350                                     constants.MAX_UDP_DATA_SIZE))
351     self._out_queue.append((ip, port, payload))
352
353   def process_next_packet(self, timeout=0):
354     """Process the next datagram, waiting for it if necessary.
355
356     @type timeout: float
357     @param timeout: how long to wait for data
358     @rtype: boolean
359     @return: True if some data has been handled, False otherwise
360
361     """
362     result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
363     if result is not None and result & select.POLLIN:
364       self.handle_read()
365       return True
366     else:
367       return False
368
369
370 class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
371   """A way to notify the asyncore loop that something is going on.
372
373   If an asyncore daemon is multithreaded when a thread tries to push some data
374   to a socket, the main loop handling asynchronous requests might be sleeping
375   waiting on a select(). To avoid this it can create an instance of the
376   AsyncAwaker, which other threads can use to wake it up.
377
378   """
379   def __init__(self, signal_fn=None):
380     """Constructor for AsyncAwaker
381
382     @type signal_fn: function
383     @param signal_fn: function to call when awaken
384
385     """
386     GanetiBaseAsyncoreDispatcher.__init__(self)
387     assert signal_fn == None or callable(signal_fn)
388     (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
389                                                           socket.SOCK_STREAM)
390     self.in_socket.setblocking(0)
391     self.in_socket.shutdown(socket.SHUT_WR)
392     self.out_socket.shutdown(socket.SHUT_RD)
393     self.set_socket(self.in_socket)
394     self.need_signal = True
395     self.signal_fn = signal_fn
396     self.connected = True
397
398   # this method is overriding an asyncore.dispatcher method
399   def handle_read(self):
400     utils.IgnoreSignals(self.recv, 4096)
401     if self.signal_fn:
402       self.signal_fn()
403     self.need_signal = True
404
405   # this method is overriding an asyncore.dispatcher method
406   def close(self):
407     asyncore.dispatcher.close(self)
408     self.out_socket.close()
409
410   def signal(self):
411     """Signal the asyncore main loop.
412
413     Any data we send here will be ignored, but it will cause the select() call
414     to return.
415
416     """
417     # Yes, there is a race condition here. No, we don't care, at worst we're
418     # sending more than one wakeup token, which doesn't harm at all.
419     if self.need_signal:
420       self.need_signal = False
421       self.out_socket.send("\0")
422
423
424 class Mainloop(object):
425   """Generic mainloop for daemons
426
427   @ivar scheduler: A sched.scheduler object, which can be used to register
428     timed events
429
430   """
431   def __init__(self):
432     """Constructs a new Mainloop instance.
433
434     """
435     self._signal_wait = []
436     self.scheduler = AsyncoreScheduler(time.time)
437
438     # Resolve uid/gids used
439     runtime.GetEnts()
440
441   @utils.SignalHandled([signal.SIGCHLD])
442   @utils.SignalHandled([signal.SIGTERM])
443   @utils.SignalHandled([signal.SIGINT])
444   def Run(self, signal_handlers=None):
445     """Runs the mainloop.
446
447     @type signal_handlers: dict
448     @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
449
450     """
451     assert isinstance(signal_handlers, dict) and \
452            len(signal_handlers) > 0, \
453            "Broken SignalHandled decorator"
454     running = True
455
456     # Start actual main loop
457     while running:
458       if not self.scheduler.empty():
459         try:
460           self.scheduler.run()
461         except SchedulerBreakout:
462           pass
463       else:
464         asyncore.loop(count=1, use_poll=True)
465
466       # Check whether a signal was raised
467       for sig in signal_handlers:
468         handler = signal_handlers[sig]
469         if handler.called:
470           self._CallSignalWaiters(sig)
471           running = sig not in (signal.SIGTERM, signal.SIGINT)
472           handler.Clear()
473
474   def _CallSignalWaiters(self, signum):
475     """Calls all signal waiters for a certain signal.
476
477     @type signum: int
478     @param signum: Signal number
479
480     """
481     for owner in self._signal_wait:
482       owner.OnSignal(signum)
483
484   def RegisterSignal(self, owner):
485     """Registers a receiver for signal notifications
486
487     The receiver must support a "OnSignal(self, signum)" function.
488
489     @type owner: instance
490     @param owner: Receiver
491
492     """
493     self._signal_wait.append(owner)
494
495
496 def _VerifyDaemonUser(daemon_name):
497   """Verifies the process uid matches the configured uid.
498
499   This method verifies that a daemon is started as the user it is
500   intended to be run
501
502   @param daemon_name: The name of daemon to be started
503   @return: A tuple with the first item indicating success or not,
504            the second item current uid and third with expected uid
505
506   """
507   getents = runtime.GetEnts()
508   running_uid = os.getuid()
509   daemon_uids = {
510     constants.MASTERD: getents.masterd_uid,
511     constants.RAPI: getents.rapi_uid,
512     constants.NODED: getents.noded_uid,
513     constants.CONFD: getents.confd_uid,
514     }
515
516   return (daemon_uids[daemon_name] == running_uid, running_uid,
517           daemon_uids[daemon_name])
518
519
520 def _BeautifyError(err):
521   """Try to format an error better.
522
523   Since we're dealing with daemon startup errors, in many cases this
524   will be due to socket error and such, so we try to format these cases better.
525
526   @param err: an exception object
527   @rtype: string
528   @return: the formatted error description
529
530   """
531   try:
532     if isinstance(err, socket.error):
533       return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
534     elif isinstance(err, EnvironmentError):
535       if err.filename is None:
536         return "%s (errno=%s)" % (err.strerror, err.errno)
537       else:
538         return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
539                                             err.errno)
540     else:
541       return str(err)
542   except Exception: # pylint: disable-msg=W0703
543     logging.exception("Error while handling existing error %s", err)
544     return "%s" % str(err)
545
546
547 def _HandleSigHup(reopen_cb, signum, frame): # pylint: disable-msg=W0613
548   """Handler for SIGHUP.
549
550   @param reopen_cb: Callback function for reopening log files
551
552   """
553   assert callable(reopen_cb)
554   logging.info("Reopening log files after receiving SIGHUP")
555   reopen_cb()
556
557
558 def GenericMain(daemon_name, optionparser,
559                 check_fn, prepare_fn, exec_fn,
560                 multithreaded=False, console_logging=False,
561                 default_ssl_cert=None, default_ssl_key=None):
562   """Shared main function for daemons.
563
564   @type daemon_name: string
565   @param daemon_name: daemon name
566   @type optionparser: optparse.OptionParser
567   @param optionparser: initialized optionparser with daemon-specific options
568                        (common -f -d options will be handled by this module)
569   @type check_fn: function which accepts (options, args)
570   @param check_fn: function that checks start conditions and exits if they're
571                    not met
572   @type prepare_fn: function which accepts (options, args)
573   @param prepare_fn: function that is run before forking, or None;
574       it's result will be passed as the third parameter to exec_fn, or
575       if None was passed in, we will just pass None to exec_fn
576   @type exec_fn: function which accepts (options, args, prepare_results)
577   @param exec_fn: function that's executed with the daemon's pid file held, and
578                   runs the daemon itself.
579   @type multithreaded: bool
580   @param multithreaded: Whether the daemon uses threads
581   @type console_logging: boolean
582   @param console_logging: if True, the daemon will fall back to the system
583                           console if logging fails
584   @type default_ssl_cert: string
585   @param default_ssl_cert: Default SSL certificate path
586   @type default_ssl_key: string
587   @param default_ssl_key: Default SSL key path
588
589   """
590   optionparser.add_option("-f", "--foreground", dest="fork",
591                           help="Don't detach from the current terminal",
592                           default=True, action="store_false")
593   optionparser.add_option("-d", "--debug", dest="debug",
594                           help="Enable some debug messages",
595                           default=False, action="store_true")
596   optionparser.add_option("--syslog", dest="syslog",
597                           help="Enable logging to syslog (except debug"
598                           " messages); one of 'no', 'yes' or 'only' [%s]" %
599                           constants.SYSLOG_USAGE,
600                           default=constants.SYSLOG_USAGE,
601                           choices=["no", "yes", "only"])
602
603   if daemon_name in constants.DAEMONS_PORTS:
604     default_bind_address = constants.IP4_ADDRESS_ANY
605     family = ssconf.SimpleStore().GetPrimaryIPFamily()
606     # family will default to AF_INET if there is no ssconf file (e.g. when
607     # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
608     # <= 2.2 can not be AF_INET6
609     if family == netutils.IP6Address.family:
610       default_bind_address = constants.IP6_ADDRESS_ANY
611
612     default_port = netutils.GetDaemonPort(daemon_name)
613
614     # For networked daemons we allow choosing the port and bind address
615     optionparser.add_option("-p", "--port", dest="port",
616                             help="Network port (default: %s)" % default_port,
617                             default=default_port, type="int")
618     optionparser.add_option("-b", "--bind", dest="bind_address",
619                             help=("Bind address (default: '%s')" %
620                                   default_bind_address),
621                             default=default_bind_address, metavar="ADDRESS")
622
623   if default_ssl_key is not None and default_ssl_cert is not None:
624     optionparser.add_option("--no-ssl", dest="ssl",
625                             help="Do not secure HTTP protocol with SSL",
626                             default=True, action="store_false")
627     optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
628                             help=("SSL key path (default: %s)" %
629                                   default_ssl_key),
630                             default=default_ssl_key, type="string",
631                             metavar="SSL_KEY_PATH")
632     optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
633                             help=("SSL certificate path (default: %s)" %
634                                   default_ssl_cert),
635                             default=default_ssl_cert, type="string",
636                             metavar="SSL_CERT_PATH")
637
638   # Disable the use of fork(2) if the daemon uses threads
639   if multithreaded:
640     utils.DisableFork()
641
642   options, args = optionparser.parse_args()
643
644   if getattr(options, "ssl", False):
645     ssl_paths = {
646       "certificate": options.ssl_cert,
647       "key": options.ssl_key,
648       }
649
650     for name, path in ssl_paths.iteritems():
651       if not os.path.isfile(path):
652         print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
653         sys.exit(constants.EXIT_FAILURE)
654
655     # TODO: By initiating http.HttpSslParams here we would only read the files
656     # once and have a proper validation (isfile returns False on directories)
657     # at the same time.
658
659   result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
660   if not result:
661     msg = ("%s started using wrong user ID (%d), expected %d" %
662            (daemon_name, running_uid, expected_uid))
663     print >> sys.stderr, msg
664     sys.exit(constants.EXIT_FAILURE)
665
666   if check_fn is not None:
667     check_fn(options, args)
668
669   if options.fork:
670     utils.CloseFDs()
671     wpipe = utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
672   else:
673     wpipe = None
674
675   log_reopen_fn = \
676     utils.SetupLogging(constants.DAEMONS_LOGFILES[daemon_name], daemon_name,
677                        debug=options.debug,
678                        stderr_logging=not options.fork,
679                        multithreaded=multithreaded,
680                        syslog=options.syslog,
681                        console_logging=console_logging)
682
683   # Reopen log file(s) on SIGHUP
684   signal.signal(signal.SIGHUP, compat.partial(_HandleSigHup, log_reopen_fn))
685
686   utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
687   try:
688     try:
689       logging.info("%s daemon startup", daemon_name)
690       if callable(prepare_fn):
691         prep_results = prepare_fn(options, args)
692       else:
693         prep_results = None
694     except Exception, err:
695       utils.WriteErrorToFD(wpipe, _BeautifyError(err))
696       raise
697
698     if wpipe is not None:
699       # we're done with the preparation phase, we close the pipe to
700       # let the parent know it's safe to exit
701       os.close(wpipe)
702
703     exec_fn(options, args, prep_results)
704   finally:
705     utils.RemoveFile(utils.DaemonPidFileName(daemon_name))