Hypervisor: allow ballooning of instance memory
[ganeti-local] / lib / daemon.py
index 7f5e9ef..1e41b80 100644 (file)
@@ -75,7 +75,43 @@ class AsyncoreScheduler(sched.scheduler):
 
   """
   def __init__(self, timefunc):
-    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
+    """Sets up the scheduler with L{_LimitedDelay} as its delay function.
+
+    """
+    sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
+    self._max_delay = None
+
+  def run(self, max_delay=None): # pylint: disable=W0221
+    """Run any pending events, optionally limiting how long delays may take.
+
+    @type max_delay: None or number
+    @param max_delay: Maximum delay (useful if caller has timeouts running)
+
+    """
+    assert self._max_delay is None
+
+    # The scheduler's delay function is fixed at construction time, so the
+    # per-run limit is handed to it through an instance variable.
+    if max_delay is None:
+      self._max_delay = None
+    else:
+      self._max_delay = utils.RunningTimeout(max_delay, False)
+
+    try:
+      return sched.scheduler.run(self)
+    finally:
+      self._max_delay = None
+
+  def _LimitedDelay(self, duration):
+    """Delay function for C{sched.scheduler}, capped by C{self._max_delay}.
+
+    """
+    if self._max_delay is None:
+      timeout = duration
+    else:
+      timeout = min(duration, self._max_delay.Remaining())
+
+    return AsyncoreDelayFunction(timeout)
 
 
 class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
@@ -421,6 +457,47 @@ class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
       self.out_socket.send("\0")
 
 
+class _ShutdownCheck:
+  """Decides, by asking a callback, when L{Mainloop} may shut down.
+
+  """
+  def __init__(self, fn):
+    """Initializes this class.
+
+    @type fn: callable
+    @param fn: Function returning C{None} if mainloop can be stopped or a
+      duration in seconds after which the function should be called again
+    @see: L{Mainloop.Run}
+
+    """
+    assert callable(fn)
+
+    self._fn = fn
+    self._defer = None
+
+  def CanShutdown(self):
+    """Checks whether mainloop can be stopped.
+
+    @rtype: bool
+
+    """
+    if self._defer and self._defer.Remaining() > 0:
+      # A deferred check is pending and its timeout still has time remaining
+      return False
+
+    # Ask mainloop driver whether we can stop or should check again
+    timeout = self._fn()
+
+    if timeout is None:
+      # Yes, can stop mainloop
+      return True
+
+    # Remember the timeout; fn is not called again while time remains on it
+    self._defer = utils.RunningTimeout(timeout, True)
+
+    return False
+
+
 class Mainloop(object):
   """Generic mainloop for daemons
 
@@ -428,6 +505,8 @@ class Mainloop(object):
     timed events
 
   """
+  _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)
+
   def __init__(self):
     """Constructs a new Mainloop instance.
 
@@ -441,9 +520,13 @@ class Mainloop(object):
   @utils.SignalHandled([signal.SIGCHLD])
   @utils.SignalHandled([signal.SIGTERM])
   @utils.SignalHandled([signal.SIGINT])
-  def Run(self, signal_handlers=None):
+  def Run(self, shutdown_wait_fn=None, signal_handlers=None):
     """Runs the mainloop.
 
+    @type shutdown_wait_fn: callable
+    @param shutdown_wait_fn: Function to check whether loop can be terminated;
+      B{important}: function must be idempotent and must return either None
+      for shutting down or a timeout for another call
     @type signal_handlers: dict
     @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
 
@@ -451,24 +534,50 @@ class Mainloop(object):
     assert isinstance(signal_handlers, dict) and \
            len(signal_handlers) > 0, \
            "Broken SignalHandled decorator"
-    running = True
+
+    # Number of shutdown signals (SIGTERM/SIGINT) received so far
+    shutdown_signals = 0
+
+    # Created lazily once the first shutdown signal has been seen
+    shutdown_waiter = None
 
     # Start actual main loop
-    while running:
-      if not self.scheduler.empty():
+    while True:
+      if shutdown_signals == 1 and shutdown_wait_fn is not None:
+        if shutdown_waiter is None:
+          shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)
+
+        # Let mainloop driver decide if we can already abort
+        if shutdown_waiter.CanShutdown():
+          break
+
+        # Not yet; wait at most a second before checking again
+        timeout = 1.0
+
+      elif shutdown_signals >= 1:
+        # Abort loop if more than one signal has been sent or no callback has
+        # been given
+        break
+
+      else:
+        # Wait forever on I/O events
+        timeout = None
+
+      if self.scheduler.empty():
+        asyncore.loop(count=1, timeout=timeout, use_poll=True)
+      else:
         try:
-          self.scheduler.run()
+          self.scheduler.run(max_delay=timeout)
         except SchedulerBreakout:
           pass
-      else:
-        asyncore.loop(count=1, use_poll=True)
 
       # Check whether a signal was raised
-      for sig in signal_handlers:
-        handler = signal_handlers[sig]
+      for (sig, handler) in signal_handlers.items():
         if handler.called:
           self._CallSignalWaiters(sig)
-          running = sig not in (signal.SIGTERM, signal.SIGINT)
+          if sig in (signal.SIGTERM, signal.SIGINT):
+            logging.info("Received signal %s asking for shutdown", sig)
+            shutdown_signals += 1
           handler.Clear()
 
   def _CallSignalWaiters(self, signum):
@@ -539,12 +648,12 @@ def _BeautifyError(err):
                                             err.errno)
     else:
       return str(err)
-  except Exception: # pylint: disable-msg=W0703
+  except Exception: # pylint: disable=W0703
     logging.exception("Error while handling existing error %s", err)
     return "%s" % str(err)
 
 
-def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable-msg=W0613
+def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
   """Handler for SIGHUP.
 
   @param reopen_fn: List of callback functions for reopening log files
@@ -687,7 +796,12 @@ def GenericMain(daemon_name, optionparser,
   signal.signal(signal.SIGHUP,
                 compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))
 
-  utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
+  try:
+    utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
+  except errors.PidFileLockError, err:
+    print >> sys.stderr, "Error while locking PID file:\n%s" % err
+    sys.exit(constants.EXIT_FAILURE)
+
   try:
     try:
       logging.info("%s daemon startup", daemon_name)