cmdlib: Add utility to transfer instance data within the cluster
[ganeti-local] / lib / daemon.py
index 809e538..02d9729 100644 (file)
@@ -30,6 +30,7 @@ import logging
 import sched
 import time
 import socket
+import select
 import sys
 
 from ganeti import utils
@@ -91,21 +92,24 @@ class AsyncUDPSocket(asyncore.dispatcher):
     # differ and treat all messages equally.
     pass
 
+  def do_read(self):
+    try:
+      payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE)
+    except socket.error, err:
+      if err.errno == errno.EINTR:
+        # we got a signal while trying to read. no need to do anything,
+        # handle_read will be called again if there is data on the socket.
+        return
+      else:
+        raise
+    ip, port = address
+    self.handle_datagram(payload, ip, port)
+
   # this method is overriding an asyncore.dispatcher method
   def handle_read(self):
     try:
-      try:
-        payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE)
-      except socket.error, err:
-        if err.errno == errno.EINTR:
-          # we got a signal while trying to read. no need to do anything,
-          # handle_read will be called again if there is data on the socket.
-          return
-        else:
-          raise
-      ip, port = address
-      self.handle_datagram(payload, ip, port)
-    except:
+      self.do_read()
+    except: # pylint: disable-msg=W0702
       # we need to catch any exception here, log it, but proceed, because even
       # if we failed handling a single request, we still want to continue.
       logging.error("Unexpected exception", exc_info=True)
@@ -139,7 +143,7 @@ class AsyncUDPSocket(asyncore.dispatcher):
         else:
           raise
       self._out_queue.pop(0)
-    except:
+    except: # pylint: disable-msg=W0702
       # we need to catch any exception here, log it, but proceed, because even
       # if we failed sending a single datagram we still want to continue.
       logging.error("Unexpected exception", exc_info=True)
@@ -153,6 +157,22 @@ class AsyncUDPSocket(asyncore.dispatcher):
                                     constants.MAX_UDP_DATA_SIZE))
     self._out_queue.append((ip, port, payload))
 
+  def process_next_packet(self, timeout=0):
+    """Process the next datagram, waiting for it if necessary.
+
+    @type timeout: float
+    @param timeout: how long to wait for data
+    @rtype: boolean
+    @return: True if some data has been handled, False otherwise
+
+    """
+    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
+    if result is not None and result & select.POLLIN:
+      self.do_read()
+      return True
+    else:
+      return False
+
 
 class Mainloop(object):
   """Generic mainloop for daemons
@@ -221,7 +241,9 @@ class Mainloop(object):
     self._signal_wait.append(owner)
 
 
-def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
+def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
+                multithreaded=False,
+                default_ssl_cert=None, default_ssl_key=None):
   """Shared main function for daemons.
 
   @type daemon_name: string
@@ -237,6 +259,12 @@ def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
   @type exec_fn: function which accepts (options, args)
   @param exec_fn: function that's executed with the daemon's pid file held, and
                   runs the daemon itself.
+  @type multithreaded: bool
+  @param multithreaded: Whether the daemon uses threads
+  @type default_ssl_cert: string
+  @param default_ssl_cert: Default SSL certificate path
+  @type default_ssl_key: string
+  @param default_ssl_key: Default SSL key path
 
   """
   optionparser.add_option("-f", "--foreground", dest="fork",
@@ -245,43 +273,61 @@ def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
   optionparser.add_option("-d", "--debug", dest="debug",
                           help="Enable some debug messages",
                           default=False, action="store_true")
+  optionparser.add_option("--syslog", dest="syslog",
+                          help="Enable logging to syslog (except debug"
+                          " messages); one of 'no', 'yes' or 'only' [%s]" %
+                          constants.SYSLOG_USAGE,
+                          default=constants.SYSLOG_USAGE,
+                          choices=["no", "yes", "only"])
+
   if daemon_name in constants.DAEMONS_PORTS:
-    # for networked daemons we also allow choosing the bind port and address.
-    # by default we use the port provided by utils.GetDaemonPort, and bind to
-    # 0.0.0.0 (which is represented by and empty bind address.
-    port = utils.GetDaemonPort(daemon_name)
+    default_bind_address = "0.0.0.0"
+    default_port = utils.GetDaemonPort(daemon_name)
+
+    # For networked daemons we allow choosing the port and bind address
     optionparser.add_option("-p", "--port", dest="port",
-                            help="Network port (%s default)." % port,
-                            default=port, type="int")
+                            help="Network port (default: %s)" % default_port,
+                            default=default_port, type="int")
     optionparser.add_option("-b", "--bind", dest="bind_address",
-                            help="Bind address",
-                            default="", metavar="ADDRESS")
+                            help=("Bind address (default: %s)" %
+                                  default_bind_address),
+                            default=default_bind_address, metavar="ADDRESS")
 
-  if daemon_name in constants.DAEMONS_SSL:
-    default_cert, default_key = constants.DAEMONS_SSL[daemon_name]
+  if default_ssl_key is not None and default_ssl_cert is not None:
     optionparser.add_option("--no-ssl", dest="ssl",
                             help="Do not secure HTTP protocol with SSL",
                             default=True, action="store_false")
     optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
-                            help="SSL key",
-                            default=default_key, type="string")
+                            help=("SSL key path (default: %s)" %
+                                  default_ssl_key),
+                            default=default_ssl_key, type="string",
+                            metavar="SSL_KEY_PATH")
     optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
-                            help="SSL certificate",
-                            default=default_cert, type="string")
+                            help=("SSL certificate path (default: %s)" %
+                                  default_ssl_cert),
+                            default=default_ssl_cert, type="string",
+                            metavar="SSL_CERT_PATH")
 
-  multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS
+  # Disable the use of fork(2) if the daemon uses threads
+  utils.no_fork = multithreaded
 
   options, args = optionparser.parse_args()
 
-  if hasattr(options, 'ssl') and options.ssl:
-    if not (options.ssl_cert and options.ssl_key):
-      print >> sys.stderr, "Need key and certificate to use ssl"
-      sys.exit(constants.EXIT_FAILURE)
-    for fname in (options.ssl_cert, options.ssl_key):
-      if not os.path.isfile(fname):
-        print >> sys.stderr, "Need ssl file %s to run" % fname
+  if getattr(options, "ssl", False):
+    ssl_paths = {
+      "certificate": options.ssl_cert,
+      "key": options.ssl_key,
+      }
+
+    for name, path in ssl_paths.iteritems():
+      if not os.path.isfile(path):
+        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
         sys.exit(constants.EXIT_FAILURE)
 
+    # TODO: By initiating http.HttpSslParams here we would only read the files
+    # once and have a proper validation (isfile returns False on directories)
+    # at the same time.
+
   if check_fn is not None:
     check_fn(options, args)
 
@@ -296,8 +342,10 @@ def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
     utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                        debug=options.debug,
                        stderr_logging=not options.fork,
-                       multithreaded=multithread)
-    logging.info("%s daemon startup" % daemon_name)
+                       multithreaded=multithreaded,
+                       program=daemon_name,
+                       syslog=options.syslog)
+    logging.info("%s daemon startup", daemon_name)
     exec_fn(options, args)
   finally:
     utils.RemovePidFile(daemon_name)