Add _UnlockedLookupNodeGroup()
[ganeti-local] / lib / daemon.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import asynchat
27 import collections
28 import os
29 import signal
30 import logging
31 import sched
32 import time
33 import socket
34 import select
35 import sys
36
37 from ganeti import utils
38 from ganeti import constants
39 from ganeti import errors
40 from ganeti import netutils
41 from ganeti import ssconf
42 from ganeti import runtime
43
44
45 class SchedulerBreakout(Exception):
46   """Exception used to get out of the scheduler loop
47
48   """
49
50
51 def AsyncoreDelayFunction(timeout):
52   """Asyncore-compatible scheduler delay function.
53
54   This is a delay function for sched that, rather than actually sleeping,
55   executes asyncore events happening in the meantime.
56
57   After an event has occurred, rather than returning, it raises a
58   SchedulerBreakout exception, which will force the current scheduler.run()
59   invocation to terminate, so that we can also check for signals. The main loop
60   will then call the scheduler run again, which will allow it to actually
61   process any due events.
62
63   This is needed because scheduler.run() doesn't support a count=..., as
64   asyncore loop, and the scheduler module documents throwing exceptions from
65   inside the delay function as an allowed usage model.
66
67   """
68   asyncore.loop(timeout=timeout, count=1, use_poll=True)
69   raise SchedulerBreakout()
70
71
72 class AsyncoreScheduler(sched.scheduler):
73   """Event scheduler integrated with asyncore
74
75   """
76   def __init__(self, timefunc):
77     sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
78
79
80 class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
81   """Base Ganeti Asyncore Dispacher
82
83   """
84   # this method is overriding an asyncore.dispatcher method
85   def handle_error(self):
86     """Log an error in handling any request, and proceed.
87
88     """
89     logging.exception("Error while handling asyncore request")
90
91   # this method is overriding an asyncore.dispatcher method
92   def writable(self):
93     """Most of the time we don't want to check for writability.
94
95     """
96     return False
97
98
99 class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
100   """A stream server to use with asyncore.
101
102   Each request is accepted, and then dispatched to a separate asyncore
103   dispatcher to handle.
104
105   """
106
107   _REQUEST_QUEUE_SIZE = 5
108
109   def __init__(self, family, address):
110     """Constructor for AsyncUnixStreamSocket
111
112     @type family: integer
113     @param family: socket family (one of socket.AF_*)
114     @type address: address family dependent
115     @param address: address to bind the socket to
116
117     """
118     GanetiBaseAsyncoreDispatcher.__init__(self)
119     self.family = family
120     self.create_socket(self.family, socket.SOCK_STREAM)
121     self.set_reuse_addr()
122     self.bind(address)
123     self.listen(self._REQUEST_QUEUE_SIZE)
124
125   # this method is overriding an asyncore.dispatcher method
126   def handle_accept(self):
127     """Accept a new client connection.
128
129     Creates a new instance of the handler class, which will use asyncore to
130     serve the client.
131
132     """
133     accept_result = utils.IgnoreSignals(self.accept)
134     if accept_result is not None:
135       connected_socket, client_address = accept_result
136       if self.family == socket.AF_UNIX:
137         # override the client address, as for unix sockets nothing meaningful
138         # is passed in from accept anyway
139         client_address = netutils.GetSocketCredentials(connected_socket)
140       logging.info("Accepted connection from %s",
141                    netutils.FormatAddress(client_address, family=self.family))
142       self.handle_connection(connected_socket, client_address)
143
144   def handle_connection(self, connected_socket, client_address):
145     """Handle an already accepted connection.
146
147     """
148     raise NotImplementedError
149
150
151 class AsyncTerminatedMessageStream(asynchat.async_chat):
152   """A terminator separated message stream asyncore module.
153
154   Handles a stream connection receiving messages terminated by a defined
155   separator. For each complete message handle_message is called.
156
157   """
158   def __init__(self, connected_socket, peer_address, terminator, family,
159                unhandled_limit):
160     """AsyncTerminatedMessageStream constructor.
161
162     @type connected_socket: socket.socket
163     @param connected_socket: connected stream socket to receive messages from
164     @param peer_address: family-specific peer address
165     @type terminator: string
166     @param terminator: terminator separating messages in the stream
167     @type family: integer
168     @param family: socket family
169     @type unhandled_limit: integer or None
170     @param unhandled_limit: maximum unanswered messages
171
172     """
173     # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
174     # using a positional argument rather than a keyword one.
175     asynchat.async_chat.__init__(self, connected_socket)
176     self.connected_socket = connected_socket
177     # on python 2.4 there is no "family" attribute for the socket class
178     # FIXME: when we move to python 2.5 or above remove the family parameter
179     #self.family = self.connected_socket.family
180     self.family = family
181     self.peer_address = peer_address
182     self.terminator = terminator
183     self.unhandled_limit = unhandled_limit
184     self.set_terminator(terminator)
185     self.ibuffer = []
186     self.receive_count = 0
187     self.send_count = 0
188     self.oqueue = collections.deque()
189     self.iqueue = collections.deque()
190
191   # this method is overriding an asynchat.async_chat method
192   def collect_incoming_data(self, data):
193     self.ibuffer.append(data)
194
195   def _can_handle_message(self):
196     return (self.unhandled_limit is None or
197             (self.receive_count < self.send_count + self.unhandled_limit) and
198              not self.iqueue)
199
200   # this method is overriding an asynchat.async_chat method
201   def found_terminator(self):
202     message = "".join(self.ibuffer)
203     self.ibuffer = []
204     message_id = self.receive_count
205     # We need to increase the receive_count after checking if the message can
206     # be handled, but before calling handle_message
207     can_handle = self._can_handle_message()
208     self.receive_count += 1
209     if can_handle:
210       self.handle_message(message, message_id)
211     else:
212       self.iqueue.append((message, message_id))
213
214   def handle_message(self, message, message_id):
215     """Handle a terminated message.
216
217     @type message: string
218     @param message: message to handle
219     @type message_id: integer
220     @param message_id: stream's message sequence number
221
222     """
223     pass
224     # TODO: move this method to raise NotImplementedError
225     # raise NotImplementedError
226
227   def send_message(self, message):
228     """Send a message to the remote peer. This function is thread-safe.
229
230     @type message: string
231     @param message: message to send, without the terminator
232
233     @warning: If calling this function from a thread different than the one
234     performing the main asyncore loop, remember that you have to wake that one
235     up.
236
237     """
238     # If we just append the message we received to the output queue, this
239     # function can be safely called by multiple threads at the same time, and
240     # we don't need locking, since deques are thread safe. handle_write in the
241     # asyncore thread will handle the next input message if there are any
242     # enqueued.
243     self.oqueue.append(message)
244
245   # this method is overriding an asyncore.dispatcher method
246   def readable(self):
247     # read from the socket if we can handle the next requests
248     return self._can_handle_message() and asynchat.async_chat.readable(self)
249
250   # this method is overriding an asyncore.dispatcher method
251   def writable(self):
252     # the output queue may become full just after we called writable. This only
253     # works if we know we'll have something else waking us up from the select,
254     # in such case, anyway.
255     return asynchat.async_chat.writable(self) or self.oqueue
256
257   # this method is overriding an asyncore.dispatcher method
258   def handle_write(self):
259     if self.oqueue:
260       # if we have data in the output queue, then send_message was called.
261       # this means we can process one more message from the input queue, if
262       # there are any.
263       data = self.oqueue.popleft()
264       self.push(data + self.terminator)
265       self.send_count += 1
266       if self.iqueue:
267         self.handle_message(*self.iqueue.popleft())
268     self.initiate_send()
269
270   def close_log(self):
271     logging.info("Closing connection from %s",
272                  netutils.FormatAddress(self.peer_address, family=self.family))
273     self.close()
274
275   # this method is overriding an asyncore.dispatcher method
276   def handle_expt(self):
277     self.close_log()
278
279   # this method is overriding an asyncore.dispatcher method
280   def handle_error(self):
281     """Log an error in handling any request, and proceed.
282
283     """
284     logging.exception("Error while handling asyncore request")
285     self.close_log()
286
287
288 class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
289   """An improved asyncore udp socket.
290
291   """
292   def __init__(self, family):
293     """Constructor for AsyncUDPSocket
294
295     """
296     GanetiBaseAsyncoreDispatcher.__init__(self)
297     self._out_queue = []
298     self._family = family
299     self.create_socket(family, socket.SOCK_DGRAM)
300
301   # this method is overriding an asyncore.dispatcher method
302   def handle_connect(self):
303     # Python thinks that the first udp message from a source qualifies as a
304     # "connect" and further ones are part of the same connection. We beg to
305     # differ and treat all messages equally.
306     pass
307
308   # this method is overriding an asyncore.dispatcher method
309   def handle_read(self):
310     recv_result = utils.IgnoreSignals(self.recvfrom,
311                                       constants.MAX_UDP_DATA_SIZE)
312     if recv_result is not None:
313       payload, address = recv_result
314       if self._family == socket.AF_INET6:
315         # we ignore 'flow info' and 'scope id' as we don't need them
316         ip, port, _, _ = address
317       else:
318         ip, port = address
319
320       self.handle_datagram(payload, ip, port)
321
322   def handle_datagram(self, payload, ip, port):
323     """Handle an already read udp datagram
324
325     """
326     raise NotImplementedError
327
328   # this method is overriding an asyncore.dispatcher method
329   def writable(self):
330     # We should check whether we can write to the socket only if we have
331     # something scheduled to be written
332     return bool(self._out_queue)
333
334   # this method is overriding an asyncore.dispatcher method
335   def handle_write(self):
336     if not self._out_queue:
337       logging.error("handle_write called with empty output queue")
338       return
339     (ip, port, payload) = self._out_queue[0]
340     utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
341     self._out_queue.pop(0)
342
343   def enqueue_send(self, ip, port, payload):
344     """Enqueue a datagram to be sent when possible
345
346     """
347     if len(payload) > constants.MAX_UDP_DATA_SIZE:
348       raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
349                                     constants.MAX_UDP_DATA_SIZE))
350     self._out_queue.append((ip, port, payload))
351
352   def process_next_packet(self, timeout=0):
353     """Process the next datagram, waiting for it if necessary.
354
355     @type timeout: float
356     @param timeout: how long to wait for data
357     @rtype: boolean
358     @return: True if some data has been handled, False otherwise
359
360     """
361     result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
362     if result is not None and result & select.POLLIN:
363       self.handle_read()
364       return True
365     else:
366       return False
367
368
369 class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
370   """A way to notify the asyncore loop that something is going on.
371
372   If an asyncore daemon is multithreaded when a thread tries to push some data
373   to a socket, the main loop handling asynchronous requests might be sleeping
374   waiting on a select(). To avoid this it can create an instance of the
375   AsyncAwaker, which other threads can use to wake it up.
376
377   """
378   def __init__(self, signal_fn=None):
379     """Constructor for AsyncAwaker
380
381     @type signal_fn: function
382     @param signal_fn: function to call when awaken
383
384     """
385     GanetiBaseAsyncoreDispatcher.__init__(self)
386     assert signal_fn == None or callable(signal_fn)
387     (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
388                                                           socket.SOCK_STREAM)
389     self.in_socket.setblocking(0)
390     self.in_socket.shutdown(socket.SHUT_WR)
391     self.out_socket.shutdown(socket.SHUT_RD)
392     self.set_socket(self.in_socket)
393     self.need_signal = True
394     self.signal_fn = signal_fn
395     self.connected = True
396
397   # this method is overriding an asyncore.dispatcher method
398   def handle_read(self):
399     utils.IgnoreSignals(self.recv, 4096)
400     if self.signal_fn:
401       self.signal_fn()
402     self.need_signal = True
403
404   # this method is overriding an asyncore.dispatcher method
405   def close(self):
406     asyncore.dispatcher.close(self)
407     self.out_socket.close()
408
409   def signal(self):
410     """Signal the asyncore main loop.
411
412     Any data we send here will be ignored, but it will cause the select() call
413     to return.
414
415     """
416     # Yes, there is a race condition here. No, we don't care, at worst we're
417     # sending more than one wakeup token, which doesn't harm at all.
418     if self.need_signal:
419       self.need_signal = False
420       self.out_socket.send("\0")
421
422
423 class Mainloop(object):
424   """Generic mainloop for daemons
425
426   @ivar scheduler: A sched.scheduler object, which can be used to register
427     timed events
428
429   """
430   def __init__(self):
431     """Constructs a new Mainloop instance.
432
433     """
434     self._signal_wait = []
435     self.scheduler = AsyncoreScheduler(time.time)
436
437   @utils.SignalHandled([signal.SIGCHLD])
438   @utils.SignalHandled([signal.SIGTERM])
439   @utils.SignalHandled([signal.SIGINT])
440   def Run(self, signal_handlers=None):
441     """Runs the mainloop.
442
443     @type signal_handlers: dict
444     @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
445
446     """
447     assert isinstance(signal_handlers, dict) and \
448            len(signal_handlers) > 0, \
449            "Broken SignalHandled decorator"
450     running = True
451     # Start actual main loop
452     while running:
453       if not self.scheduler.empty():
454         try:
455           self.scheduler.run()
456         except SchedulerBreakout:
457           pass
458       else:
459         asyncore.loop(count=1, use_poll=True)
460
461       # Check whether a signal was raised
462       for sig in signal_handlers:
463         handler = signal_handlers[sig]
464         if handler.called:
465           self._CallSignalWaiters(sig)
466           running = sig not in (signal.SIGTERM, signal.SIGINT)
467           handler.Clear()
468
469   def _CallSignalWaiters(self, signum):
470     """Calls all signal waiters for a certain signal.
471
472     @type signum: int
473     @param signum: Signal number
474
475     """
476     for owner in self._signal_wait:
477       owner.OnSignal(signum)
478
479   def RegisterSignal(self, owner):
480     """Registers a receiver for signal notifications
481
482     The receiver must support a "OnSignal(self, signum)" function.
483
484     @type owner: instance
485     @param owner: Receiver
486
487     """
488     self._signal_wait.append(owner)
489
490
491 def _VerifyDaemonUser(daemon_name):
492   """Verifies the process uid matches the configured uid.
493
494   This method verifies that a daemon is started as the user it is
495   intended to be run
496
497   @param daemon_name: The name of daemon to be started
498   @return: A tuple with the first item indicating success or not,
499            the second item current uid and third with expected uid
500
501   """
502   getents = runtime.GetEnts()
503   running_uid = os.getuid()
504   daemon_uids = {
505     constants.MASTERD: getents.masterd_uid,
506     constants.RAPI: getents.rapi_uid,
507     constants.NODED: getents.noded_uid,
508     constants.CONFD: getents.confd_uid,
509     }
510
511   return (daemon_uids[daemon_name] == running_uid, running_uid,
512           daemon_uids[daemon_name])
513
514
515 def _BeautifyError(err):
516   """Try to format an error better.
517
518   Since we're dealing with daemon startup errors, in many cases this
519   will be due to socket error and such, so we try to format these cases better.
520
521   @param err: an exception object
522   @rtype: string
523   @return: the formatted error description
524
525   """
526   try:
527     if isinstance(err, socket.error):
528       return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
529     elif isinstance(err, EnvironmentError):
530       if err.filename is None:
531         return "%s (errno=%s)" % (err.strerror, err.errno)
532       else:
533         return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
534                                             err.errno)
535     else:
536       return str(err)
537   except Exception: # pylint: disable-msg=W0703
538     logging.exception("Error while handling existing error %s", err)
539     return "%s" % str(err)
540
541
542 def GenericMain(daemon_name, optionparser,
543                 check_fn, prepare_fn, exec_fn,
544                 multithreaded=False, console_logging=False,
545                 default_ssl_cert=None, default_ssl_key=None):
546   """Shared main function for daemons.
547
548   @type daemon_name: string
549   @param daemon_name: daemon name
550   @type optionparser: optparse.OptionParser
551   @param optionparser: initialized optionparser with daemon-specific options
552                        (common -f -d options will be handled by this module)
553   @type check_fn: function which accepts (options, args)
554   @param check_fn: function that checks start conditions and exits if they're
555                    not met
556   @type prepare_fn: function which accepts (options, args)
557   @param prepare_fn: function that is run before forking, or None;
558       it's result will be passed as the third parameter to exec_fn, or
559       if None was passed in, we will just pass None to exec_fn
560   @type exec_fn: function which accepts (options, args, prepare_results)
561   @param exec_fn: function that's executed with the daemon's pid file held, and
562                   runs the daemon itself.
563   @type multithreaded: bool
564   @param multithreaded: Whether the daemon uses threads
565   @type console_logging: boolean
566   @param console_logging: if True, the daemon will fall back to the system
567                           console if logging fails
568   @type default_ssl_cert: string
569   @param default_ssl_cert: Default SSL certificate path
570   @type default_ssl_key: string
571   @param default_ssl_key: Default SSL key path
572
573   """
574   optionparser.add_option("-f", "--foreground", dest="fork",
575                           help="Don't detach from the current terminal",
576                           default=True, action="store_false")
577   optionparser.add_option("-d", "--debug", dest="debug",
578                           help="Enable some debug messages",
579                           default=False, action="store_true")
580   optionparser.add_option("--syslog", dest="syslog",
581                           help="Enable logging to syslog (except debug"
582                           " messages); one of 'no', 'yes' or 'only' [%s]" %
583                           constants.SYSLOG_USAGE,
584                           default=constants.SYSLOG_USAGE,
585                           choices=["no", "yes", "only"])
586
587   if daemon_name in constants.DAEMONS_PORTS:
588     default_bind_address = constants.IP4_ADDRESS_ANY
589     family = ssconf.SimpleStore().GetPrimaryIPFamily()
590     # family will default to AF_INET if there is no ssconf file (e.g. when
591     # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
592     # <= 2.2 can not be AF_INET6
593     if family == netutils.IP6Address.family:
594       default_bind_address = constants.IP6_ADDRESS_ANY
595
596     default_port = netutils.GetDaemonPort(daemon_name)
597
598     # For networked daemons we allow choosing the port and bind address
599     optionparser.add_option("-p", "--port", dest="port",
600                             help="Network port (default: %s)" % default_port,
601                             default=default_port, type="int")
602     optionparser.add_option("-b", "--bind", dest="bind_address",
603                             help=("Bind address (default: '%s')" %
604                                   default_bind_address),
605                             default=default_bind_address, metavar="ADDRESS")
606
607   if default_ssl_key is not None and default_ssl_cert is not None:
608     optionparser.add_option("--no-ssl", dest="ssl",
609                             help="Do not secure HTTP protocol with SSL",
610                             default=True, action="store_false")
611     optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
612                             help=("SSL key path (default: %s)" %
613                                   default_ssl_key),
614                             default=default_ssl_key, type="string",
615                             metavar="SSL_KEY_PATH")
616     optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
617                             help=("SSL certificate path (default: %s)" %
618                                   default_ssl_cert),
619                             default=default_ssl_cert, type="string",
620                             metavar="SSL_CERT_PATH")
621
622   # Disable the use of fork(2) if the daemon uses threads
623   if multithreaded:
624     utils.DisableFork()
625
626   options, args = optionparser.parse_args()
627
628   if getattr(options, "ssl", False):
629     ssl_paths = {
630       "certificate": options.ssl_cert,
631       "key": options.ssl_key,
632       }
633
634     for name, path in ssl_paths.iteritems():
635       if not os.path.isfile(path):
636         print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
637         sys.exit(constants.EXIT_FAILURE)
638
639     # TODO: By initiating http.HttpSslParams here we would only read the files
640     # once and have a proper validation (isfile returns False on directories)
641     # at the same time.
642
643   result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
644   if not result:
645     msg = ("%s started using wrong user ID (%d), expected %d" %
646            (daemon_name, running_uid, expected_uid))
647     print >> sys.stderr, msg
648     sys.exit(constants.EXIT_FAILURE)
649
650   if check_fn is not None:
651     check_fn(options, args)
652
653   if options.fork:
654     utils.CloseFDs()
655     wpipe = utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
656   else:
657     wpipe = None
658
659   utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
660   try:
661     try:
662       utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
663                          debug=options.debug,
664                          stderr_logging=not options.fork,
665                          multithreaded=multithreaded,
666                          program=daemon_name,
667                          syslog=options.syslog,
668                          console_logging=console_logging)
669       if callable(prepare_fn):
670         prep_results = prepare_fn(options, args)
671       else:
672         prep_results = None
673       logging.info("%s daemon startup", daemon_name)
674     except Exception, err:
675       utils.WriteErrorToFD(wpipe, _BeautifyError(err))
676       raise
677
678     if wpipe is not None:
679       # we're done with the preparation phase, we close the pipe to
680       # let the parent know it's safe to exit
681       os.close(wpipe)
682
683     exec_fn(options, args, prep_results)
684   finally:
685     utils.RemoveFile(utils.DaemonPidFileName(daemon_name))