Further pylint disables, mostly for Unused args
[ganeti-local] / daemons / ganeti-masterd
index 5d2ba9f..4d783a8 100755 (executable)
@@ -1,4 +1,4 @@
-#!/usr/bin/python -u
+#!/usr/bin/python
 #
 
 # Copyright (C) 2006, 2007 Google Inc.
@@ -26,6 +26,8 @@ inheritance from parent classes requires it.
 
 """
 
+# pylint: disable-msg=C0103
+# C0103: Invalid name ganeti-masterd
 
 import os
 import sys
@@ -35,7 +37,6 @@ import collections
 import signal
 import logging
 
-from cStringIO import StringIO
 from optparse import OptionParser
 
 from ganeti import config
@@ -62,6 +63,7 @@ EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
 class ClientRequestWorker(workerpool.BaseWorker):
+   # pylint: disable-msg=W0221
   def RunTask(self, server, request, client_address):
     """Process the request.
 
@@ -71,7 +73,7 @@ class ClientRequestWorker(workerpool.BaseWorker):
     try:
       server.finish_request(request, client_address)
       server.close_request(request)
-    except:
+    except: # pylint: disable-msg=W0702
       server.handle_error(request, client_address)
       server.close_request(request)
 
@@ -109,7 +111,7 @@ class IOServer(SocketServer.UnixStreamServer):
     self.request_workers.AddTask(self, request, client_address)
 
   @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
-  def serve_forever(self, signal_handlers=None):
+  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
     """Handle one request at a time until told to quit."""
     assert isinstance(signal_handlers, dict) and \
            len(signal_handlers) > 0, \
@@ -142,6 +144,8 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
   READ_SIZE = 4096
 
   def setup(self):
+    # pylint: disable-msg=W0201
+    # setup() is the api for initialising for this class
     self._buffer = ""
     self._msgs = collections.deque()
     self._ops = ClientOps(self.server)
@@ -171,7 +175,7 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
         success = True
       except errors.GenericError, err:
         success = False
-        result = (err.__class__.__name__, err.args)
+        result = errors.EncodeException(err)
       except:
         logging.error("Unexpected exception", exc_info=True)
         err = sys.exc_info()
@@ -205,11 +209,13 @@ class ClientOps:
   def __init__(self, server):
     self.server = server
 
-  def handle_request(self, method, args):
+  def handle_request(self, method, args): # pylint: disable-msg=R0911
     queue = self.server.context.jobqueue
 
     # TODO: Parameter validation
 
+    # TODO: Rewrite to not exit in each 'if/elif' branch
+
     if method == luxi.REQ_SUBMIT_JOB:
       logging.info("Received new job")
       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
@@ -247,7 +253,7 @@ class ClientOps:
     elif method == luxi.REQ_QUERY_JOBS:
       (job_ids, fields) = args
       if isinstance(job_ids, (tuple, list)) and job_ids:
-        msg = ", ".join(job_ids)
+        msg = utils.CommaJoin(job_ids)
       else:
         msg = str(job_ids)
       logging.info("Received job query request for %s", msg)
@@ -257,7 +263,8 @@ class ClientOps:
       (names, fields, use_locking) = args
       logging.info("Received instance query request for %s", names)
       if use_locking:
-        raise errors.OpPrereqError("Sync queries are not allowed")
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
       op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                     use_locking=use_locking)
       return self._Query(op)
@@ -266,7 +273,8 @@ class ClientOps:
       (names, fields, use_locking) = args
       logging.info("Received node query request for %s", names)
       if use_locking:
-        raise errors.OpPrereqError("Sync queries are not allowed")
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
       op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                 use_locking=use_locking)
       return self._Query(op)
@@ -274,7 +282,8 @@ class ClientOps:
     elif method == luxi.REQ_QUERY_EXPORTS:
       nodes, use_locking = args
       if use_locking:
-        raise errors.OpPrereqError("Sync queries are not allowed")
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
       logging.info("Received exports query request")
       op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
       return self._Query(op)
@@ -296,20 +305,33 @@ class ClientOps:
                    drain_flag)
       return queue.SetDrainFlag(drain_flag)
 
+    elif method == luxi.REQ_SET_WATCHER_PAUSE:
+      (until, ) = args
+
+      if until is None:
+        logging.info("Received request to no longer pause the watcher")
+      else:
+        if not isinstance(until, (int, float)):
+          raise TypeError("Duration must be an integer or float")
+
+        if until < time.time():
+          raise errors.GenericError("Unable to set pause end time in the past")
+
+        logging.info("Received request to pause the watcher until %s", until)
+
+      return _SetWatcherPause(until)
+
     else:
       logging.info("Received invalid request '%s'", method)
       raise ValueError("Invalid operation '%s'" % method)
 
-  def _DummyLog(self, *args):
-    pass
-
   def _Query(self, op):
     """Runs the specified opcode and returns the result.
 
     """
-    proc = mcpu.Processor(self.server.context)
-    # TODO: Where should log messages go?
-    return proc.ExecOpCode(op, self._DummyLog, None)
+    # Queries don't have a job id
+    proc = mcpu.Processor(self.server.context, None)
+    return proc.ExecOpCode(op, None)
 
 
 class GanetiContext(object):
@@ -318,6 +340,8 @@ class GanetiContext(object):
   This class creates and holds common objects shared by all threads.
 
   """
+  # pylint: disable-msg=W0212
+  # we do want to ensure a singleton here
   _instance = None
 
   def __init__(self):
@@ -350,12 +374,12 @@ class GanetiContext(object):
     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
     object.__setattr__(self, name, value)
 
-  def AddNode(self, node):
+  def AddNode(self, node, ec_id):
     """Adds a node to the configuration and lock manager.
 
     """
     # Add it to the configuration
-    self.cfg.AddNode(node)
+    self.cfg.AddNode(node, ec_id)
 
     # If preseeding fails it'll not be added
     self.jobqueue.AddNode(node)
@@ -384,6 +408,22 @@ class GanetiContext(object):
     self.glm.remove(locking.LEVEL_NODE, name)
 
 
+def _SetWatcherPause(until):
+  """Creates or removes the watcher pause file.
+
+  @type until: None or int
+  @param until: Unix timestamp saying until when the watcher shouldn't run
+
+  """
+  if until is None:
+    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
+  else:
+    utils.WriteFile(constants.WATCHER_PAUSEFILE,
+                    data="%d\n" % (until, ))
+
+  return until
+
+
 def CheckAgreement():
   """Check the agreement on who is the master.
 
@@ -442,27 +482,10 @@ def CheckAgreement():
   return result
 
 
-def CheckMasterd(options, args):
-  """Initial checks whether to run or exit with a failure.
-
-  """
+def CheckAgreementWithRpc():
   rpc.Init()
   try:
-    ssconf.CheckMaster(options.debug)
-
-    # we believe we are the master, let's ask the other nodes...
-    if options.no_voting and not options.yes_do_it:
-      sys.stdout.write("The 'no voting' option has been selected.\n")
-      sys.stdout.write("This is dangerous, please confirm by"
-                       " typing uppercase 'yes': ")
-      sys.stdout.flush()
-      confirmation = sys.stdin.readline().strip()
-      if confirmation != "YES":
-        print "Aborting."
-        return
-    elif not options.no_voting:
-      if not CheckAgreement():
-        return
+    return CheckAgreement()
   finally:
     rpc.Shutdown()
 
@@ -484,12 +507,12 @@ def _RunInSeparateProcess(fn):
       # Call function
       result = int(bool(fn()))
       assert result in (0, 1)
-    except:
+    except: # pylint: disable-msg=W0702
       logging.exception("Error while calling function in separate process")
       # 0 and 1 are reserved for the return value
       result = 33
 
-    os._exit(result)
+    os._exit(result) # pylint: disable-msg=W0212
 
   # Parent process
 
@@ -511,7 +534,41 @@ def _RunInSeparateProcess(fn):
   return bool(exitcode)
 
 
-def ExecMasterd (options, args):
+def CheckMasterd(options, args):
+  """Initial checks whether to run or exit with a failure.
+
+  """
+  if args: # masterd doesn't take any arguments
+    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
+    sys.exit(constants.EXIT_FAILURE)
+
+  ssconf.CheckMaster(options.debug)
+
+  # If CheckMaster didn't fail we believe we are the master, but we have to
+  # confirm with the other nodes.
+  if options.no_voting:
+    if options.yes_do_it:
+      return
+
+    sys.stdout.write("The 'no voting' option has been selected.\n")
+    sys.stdout.write("This is dangerous, please confirm by"
+                     " typing uppercase 'yes': ")
+    sys.stdout.flush()
+
+    confirmation = sys.stdin.readline().strip()
+    if confirmation != "YES":
+      print >> sys.stderr, "Aborting."
+      sys.exit(constants.EXIT_FAILURE)
+
+    return
+
+  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
+  # process before we call utils.Daemonize in the current process.
+  if not _RunInSeparateProcess(CheckAgreementWithRpc):
+    sys.exit(constants.EXIT_FAILURE)
+
+
+def ExecMasterd (options, args): # pylint: disable-msg=W0613
   """Main master daemon function, executed with the PID file held.
 
   """
@@ -526,7 +583,7 @@ def ExecMasterd (options, args):
       # activate ip
       master_node = ssconf.SimpleStore().GetMasterNode()
       result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
-      msg = result.RemoteFailMsg()
+      msg = result.fail_msg
       if msg:
         logging.error("Can't activate master IP address: %s", msg)