Change method dispatch in ClientOps to enforce luxi.REQ_ALL
[ganeti-local] / lib / server / masterd.py
index bb6d620..b0e8708 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -26,7 +26,7 @@ inheritance from parent classes requires it.
 
 """
 
-# pylint: disable-msg=C0103
+# pylint: disable=C0103
 # C0103: Invalid name ganeti-masterd
 
 import grp
@@ -55,6 +55,11 @@ from ganeti import workerpool
 from ganeti import rpc
 from ganeti import bootstrap
 from ganeti import netutils
+from ganeti import objects
+from ganeti import query
+from ganeti import runtime
+from ganeti import pathutils
+from ganeti import ht
 
 
 CLIENT_REQUEST_WORKERS = 16
@@ -63,8 +68,21 @@ EXIT_NOTMASTER = constants.EXIT_NOTMASTER
 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
+def _LogNewJob(status, info, ops):
+  """Log information about a recently submitted job.
+
+  """
+  op_summary = utils.CommaJoin(op.Summary() for op in ops)
+
+  if status:
+    logging.info("New job with id %s, summary: %s", info, op_summary)
+  else:
+    logging.info("Failed to submit job, reason: '%s', summary: %s",
+                 info, op_summary)
+
+
 class ClientRequestWorker(workerpool.BaseWorker):
-  # pylint: disable-msg=W0221
+  # pylint: disable=W0221
   def RunTask(self, server, message, client):
     """Process the request.
 
@@ -101,7 +119,7 @@ class ClientRequestWorker(workerpool.BaseWorker):
       client.send_message(reply)
       # awake the main thread so that it can write out the data.
       server.awaker.signal()
-    except: # pylint: disable-msg=W0702
+    except: # pylint: disable=W0702
       logging.exception("Send error")
       client.close_log()
 
@@ -111,6 +129,7 @@ class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
 
   """
   _MAX_UNHANDLED = 1
+
   def __init__(self, server, connected_socket, client_address, family):
     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                  client_address,
@@ -122,6 +141,63 @@ class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
     self.server.request_workers.AddTask((self.server, message, self))
 
 
+class _MasterShutdownCheck:
+  """Logic for master daemon shutdown.
+
+  """
+  #: How long to wait between checks
+  _CHECK_INTERVAL = 5.0
+
+  #: How long to wait after all jobs are done (e.g. to give clients time to
+  #: retrieve the job status)
+  _SHUTDOWN_LINGER = 5.0
+
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    self._had_active_jobs = None
+    self._linger_timeout = None
+
+  def __call__(self, jq_prepare_result):
+    """Determines if master daemon is ready for shutdown.
+
+    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
+    @rtype: None or number
+    @return: None if master daemon is ready, timeout if the check must be
+             repeated
+
+    """
+    if jq_prepare_result:
+      # Check again shortly
+      logging.info("Job queue has been notified for shutdown but is still"
+                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
+      self._had_active_jobs = True
+      return self._CHECK_INTERVAL
+
+    if not self._had_active_jobs:
+      # Can shut down as there were no active jobs on the first check
+      return None
+
+    # No jobs are running anymore, but maybe some clients want to collect some
+    # information. Give them a short amount of time.
+    if self._linger_timeout is None:
+      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
+
+    remaining = self._linger_timeout.Remaining()
+
+    logging.info("Job queue no longer busy; shutting down master daemon"
+                 " in %s seconds", remaining)
+
+    # TODO: Should the master daemon socket be closed at this point? Doing so
+    # wouldn't affect existing connections.
+
+    if remaining < 0:
+      return None
+    else:
+      return remaining
+
+
 class MasterServer(daemon.AsyncStreamServer):
   """Master Server.
 
@@ -131,11 +207,9 @@ class MasterServer(daemon.AsyncStreamServer):
   """
   family = socket.AF_UNIX
 
-  def __init__(self, mainloop, address, uid, gid):
+  def __init__(self, address, uid, gid):
     """MasterServer constructor
 
-    @type mainloop: ganeti.daemon.Mainloop
-    @param mainloop: Mainloop used to poll for I/O events
     @param address: the unix socket address to bind the MasterServer to
     @param uid: The uid of the owner of the socket
     @param gid: The gid of the owner of the socket
@@ -147,13 +221,14 @@ class MasterServer(daemon.AsyncStreamServer):
     os.chown(temp_name, uid, gid)
     os.rename(temp_name, address)
 
-    self.mainloop = mainloop
     self.awaker = daemon.AsyncAwaker()
 
     # We'll only start threads once we've forked.
     self.context = None
     self.request_workers = None
 
+    self._shutdown_check = None
+
   def handle_connection(self, connected_socket, client_address):
     # TODO: add connection count and limit the number of open connections to a
     # maximum number to avoid breaking for lack of file descriptors or memory.
@@ -165,6 +240,15 @@ class MasterServer(daemon.AsyncStreamServer):
                                                  CLIENT_REQUEST_WORKERS,
                                                  ClientRequestWorker)
 
+  def WaitForShutdown(self):
+    """Prepares server for shutdown.
+
+    """
+    if self._shutdown_check is None:
+      self._shutdown_check = _MasterShutdownCheck()
+
+    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
+
   def server_cleanup(self):
     """Cleanup the server.
 
@@ -186,36 +270,57 @@ class ClientOps:
   def __init__(self, server):
     self.server = server
 
-  def handle_request(self, method, args): # pylint: disable-msg=R0911
-    queue = self.server.context.jobqueue
+  def handle_request(self, method, args): # pylint: disable=R0911
+    context = self.server.context
+    queue = context.jobqueue
 
     # TODO: Parameter validation
+    if not isinstance(args, (tuple, list)):
+      logging.info("Received invalid arguments of type '%s'", type(args))
+      raise ValueError("Invalid arguments type '%s'" % type(args))
+
+    if method not in luxi.REQ_ALL:
+      logging.info("Received invalid request '%s'", method)
+      raise ValueError("Invalid operation '%s'" % method)
 
     # TODO: Rewrite to not exit in each 'if/elif' branch
 
     if method == luxi.REQ_SUBMIT_JOB:
-      logging.info("Received new job")
-      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
-      return queue.SubmitJob(ops)
-
-    if method == luxi.REQ_SUBMIT_MANY_JOBS:
-      logging.info("Received multiple jobs")
+      logging.info("Receiving new job")
+      (job_def, ) = args
+      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
+      job_id = queue.SubmitJob(ops)
+      _LogNewJob(True, job_id, ops)
+      return job_id
+
+    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
+      logging.info("Receiving multiple jobs")
+      (job_defs, ) = args
       jobs = []
-      for ops in args:
+      for ops in job_defs:
         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
-      return queue.SubmitManyJobs(jobs)
+      job_ids = queue.SubmitManyJobs(jobs)
+      for ((status, job_id), ops) in zip(job_ids, jobs):
+        _LogNewJob(status, job_id, ops)
+      return job_ids
 
     elif method == luxi.REQ_CANCEL_JOB:
-      job_id = args
+      (job_id, ) = args
       logging.info("Received job cancel request for %s", job_id)
       return queue.CancelJob(job_id)
 
+    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
+      (job_id, priority) = args
+      logging.info("Received request to change priority for job %s to %s",
+                   job_id, priority)
+      return queue.ChangeJobPriority(job_id, priority)
+
     elif method == luxi.REQ_ARCHIVE_JOB:
-      job_id = args
+      (job_id, ) = args
       logging.info("Received job archive request for %s", job_id)
       return queue.ArchiveJob(job_id)
 
-    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
+    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
       (age, timeout) = args
       logging.info("Received job autoarchive request for age %s, timeout %s",
                    age, timeout)
@@ -227,6 +332,39 @@ class ClientOps:
       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                      prev_log_serial, timeout)
 
+    elif method == luxi.REQ_QUERY:
+      (what, fields, qfilter) = args
+
+      if what in constants.QR_VIA_OP:
+        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
+                                             qfilter=qfilter))
+      elif what == constants.QR_LOCK:
+        if qfilter is not None:
+          raise errors.OpPrereqError("Lock queries can't be filtered",
+                                     errors.ECODE_INVAL)
+        return context.glm.QueryLocks(fields)
+      elif what == constants.QR_JOB:
+        return queue.QueryJobs(fields, qfilter)
+      elif what in constants.QR_VIA_LUXI:
+        raise NotImplementedError
+      else:
+        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
+                                   errors.ECODE_INVAL)
+
+      return result
+
+    elif method == luxi.REQ_QUERY_FIELDS:
+      (what, fields) = args
+      req = objects.QueryFieldsRequest(what=what, fields=fields)
+
+      try:
+        fielddefs = query.ALL_FIELDS[req.what]
+      except KeyError:
+        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
+                                   errors.ECODE_INVAL)
+
+      return query.QueryFields(fielddefs, req.fields)
+
     elif method == luxi.REQ_QUERY_JOBS:
       (job_ids, fields) = args
       if isinstance(job_ids, (tuple, list)) and job_ids:
@@ -234,7 +372,7 @@ class ClientOps:
       else:
         msg = str(job_ids)
       logging.info("Received job query request for %s", msg)
-      return queue.QueryJobs(job_ids, fields)
+      return queue.OldStyleQueryJobs(job_ids, fields)
 
     elif method == luxi.REQ_QUERY_INSTANCES:
       (names, fields, use_locking) = args
@@ -242,8 +380,8 @@ class ClientOps:
       if use_locking:
         raise errors.OpPrereqError("Sync queries are not allowed",
                                    errors.ECODE_INVAL)
-      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
-                                    use_locking=use_locking)
+      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
+                                   use_locking=use_locking)
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_NODES:
@@ -252,43 +390,56 @@ class ClientOps:
       if use_locking:
         raise errors.OpPrereqError("Sync queries are not allowed",
                                    errors.ECODE_INVAL)
-      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
-                                use_locking=use_locking)
+      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
+                               use_locking=use_locking)
+      return self._Query(op)
+
+    elif method == luxi.REQ_QUERY_GROUPS:
+      (names, fields, use_locking) = args
+      logging.info("Received group query request for %s", names)
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
+      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
+      return self._Query(op)
+
+    elif method == luxi.REQ_QUERY_NETWORKS:
+      (names, fields, use_locking) = args
+      logging.info("Received network query request for %s", names)
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
+      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_EXPORTS:
-      nodes, use_locking = args
+      (nodes, use_locking) = args
       if use_locking:
         raise errors.OpPrereqError("Sync queries are not allowed",
                                    errors.ECODE_INVAL)
       logging.info("Received exports query request")
-      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
+      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
-      fields = args
+      (fields, ) = args
       logging.info("Received config values query request for %s", fields)
-      op = opcodes.OpQueryConfigValues(output_fields=fields)
+      op = opcodes.OpClusterConfigQuery(output_fields=fields)
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
       logging.info("Received cluster info query request")
-      op = opcodes.OpQueryClusterInfo()
+      op = opcodes.OpClusterQuery()
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_TAGS:
-      kind, name = args
+      (kind, name) = args
       logging.info("Received tags query request")
-      op = opcodes.OpGetTags(kind=kind, name=name)
+      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
       return self._Query(op)
 
-    elif method == luxi.REQ_QUERY_LOCKS:
-      (fields, sync) = args
-      logging.info("Received locks query request")
-      return self.server.context.glm.QueryLocks(fields, sync)
-
-    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
-      drain_flag = args
+    elif method == luxi.REQ_SET_DRAIN_FLAG:
+      (drain_flag, ) = args
       logging.info("Received queue drain flag change request to %s",
                    drain_flag)
       return queue.SetDrainFlag(drain_flag)
@@ -296,29 +447,19 @@ class ClientOps:
     elif method == luxi.REQ_SET_WATCHER_PAUSE:
       (until, ) = args
 
-      if until is None:
-        logging.info("Received request to no longer pause the watcher")
-      else:
-        if not isinstance(until, (int, float)):
-          raise TypeError("Duration must be an integer or float")
-
-        if until < time.time():
-          raise errors.GenericError("Unable to set pause end time in the past")
-
-        logging.info("Received request to pause the watcher until %s", until)
-
-      return _SetWatcherPause(until)
+      return _SetWatcherPause(context, until)
 
     else:
-      logging.info("Received invalid request '%s'", method)
-      raise ValueError("Invalid operation '%s'" % method)
+      logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
+      raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
+                                   " but not implemented" % method)
 
   def _Query(self, op):
     """Runs the specified opcode and returns the result.
 
     """
     # Queries don't have a job id
-    proc = mcpu.Processor(self.server.context, None)
+    proc = mcpu.Processor(self.server.context, None, enable_locks=False)
 
     # TODO: Executing an opcode using locks will acquire them in blocking mode.
     # Consider using a timeout for retries.
@@ -331,7 +472,7 @@ class GanetiContext(object):
   This class creates and holds common objects shared by all threads.
 
   """
-  # pylint: disable-msg=W0212
+  # pylint: disable=W0212
   # we do want to ensure a singleton here
   _instance = None
 
@@ -349,8 +490,15 @@ class GanetiContext(object):
 
     # Locking manager
     self.glm = locking.GanetiLockManager(
-                self.cfg.GetNodeList(),
-                self.cfg.GetInstanceList())
+      self.cfg.GetNodeList(),
+      self.cfg.GetNodeGroupList(),
+      self.cfg.GetInstanceList(),
+      self.cfg.GetNetworkList())
+
+    self.cfg.SetContext(self)
+
+    # RPC runner
+    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
 
     # Job queue
     self.jobqueue = jqueue.JobQueue(self)
@@ -377,6 +525,7 @@ class GanetiContext(object):
 
     # Add the new node to the Ganeti Lock Manager
     self.glm.add(locking.LEVEL_NODE, node.name)
+    self.glm.add(locking.LEVEL_NODE_RES, node.name)
 
   def ReaddNode(self, node):
     """Updates a node that's already in the configuration
@@ -397,20 +546,39 @@ class GanetiContext(object):
 
     # Remove the node from the Ganeti Lock Manager
     self.glm.remove(locking.LEVEL_NODE, name)
+    self.glm.remove(locking.LEVEL_NODE_RES, name)
 
 
-def _SetWatcherPause(until):
+def _SetWatcherPause(context, until):
   """Creates or removes the watcher pause file.
 
+  @type context: L{GanetiContext}
+  @param context: Global Ganeti context
   @type until: None or int
   @param until: Unix timestamp saying until when the watcher shouldn't run
 
   """
+  node_names = context.cfg.GetNodeList()
+
   if until is None:
-    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
+    logging.info("Received request to no longer pause watcher")
   else:
-    utils.WriteFile(constants.WATCHER_PAUSEFILE,
-                    data="%d\n" % (until, ))
+    if not ht.TNumber(until):
+      raise TypeError("Duration must be numeric")
+
+    if until < time.time():
+      raise errors.GenericError("Unable to set pause end time in the past")
+
+    logging.info("Received request to pause watcher until %s", until)
+
+  result = context.rpc.call_set_watcher_pause(node_names, until)
+
+  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
+                           for (node_name, nres) in result.items()
+                           if nres.fail_msg and not nres.offline)
+  if errmsg:
+    raise errors.OpExecError("Watcher pause was set where possible, but failed"
+                             " on the following node(s): %s" % errmsg)
 
   return until
 
@@ -479,8 +647,13 @@ def CheckAgreement():
 @rpc.RunWithRPC
 def ActivateMasterIP():
   # activate ip
-  master_node = ssconf.SimpleStore().GetMasterNode()
-  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
+  cfg = config.ConfigWriter()
+  master_params = cfg.GetMasterNetworkParameters()
+  ems = cfg.GetUseExternalMipScript()
+  runner = rpc.BootstrapRunner()
+  result = runner.call_node_activate_master_ip(master_params.name,
+                                               master_params, ems)
+
   msg = result.fail_msg
   if msg:
     logging.error("Can't activate master IP address: %s", msg)
@@ -504,6 +677,9 @@ def CheckMasterd(options, args):
                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
     sys.exit(constants.EXIT_FAILURE)
 
+  # Determine static runtime architecture information
+  runtime.InitArchInfo()
+
   # Check the configuration is sane before anything else
   try:
     config.ConfigWriter()
@@ -530,26 +706,24 @@ def CheckMasterd(options, args):
   # If CheckMaster didn't fail we believe we are the master, but we have to
   # confirm with the other nodes.
   if options.no_voting:
-    if options.yes_do_it:
-      return
+    if not options.yes_do_it:
+      sys.stdout.write("The 'no voting' option has been selected.\n")
+      sys.stdout.write("This is dangerous, please confirm by"
+                       " typing uppercase 'yes': ")
+      sys.stdout.flush()
 
-    sys.stdout.write("The 'no voting' option has been selected.\n")
-    sys.stdout.write("This is dangerous, please confirm by"
-                     " typing uppercase 'yes': ")
-    sys.stdout.flush()
+      confirmation = sys.stdin.readline().strip()
+      if confirmation != "YES":
+        print >> sys.stderr, "Aborting."
+        sys.exit(constants.EXIT_FAILURE)
 
-    confirmation = sys.stdin.readline().strip()
-    if confirmation != "YES":
-      print >> sys.stderr, "Aborting."
+  else:
+    # CheckAgreement uses RPC and threads, hence it needs to be run in
+    # a separate process before we call utils.Daemonize in the current
+    # process.
+    if not utils.RunInSeparateProcess(CheckAgreement):
       sys.exit(constants.EXIT_FAILURE)
 
-    return
-
-  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
-  # process before we call utils.Daemonize in the current process.
-  if not utils.RunInSeparateProcess(CheckAgreement):
-    sys.exit(constants.EXIT_FAILURE)
-
   # ActivateMasterIP also uses RPC/threads, so we run it again via a
   # separate process.
 
@@ -563,15 +737,14 @@ def PrepMasterd(options, _):
   """
   # This is safe to do as the pid file guarantees against
   # concurrent execution.
-  utils.RemoveFile(constants.MASTER_SOCKET)
+  utils.RemoveFile(pathutils.MASTER_SOCKET)
 
   mainloop = daemon.Mainloop()
-  master = MasterServer(mainloop, constants.MASTER_SOCKET,
-                        options.uid, options.gid)
+  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
   return (mainloop, master)
 
 
-def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
+def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
   """Main master daemon function, executed with the PID file held.
 
   """
@@ -581,13 +754,15 @@ def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
     try:
       master.setup_queue()
       try:
-        mainloop.Run()
+        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
       finally:
         master.server_cleanup()
     finally:
       rpc.Shutdown()
   finally:
-    utils.RemoveFile(constants.MASTER_SOCKET)
+    utils.RemoveFile(pathutils.MASTER_SOCKET)
+
+  logging.info("Clean master daemon shutdown")
 
 
 def Main():