Handle ENOENT case in ssconf.GetPrimaryIPFamily
[ganeti-local] / lib / daemon.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import asynchat
27 import collections
28 import grp
29 import os
30 import pwd
31 import signal
32 import logging
33 import sched
34 import time
35 import socket
36 import select
37 import sys
38
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import netutils
43 from ganeti import ssconf
44
45
46 _DEFAULT_RUN_USER = "root"
47 _DEFAULT_RUN_GROUP = "root"
48
49
50 class SchedulerBreakout(Exception):
51   """Exception used to get out of the scheduler loop
52
53   """
54
55
56 def AsyncoreDelayFunction(timeout):
57   """Asyncore-compatible scheduler delay function.
58
59   This is a delay function for sched that, rather than actually sleeping,
60   executes asyncore events happening in the meantime.
61
62   After an event has occurred, rather than returning, it raises a
63   SchedulerBreakout exception, which will force the current scheduler.run()
64   invocation to terminate, so that we can also check for signals. The main loop
65   will then call the scheduler run again, which will allow it to actually
66   process any due events.
67
68   This is needed because scheduler.run() doesn't support a count=..., as
69   asyncore loop, and the scheduler module documents throwing exceptions from
70   inside the delay function as an allowed usage model.
71
72   """
73   asyncore.loop(timeout=timeout, count=1, use_poll=True)
74   raise SchedulerBreakout()
75
76
77 class AsyncoreScheduler(sched.scheduler):
78   """Event scheduler integrated with asyncore
79
80   """
81   def __init__(self, timefunc):
82     sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
83
84
85 class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
86   """Base Ganeti Asyncore Dispacher
87
88   """
89   # this method is overriding an asyncore.dispatcher method
90   def handle_error(self):
91     """Log an error in handling any request, and proceed.
92
93     """
94     logging.exception("Error while handling asyncore request")
95
96   # this method is overriding an asyncore.dispatcher method
97   def writable(self):
98     """Most of the time we don't want to check for writability.
99
100     """
101     return False
102
103
104 class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
105   """A stream server to use with asyncore.
106
107   Each request is accepted, and then dispatched to a separate asyncore
108   dispatcher to handle.
109
110   """
111
112   _REQUEST_QUEUE_SIZE = 5
113
114   def __init__(self, family, address):
115     """Constructor for AsyncUnixStreamSocket
116
117     @type family: integer
118     @param family: socket family (one of socket.AF_*)
119     @type address: address family dependent
120     @param address: address to bind the socket to
121
122     """
123     GanetiBaseAsyncoreDispatcher.__init__(self)
124     self.family = family
125     self.create_socket(self.family, socket.SOCK_STREAM)
126     self.set_reuse_addr()
127     self.bind(address)
128     self.listen(self._REQUEST_QUEUE_SIZE)
129
130   # this method is overriding an asyncore.dispatcher method
131   def handle_accept(self):
132     """Accept a new client connection.
133
134     Creates a new instance of the handler class, which will use asyncore to
135     serve the client.
136
137     """
138     accept_result = utils.IgnoreSignals(self.accept)
139     if accept_result is not None:
140       connected_socket, client_address = accept_result
141       if self.family == socket.AF_UNIX:
142         # override the client address, as for unix sockets nothing meaningful
143         # is passed in from accept anyway
144         client_address = netutils.GetSocketCredentials(connected_socket)
145       logging.info("Accepted connection from %s",
146                    netutils.FormatAddress(client_address, family=self.family))
147       self.handle_connection(connected_socket, client_address)
148
149   def handle_connection(self, connected_socket, client_address):
150     """Handle an already accepted connection.
151
152     """
153     raise NotImplementedError
154
155
156 class AsyncTerminatedMessageStream(asynchat.async_chat):
157   """A terminator separated message stream asyncore module.
158
159   Handles a stream connection receiving messages terminated by a defined
160   separator. For each complete message handle_message is called.
161
162   """
163   def __init__(self, connected_socket, peer_address, terminator, family,
164                unhandled_limit):
165     """AsyncTerminatedMessageStream constructor.
166
167     @type connected_socket: socket.socket
168     @param connected_socket: connected stream socket to receive messages from
169     @param peer_address: family-specific peer address
170     @type terminator: string
171     @param terminator: terminator separating messages in the stream
172     @type family: integer
173     @param family: socket family
174     @type unhandled_limit: integer or None
175     @param unhandled_limit: maximum unanswered messages
176
177     """
178     # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
179     # using a positional argument rather than a keyword one.
180     asynchat.async_chat.__init__(self, connected_socket)
181     self.connected_socket = connected_socket
182     # on python 2.4 there is no "family" attribute for the socket class
183     # FIXME: when we move to python 2.5 or above remove the family parameter
184     #self.family = self.connected_socket.family
185     self.family = family
186     self.peer_address = peer_address
187     self.terminator = terminator
188     self.unhandled_limit = unhandled_limit
189     self.set_terminator(terminator)
190     self.ibuffer = []
191     self.receive_count = 0
192     self.send_count = 0
193     self.oqueue = collections.deque()
194     self.iqueue = collections.deque()
195
196   # this method is overriding an asynchat.async_chat method
197   def collect_incoming_data(self, data):
198     self.ibuffer.append(data)
199
200   def _can_handle_message(self):
201     return (self.unhandled_limit is None or
202             (self.receive_count < self.send_count + self.unhandled_limit) and
203              not self.iqueue)
204
205   # this method is overriding an asynchat.async_chat method
206   def found_terminator(self):
207     message = "".join(self.ibuffer)
208     self.ibuffer = []
209     message_id = self.receive_count
210     # We need to increase the receive_count after checking if the message can
211     # be handled, but before calling handle_message
212     can_handle = self._can_handle_message()
213     self.receive_count += 1
214     if can_handle:
215       self.handle_message(message, message_id)
216     else:
217       self.iqueue.append((message, message_id))
218
219   def handle_message(self, message, message_id):
220     """Handle a terminated message.
221
222     @type message: string
223     @param message: message to handle
224     @type message_id: integer
225     @param message_id: stream's message sequence number
226
227     """
228     pass
229     # TODO: move this method to raise NotImplementedError
230     # raise NotImplementedError
231
232   def send_message(self, message):
233     """Send a message to the remote peer. This function is thread-safe.
234
235     @type message: string
236     @param message: message to send, without the terminator
237
238     @warning: If calling this function from a thread different than the one
239     performing the main asyncore loop, remember that you have to wake that one
240     up.
241
242     """
243     # If we just append the message we received to the output queue, this
244     # function can be safely called by multiple threads at the same time, and
245     # we don't need locking, since deques are thread safe. handle_write in the
246     # asyncore thread will handle the next input message if there are any
247     # enqueued.
248     self.oqueue.append(message)
249
250   # this method is overriding an asyncore.dispatcher method
251   def readable(self):
252     # read from the socket if we can handle the next requests
253     return self._can_handle_message() and asynchat.async_chat.readable(self)
254
255   # this method is overriding an asyncore.dispatcher method
256   def writable(self):
257     # the output queue may become full just after we called writable. This only
258     # works if we know we'll have something else waking us up from the select,
259     # in such case, anyway.
260     return asynchat.async_chat.writable(self) or self.oqueue
261
262   # this method is overriding an asyncore.dispatcher method
263   def handle_write(self):
264     if self.oqueue:
265       # if we have data in the output queue, then send_message was called.
266       # this means we can process one more message from the input queue, if
267       # there are any.
268       data = self.oqueue.popleft()
269       self.push(data + self.terminator)
270       self.send_count += 1
271       if self.iqueue:
272         self.handle_message(*self.iqueue.popleft())
273     self.initiate_send()
274
275   def close_log(self):
276     logging.info("Closing connection from %s",
277                  netutils.FormatAddress(self.peer_address, family=self.family))
278     self.close()
279
280   # this method is overriding an asyncore.dispatcher method
281   def handle_expt(self):
282     self.close_log()
283
284   # this method is overriding an asyncore.dispatcher method
285   def handle_error(self):
286     """Log an error in handling any request, and proceed.
287
288     """
289     logging.exception("Error while handling asyncore request")
290     self.close_log()
291
292
293 class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
294   """An improved asyncore udp socket.
295
296   """
297   def __init__(self, family):
298     """Constructor for AsyncUDPSocket
299
300     """
301     GanetiBaseAsyncoreDispatcher.__init__(self)
302     self._out_queue = []
303     self._family = family
304     self.create_socket(family, socket.SOCK_DGRAM)
305
306   # this method is overriding an asyncore.dispatcher method
307   def handle_connect(self):
308     # Python thinks that the first udp message from a source qualifies as a
309     # "connect" and further ones are part of the same connection. We beg to
310     # differ and treat all messages equally.
311     pass
312
313   # this method is overriding an asyncore.dispatcher method
314   def handle_read(self):
315     recv_result = utils.IgnoreSignals(self.recvfrom,
316                                       constants.MAX_UDP_DATA_SIZE)
317     if recv_result is not None:
318       payload, address = recv_result
319       if self._family == socket.AF_INET6:
320         # we ignore 'flow info' and 'scope id' as we don't need them
321         ip, port, _, _ = address
322       else:
323         ip, port = address
324
325       self.handle_datagram(payload, ip, port)
326
327   def handle_datagram(self, payload, ip, port):
328     """Handle an already read udp datagram
329
330     """
331     raise NotImplementedError
332
333   # this method is overriding an asyncore.dispatcher method
334   def writable(self):
335     # We should check whether we can write to the socket only if we have
336     # something scheduled to be written
337     return bool(self._out_queue)
338
339   # this method is overriding an asyncore.dispatcher method
340   def handle_write(self):
341     if not self._out_queue:
342       logging.error("handle_write called with empty output queue")
343       return
344     (ip, port, payload) = self._out_queue[0]
345     utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
346     self._out_queue.pop(0)
347
348   def enqueue_send(self, ip, port, payload):
349     """Enqueue a datagram to be sent when possible
350
351     """
352     if len(payload) > constants.MAX_UDP_DATA_SIZE:
353       raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
354                                     constants.MAX_UDP_DATA_SIZE))
355     self._out_queue.append((ip, port, payload))
356
357   def process_next_packet(self, timeout=0):
358     """Process the next datagram, waiting for it if necessary.
359
360     @type timeout: float
361     @param timeout: how long to wait for data
362     @rtype: boolean
363     @return: True if some data has been handled, False otherwise
364
365     """
366     result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
367     if result is not None and result & select.POLLIN:
368       self.handle_read()
369       return True
370     else:
371       return False
372
373
374 class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
375   """A way to notify the asyncore loop that something is going on.
376
377   If an asyncore daemon is multithreaded when a thread tries to push some data
378   to a socket, the main loop handling asynchronous requests might be sleeping
379   waiting on a select(). To avoid this it can create an instance of the
380   AsyncAwaker, which other threads can use to wake it up.
381
382   """
383   def __init__(self, signal_fn=None):
384     """Constructor for AsyncAwaker
385
386     @type signal_fn: function
387     @param signal_fn: function to call when awaken
388
389     """
390     GanetiBaseAsyncoreDispatcher.__init__(self)
391     assert signal_fn == None or callable(signal_fn)
392     (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
393                                                           socket.SOCK_STREAM)
394     self.in_socket.setblocking(0)
395     self.in_socket.shutdown(socket.SHUT_WR)
396     self.out_socket.shutdown(socket.SHUT_RD)
397     self.set_socket(self.in_socket)
398     self.need_signal = True
399     self.signal_fn = signal_fn
400     self.connected = True
401
402   # this method is overriding an asyncore.dispatcher method
403   def handle_read(self):
404     utils.IgnoreSignals(self.recv, 4096)
405     if self.signal_fn:
406       self.signal_fn()
407     self.need_signal = True
408
409   # this method is overriding an asyncore.dispatcher method
410   def close(self):
411     asyncore.dispatcher.close(self)
412     self.out_socket.close()
413
414   def signal(self):
415     """Signal the asyncore main loop.
416
417     Any data we send here will be ignored, but it will cause the select() call
418     to return.
419
420     """
421     # Yes, there is a race condition here. No, we don't care, at worst we're
422     # sending more than one wakeup token, which doesn't harm at all.
423     if self.need_signal:
424       self.need_signal = False
425       self.out_socket.send("\0")
426
427
428 class Mainloop(object):
429   """Generic mainloop for daemons
430
431   @ivar scheduler: A sched.scheduler object, which can be used to register
432     timed events
433
434   """
435   def __init__(self):
436     """Constructs a new Mainloop instance.
437
438     """
439     self._signal_wait = []
440     self.scheduler = AsyncoreScheduler(time.time)
441
442   @utils.SignalHandled([signal.SIGCHLD])
443   @utils.SignalHandled([signal.SIGTERM])
444   @utils.SignalHandled([signal.SIGINT])
445   def Run(self, signal_handlers=None):
446     """Runs the mainloop.
447
448     @type signal_handlers: dict
449     @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
450
451     """
452     assert isinstance(signal_handlers, dict) and \
453            len(signal_handlers) > 0, \
454            "Broken SignalHandled decorator"
455     running = True
456     # Start actual main loop
457     while running:
458       if not self.scheduler.empty():
459         try:
460           self.scheduler.run()
461         except SchedulerBreakout:
462           pass
463       else:
464         asyncore.loop(count=1, use_poll=True)
465
466       # Check whether a signal was raised
467       for sig in signal_handlers:
468         handler = signal_handlers[sig]
469         if handler.called:
470           self._CallSignalWaiters(sig)
471           running = sig not in (signal.SIGTERM, signal.SIGINT)
472           handler.Clear()
473
474   def _CallSignalWaiters(self, signum):
475     """Calls all signal waiters for a certain signal.
476
477     @type signum: int
478     @param signum: Signal number
479
480     """
481     for owner in self._signal_wait:
482       owner.OnSignal(signum)
483
484   def RegisterSignal(self, owner):
485     """Registers a receiver for signal notifications
486
487     The receiver must support a "OnSignal(self, signum)" function.
488
489     @type owner: instance
490     @param owner: Receiver
491
492     """
493     self._signal_wait.append(owner)
494
495
496 def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
497                 multithreaded=False, console_logging=False,
498                 default_ssl_cert=None, default_ssl_key=None,
499                 user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
500   """Shared main function for daemons.
501
502   @type daemon_name: string
503   @param daemon_name: daemon name
504   @type optionparser: optparse.OptionParser
505   @param optionparser: initialized optionparser with daemon-specific options
506                        (common -f -d options will be handled by this module)
507   @type dirs: list of (string, integer)
508   @param dirs: list of directories that must be created if they don't exist,
509                and the permissions to be used to create them
510   @type check_fn: function which accepts (options, args)
511   @param check_fn: function that checks start conditions and exits if they're
512                    not met
513   @type exec_fn: function which accepts (options, args)
514   @param exec_fn: function that's executed with the daemon's pid file held, and
515                   runs the daemon itself.
516   @type multithreaded: bool
517   @param multithreaded: Whether the daemon uses threads
518   @type console_logging: boolean
519   @param console_logging: if True, the daemon will fall back to the system
520                           console if logging fails
521   @type default_ssl_cert: string
522   @param default_ssl_cert: Default SSL certificate path
523   @type default_ssl_key: string
524   @param default_ssl_key: Default SSL key path
525   @param user: Default user to run as
526   @type user: string
527   @param group: Default group to run as
528   @type group: string
529
530   """
531   optionparser.add_option("-f", "--foreground", dest="fork",
532                           help="Don't detach from the current terminal",
533                           default=True, action="store_false")
534   optionparser.add_option("-d", "--debug", dest="debug",
535                           help="Enable some debug messages",
536                           default=False, action="store_true")
537   optionparser.add_option("--syslog", dest="syslog",
538                           help="Enable logging to syslog (except debug"
539                           " messages); one of 'no', 'yes' or 'only' [%s]" %
540                           constants.SYSLOG_USAGE,
541                           default=constants.SYSLOG_USAGE,
542                           choices=["no", "yes", "only"])
543
544   if daemon_name in constants.DAEMONS_PORTS:
545     default_bind_address = constants.IP4_ADDRESS_ANY
546     family = ssconf.SimpleStore().GetPrimaryIPFamily()
547     # family will default to AF_INET if there is no ssconf file (e.g. when
548     # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
549     # <= 2.2 can not be AF_INET6
550     if family == netutils.IP6Address.family:
551       default_bind_address = constants.IP6_ADDRESS_ANY
552
553     default_port = netutils.GetDaemonPort(daemon_name)
554
555     # For networked daemons we allow choosing the port and bind address
556     optionparser.add_option("-p", "--port", dest="port",
557                             help="Network port (default: %s)" % default_port,
558                             default=default_port, type="int")
559     optionparser.add_option("-b", "--bind", dest="bind_address",
560                             help=("Bind address (default: '%s')" %
561                                   default_bind_address),
562                             default=default_bind_address, metavar="ADDRESS")
563
564   if default_ssl_key is not None and default_ssl_cert is not None:
565     optionparser.add_option("--no-ssl", dest="ssl",
566                             help="Do not secure HTTP protocol with SSL",
567                             default=True, action="store_false")
568     optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
569                             help=("SSL key path (default: %s)" %
570                                   default_ssl_key),
571                             default=default_ssl_key, type="string",
572                             metavar="SSL_KEY_PATH")
573     optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
574                             help=("SSL certificate path (default: %s)" %
575                                   default_ssl_cert),
576                             default=default_ssl_cert, type="string",
577                             metavar="SSL_CERT_PATH")
578
579   # Disable the use of fork(2) if the daemon uses threads
580   utils.no_fork = multithreaded
581
582   options, args = optionparser.parse_args()
583
584   if getattr(options, "ssl", False):
585     ssl_paths = {
586       "certificate": options.ssl_cert,
587       "key": options.ssl_key,
588       }
589
590     for name, path in ssl_paths.iteritems():
591       if not os.path.isfile(path):
592         print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
593         sys.exit(constants.EXIT_FAILURE)
594
595     # TODO: By initiating http.HttpSslParams here we would only read the files
596     # once and have a proper validation (isfile returns False on directories)
597     # at the same time.
598
599   if check_fn is not None:
600     check_fn(options, args)
601
602   utils.EnsureDirs(dirs)
603
604   if options.fork:
605     try:
606       uid = pwd.getpwnam(user).pw_uid
607       gid = grp.getgrnam(group).gr_gid
608     except KeyError:
609       raise errors.ConfigurationError("User or group not existing on system:"
610                                       " %s:%s" % (user, group))
611     utils.CloseFDs()
612     utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)
613
614   utils.WritePidFile(daemon_name)
615   try:
616     utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
617                        debug=options.debug,
618                        stderr_logging=not options.fork,
619                        multithreaded=multithreaded,
620                        program=daemon_name,
621                        syslog=options.syslog,
622                        console_logging=console_logging)
623     logging.info("%s daemon startup", daemon_name)
624     exec_fn(options, args)
625   finally:
626     utils.RemovePidFile(daemon_name)