daemon.AsyncStreamServer
[ganeti-local] / lib / daemon.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import os
27 import signal
28 import logging
29 import sched
30 import time
31 import socket
32 import select
33 import sys
34
35 from ganeti import utils
36 from ganeti import constants
37 from ganeti import errors
38
39
40 class SchedulerBreakout(Exception):
41   """Exception used to get out of the scheduler loop
42
43   """
44
45
46 def AsyncoreDelayFunction(timeout):
47   """Asyncore-compatible scheduler delay function.
48
49   This is a delay function for sched that, rather than actually sleeping,
50   executes asyncore events happening in the meantime.
51
52   After an event has occurred, rather than returning, it raises a
53   SchedulerBreakout exception, which will force the current scheduler.run()
54   invocation to terminate, so that we can also check for signals. The main loop
55   will then call the scheduler run again, which will allow it to actually
56   process any due events.
57
58   This is needed because scheduler.run() doesn't support a count=..., as
59   asyncore loop, and the scheduler module documents throwing exceptions from
60   inside the delay function as an allowed usage model.
61
62   """
63   asyncore.loop(timeout=timeout, count=1, use_poll=True)
64   raise SchedulerBreakout()
65
66
67 class AsyncoreScheduler(sched.scheduler):
68   """Event scheduler integrated with asyncore
69
70   """
71   def __init__(self, timefunc):
72     sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
73
74
75 class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
76   """Base Ganeti Asyncore Dispacher
77
78   """
79   # this method is overriding an asyncore.dispatcher method
80   def handle_error(self):
81     """Log an error in handling any request, and proceed.
82
83     """
84     logging.exception("Error while handling asyncore request")
85
86   # this method is overriding an asyncore.dispatcher method
87   def writable(self):
88     """Most of the time we don't want to check for writability.
89
90     """
91     return False
92
93
94 def FormatAddress(family, address):
95   """Format a client's address
96
97   @type family: integer
98   @param family: socket family (one of socket.AF_*)
99   @type address: family specific (usually tuple)
100   @param address: address, as reported by this class
101
102   """
103   if family == socket.AF_INET and len(address) == 2:
104     return "%s:%d" % address
105   elif family == socket.AF_UNIX and len(address) == 3:
106     return "pid=%s, uid=%s, gid=%s" % address
107   else:
108     return str(address)
109
110
111 class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
112   """A stream server to use with asyncore.
113
114   Each request is accepted, and then dispatched to a separate asyncore
115   dispatcher to handle.
116
117   """
118
119   _REQUEST_QUEUE_SIZE = 5
120
121   def __init__(self, family, address):
122     """Constructor for AsyncUnixStreamSocket
123
124     @type family: integer
125     @param family: socket family (one of socket.AF_*)
126     @type address: address family dependent
127     @param address: address to bind the socket to
128
129     """
130     GanetiBaseAsyncoreDispatcher.__init__(self)
131     self.family = family
132     self.create_socket(self.family, socket.SOCK_STREAM)
133     self.set_reuse_addr()
134     self.bind(address)
135     self.listen(self._REQUEST_QUEUE_SIZE)
136
137   # this method is overriding an asyncore.dispatcher method
138   def handle_accept(self):
139     """Accept a new client connection.
140
141     Creates a new instance of the handler class, which will use asyncore to
142     serve the client.
143
144     """
145     accept_result = utils.IgnoreSignals(self.accept)
146     if accept_result is not None:
147       connected_socket, client_address = accept_result
148       if self.family == socket.AF_UNIX:
149         # override the client address, as for unix sockets nothing meaningful
150         # is passed in from accept anyway
151         client_address = utils.GetSocketCredentials(connected_socket)
152       logging.info("Accepted connection from %s",
153                    FormatAddress(self.family, client_address))
154       self.handle_connection(connected_socket, client_address)
155
156   def handle_connection(self, connected_socket, client_address):
157     """Handle an already accepted connection.
158
159     """
160     raise NotImplementedError
161
162
163 class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
164   """An improved asyncore udp socket.
165
166   """
167   def __init__(self):
168     """Constructor for AsyncUDPSocket
169
170     """
171     GanetiBaseAsyncoreDispatcher.__init__(self)
172     self._out_queue = []
173     self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
174
175   # this method is overriding an asyncore.dispatcher method
176   def handle_connect(self):
177     # Python thinks that the first udp message from a source qualifies as a
178     # "connect" and further ones are part of the same connection. We beg to
179     # differ and treat all messages equally.
180     pass
181
182   # this method is overriding an asyncore.dispatcher method
183   def handle_read(self):
184     recv_result = utils.IgnoreSignals(self.recvfrom,
185                                       constants.MAX_UDP_DATA_SIZE)
186     if recv_result is not None:
187       payload, address = recv_result
188       ip, port = address
189       self.handle_datagram(payload, ip, port)
190
191   def handle_datagram(self, payload, ip, port):
192     """Handle an already read udp datagram
193
194     """
195     raise NotImplementedError
196
197   # this method is overriding an asyncore.dispatcher method
198   def writable(self):
199     # We should check whether we can write to the socket only if we have
200     # something scheduled to be written
201     return bool(self._out_queue)
202
203   # this method is overriding an asyncore.dispatcher method
204   def handle_write(self):
205     if not self._out_queue:
206       logging.error("handle_write called with empty output queue")
207       return
208     (ip, port, payload) = self._out_queue[0]
209     utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
210     self._out_queue.pop(0)
211
212   def enqueue_send(self, ip, port, payload):
213     """Enqueue a datagram to be sent when possible
214
215     """
216     if len(payload) > constants.MAX_UDP_DATA_SIZE:
217       raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
218                                     constants.MAX_UDP_DATA_SIZE))
219     self._out_queue.append((ip, port, payload))
220
221   def process_next_packet(self, timeout=0):
222     """Process the next datagram, waiting for it if necessary.
223
224     @type timeout: float
225     @param timeout: how long to wait for data
226     @rtype: boolean
227     @return: True if some data has been handled, False otherwise
228
229     """
230     result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
231     if result is not None and result & select.POLLIN:
232       self.handle_read()
233       return True
234     else:
235       return False
236
237
238 class Mainloop(object):
239   """Generic mainloop for daemons
240
241   @ivar scheduler: A sched.scheduler object, which can be used to register
242     timed events
243
244   """
245   def __init__(self):
246     """Constructs a new Mainloop instance.
247
248     """
249     self._signal_wait = []
250     self.scheduler = AsyncoreScheduler(time.time)
251
252   @utils.SignalHandled([signal.SIGCHLD])
253   @utils.SignalHandled([signal.SIGTERM])
254   @utils.SignalHandled([signal.SIGINT])
255   def Run(self, signal_handlers=None):
256     """Runs the mainloop.
257
258     @type signal_handlers: dict
259     @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
260
261     """
262     assert isinstance(signal_handlers, dict) and \
263            len(signal_handlers) > 0, \
264            "Broken SignalHandled decorator"
265     running = True
266     # Start actual main loop
267     while running:
268       if not self.scheduler.empty():
269         try:
270           self.scheduler.run()
271         except SchedulerBreakout:
272           pass
273       else:
274         asyncore.loop(count=1, use_poll=True)
275
276       # Check whether a signal was raised
277       for sig in signal_handlers:
278         handler = signal_handlers[sig]
279         if handler.called:
280           self._CallSignalWaiters(sig)
281           running = sig not in (signal.SIGTERM, signal.SIGINT)
282           handler.Clear()
283
284   def _CallSignalWaiters(self, signum):
285     """Calls all signal waiters for a certain signal.
286
287     @type signum: int
288     @param signum: Signal number
289
290     """
291     for owner in self._signal_wait:
292       owner.OnSignal(signum)
293
294   def RegisterSignal(self, owner):
295     """Registers a receiver for signal notifications
296
297     The receiver must support a "OnSignal(self, signum)" function.
298
299     @type owner: instance
300     @param owner: Receiver
301
302     """
303     self._signal_wait.append(owner)
304
305
306 def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
307                 multithreaded=False, console_logging=False,
308                 default_ssl_cert=None, default_ssl_key=None):
309   """Shared main function for daemons.
310
311   @type daemon_name: string
312   @param daemon_name: daemon name
313   @type optionparser: optparse.OptionParser
314   @param optionparser: initialized optionparser with daemon-specific options
315                        (common -f -d options will be handled by this module)
316   @type dirs: list of (string, integer)
317   @param dirs: list of directories that must be created if they don't exist,
318                and the permissions to be used to create them
319   @type check_fn: function which accepts (options, args)
320   @param check_fn: function that checks start conditions and exits if they're
321                    not met
322   @type exec_fn: function which accepts (options, args)
323   @param exec_fn: function that's executed with the daemon's pid file held, and
324                   runs the daemon itself.
325   @type multithreaded: bool
326   @param multithreaded: Whether the daemon uses threads
327   @type console_logging: boolean
328   @param console_logging: if True, the daemon will fall back to the system
329                           console if logging fails
330   @type default_ssl_cert: string
331   @param default_ssl_cert: Default SSL certificate path
332   @type default_ssl_key: string
333   @param default_ssl_key: Default SSL key path
334
335   """
336   optionparser.add_option("-f", "--foreground", dest="fork",
337                           help="Don't detach from the current terminal",
338                           default=True, action="store_false")
339   optionparser.add_option("-d", "--debug", dest="debug",
340                           help="Enable some debug messages",
341                           default=False, action="store_true")
342   optionparser.add_option("--syslog", dest="syslog",
343                           help="Enable logging to syslog (except debug"
344                           " messages); one of 'no', 'yes' or 'only' [%s]" %
345                           constants.SYSLOG_USAGE,
346                           default=constants.SYSLOG_USAGE,
347                           choices=["no", "yes", "only"])
348
349   if daemon_name in constants.DAEMONS_PORTS:
350     default_bind_address = "0.0.0.0"
351     default_port = utils.GetDaemonPort(daemon_name)
352
353     # For networked daemons we allow choosing the port and bind address
354     optionparser.add_option("-p", "--port", dest="port",
355                             help="Network port (default: %s)" % default_port,
356                             default=default_port, type="int")
357     optionparser.add_option("-b", "--bind", dest="bind_address",
358                             help=("Bind address (default: %s)" %
359                                   default_bind_address),
360                             default=default_bind_address, metavar="ADDRESS")
361
362   if default_ssl_key is not None and default_ssl_cert is not None:
363     optionparser.add_option("--no-ssl", dest="ssl",
364                             help="Do not secure HTTP protocol with SSL",
365                             default=True, action="store_false")
366     optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
367                             help=("SSL key path (default: %s)" %
368                                   default_ssl_key),
369                             default=default_ssl_key, type="string",
370                             metavar="SSL_KEY_PATH")
371     optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
372                             help=("SSL certificate path (default: %s)" %
373                                   default_ssl_cert),
374                             default=default_ssl_cert, type="string",
375                             metavar="SSL_CERT_PATH")
376
377   # Disable the use of fork(2) if the daemon uses threads
378   utils.no_fork = multithreaded
379
380   options, args = optionparser.parse_args()
381
382   if getattr(options, "ssl", False):
383     ssl_paths = {
384       "certificate": options.ssl_cert,
385       "key": options.ssl_key,
386       }
387
388     for name, path in ssl_paths.iteritems():
389       if not os.path.isfile(path):
390         print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
391         sys.exit(constants.EXIT_FAILURE)
392
393     # TODO: By initiating http.HttpSslParams here we would only read the files
394     # once and have a proper validation (isfile returns False on directories)
395     # at the same time.
396
397   if check_fn is not None:
398     check_fn(options, args)
399
400   utils.EnsureDirs(dirs)
401
402   if options.fork:
403     utils.CloseFDs()
404     utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
405
406   utils.WritePidFile(daemon_name)
407   try:
408     utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
409                        debug=options.debug,
410                        stderr_logging=not options.fork,
411                        multithreaded=multithreaded,
412                        program=daemon_name,
413                        syslog=options.syslog,
414                        console_logging=console_logging)
415     logging.info("%s daemon startup", daemon_name)
416     exec_fn(options, args)
417   finally:
418     utils.RemovePidFile(daemon_name)