opcodes: Annotate the OP_RESULT of query operations
[ganeti-local] / lib / server / masterd.py
index 27bf561..41df24e 100644 (file)
@@ -125,6 +125,63 @@ class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
     self.server.request_workers.AddTask((self.server, message, self))
 
 
+class _MasterShutdownCheck:
+  """Logic for master daemon shutdown.
+
+  """
+  #: How long to wait between checks
+  _CHECK_INTERVAL = 5.0
+
+  #: How long to wait after all jobs are done (e.g. to give clients time to
+  #: retrieve the job status)
+  _SHUTDOWN_LINGER = 5.0
+
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    self._had_active_jobs = None
+    self._linger_timeout = None
+
+  def __call__(self, jq_prepare_result):
+    """Determines if master daemon is ready for shutdown.
+
+    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
+    @rtype: None or number
+    @return: None if master daemon is ready, timeout if the check must be
+             repeated
+
+    """
+    if jq_prepare_result:
+      # Check again shortly
+      logging.info("Job queue has been notified for shutdown but is still"
+                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
+      self._had_active_jobs = True
+      return self._CHECK_INTERVAL
+
+    if not self._had_active_jobs:
+      # Can shut down as there were no active jobs on the first check
+      return None
+
+    # No jobs are running anymore, but maybe some clients want to collect some
+    # information. Give them a short amount of time.
+    if self._linger_timeout is None:
+      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
+
+    remaining = self._linger_timeout.Remaining()
+
+    logging.info("Job queue no longer busy; shutting down master daemon"
+                 " in %s seconds", remaining)
+
+    # TODO: Should the master daemon socket be closed at this point? Doing so
+    # wouldn't affect existing connections.
+
+    if remaining < 0:
+      return None
+    else:
+      return remaining
+
+
 class MasterServer(daemon.AsyncStreamServer):
   """Master Server.
 
@@ -134,11 +191,9 @@ class MasterServer(daemon.AsyncStreamServer):
   """
   family = socket.AF_UNIX
 
-  def __init__(self, mainloop, address, uid, gid):
+  def __init__(self, address, uid, gid):
     """MasterServer constructor
 
-    @type mainloop: ganeti.daemon.Mainloop
-    @param mainloop: Mainloop used to poll for I/O events
     @param address: the unix socket address to bind the MasterServer to
     @param uid: The uid of the owner of the socket
     @param gid: The gid of the owner of the socket
@@ -150,13 +205,14 @@ class MasterServer(daemon.AsyncStreamServer):
     os.chown(temp_name, uid, gid)
     os.rename(temp_name, address)
 
-    self.mainloop = mainloop
     self.awaker = daemon.AsyncAwaker()
 
     # We'll only start threads once we've forked.
     self.context = None
     self.request_workers = None
 
+    self._shutdown_check = None
+
   def handle_connection(self, connected_socket, client_address):
     # TODO: add connection count and limit the number of open connections to a
     # maximum number to avoid breaking for lack of file descriptors or memory.
@@ -168,6 +224,15 @@ class MasterServer(daemon.AsyncStreamServer):
                                                  CLIENT_REQUEST_WORKERS,
                                                  ClientRequestWorker)
 
+  def WaitForShutdown(self):
+    """Prepares server for shutdown.
+
+    """
+    if self._shutdown_check is None:
+      self._shutdown_check = _MasterShutdownCheck()
+
+    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
+
   def server_cleanup(self):
     """Cleanup the server.
 
@@ -193,6 +258,9 @@ class ClientOps:
     queue = self.server.context.jobqueue
 
     # TODO: Parameter validation
+    if not isinstance(args, (tuple, list)):
+      logging.info("Received invalid arguments of type '%s'", type(args))
+      raise ValueError("Invalid arguments type '%s'" % type(args))
 
     # TODO: Rewrite to not exit in each 'if/elif' branch
 
@@ -209,12 +277,12 @@ class ClientOps:
       return queue.SubmitManyJobs(jobs)
 
     elif method == luxi.REQ_CANCEL_JOB:
-      job_id = args
+      (job_id, ) = args
       logging.info("Received job cancel request for %s", job_id)
       return queue.CancelJob(job_id)
 
     elif method == luxi.REQ_ARCHIVE_JOB:
-      job_id = args
+      (job_id, ) = args
       logging.info("Received job archive request for %s", job_id)
       return queue.ArchiveJob(job_id)
 
@@ -231,13 +299,14 @@ class ClientOps:
                                      prev_log_serial, timeout)
 
     elif method == luxi.REQ_QUERY:
-      req = objects.QueryRequest.FromDict(args)
+      (what, fields, qfilter) = args
+      req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)
 
       if req.what in constants.QR_VIA_OP:
         result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
-                                             filter=req.filter))
+                                             qfilter=req.qfilter))
       elif req.what == constants.QR_LOCK:
-        if req.filter is not None:
+        if req.qfilter is not None:
           raise errors.OpPrereqError("Lock queries can't be filtered")
         return self.server.context.glm.QueryLocks(req.fields)
       elif req.what in constants.QR_VIA_LUXI:
@@ -249,7 +318,8 @@ class ClientOps:
       return result
 
     elif method == luxi.REQ_QUERY_FIELDS:
-      req = objects.QueryFieldsRequest.FromDict(args)
+      (what, fields) = args
+      req = objects.QueryFieldsRequest(what=what, fields=fields)
 
       try:
         fielddefs = query.ALL_FIELDS[req.what]
@@ -298,7 +368,7 @@ class ClientOps:
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_EXPORTS:
-      nodes, use_locking = args
+      (nodes, use_locking) = args
       if use_locking:
         raise errors.OpPrereqError("Sync queries are not allowed",
                                    errors.ECODE_INVAL)
@@ -307,7 +377,7 @@ class ClientOps:
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
-      fields = args
+      (fields, ) = args
       logging.info("Received config values query request for %s", fields)
       op = opcodes.OpClusterConfigQuery(output_fields=fields)
       return self._Query(op)
@@ -318,7 +388,7 @@ class ClientOps:
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_TAGS:
-      kind, name = args
+      (kind, name) = args
       logging.info("Received tags query request")
       op = opcodes.OpTagsGet(kind=kind, name=name)
       return self._Query(op)
@@ -331,7 +401,7 @@ class ClientOps:
       return self.server.context.glm.OldStyleQueryLocks(fields)
 
     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
-      drain_flag = args
+      (drain_flag, ) = args
       logging.info("Received queue drain flag change request to %s",
                    drain_flag)
       return queue.SetDrainFlag(drain_flag)
@@ -396,6 +466,11 @@ class GanetiContext(object):
                 self.cfg.GetNodeGroupList(),
                 self.cfg.GetInstanceList())
 
+    self.cfg.SetContext(self)
+
+    # RPC runner
+    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
+
     # Job queue
     self.jobqueue = jqueue.JobQueue(self)
 
@@ -421,6 +496,7 @@ class GanetiContext(object):
 
     # Add the new node to the Ganeti Lock Manager
     self.glm.add(locking.LEVEL_NODE, node.name)
+    self.glm.add(locking.LEVEL_NODE_RES, node.name)
 
   def ReaddNode(self, node):
     """Updates a node that's already in the configuration
@@ -441,6 +517,7 @@ class GanetiContext(object):
 
     # Remove the node from the Ganeti Lock Manager
     self.glm.remove(locking.LEVEL_NODE, name)
+    self.glm.remove(locking.LEVEL_NODE_RES, name)
 
 
 def _SetWatcherPause(until):
@@ -523,8 +600,13 @@ def CheckAgreement():
 @rpc.RunWithRPC
 def ActivateMasterIP():
   # activate ip
-  master_node = ssconf.SimpleStore().GetMasterNode()
-  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
+  cfg = config.ConfigWriter()
+  master_params = cfg.GetMasterNetworkParameters()
+  ems = cfg.GetUseExternalMipScript()
+  runner = rpc.BootstrapRunner()
+  result = runner.call_node_activate_master_ip(master_params.name,
+                                               master_params, ems)
+
   msg = result.fail_msg
   if msg:
     logging.error("Can't activate master IP address: %s", msg)
@@ -608,8 +690,7 @@ def PrepMasterd(options, _):
   utils.RemoveFile(constants.MASTER_SOCKET)
 
   mainloop = daemon.Mainloop()
-  master = MasterServer(mainloop, constants.MASTER_SOCKET,
-                        options.uid, options.gid)
+  master = MasterServer(constants.MASTER_SOCKET, options.uid, options.gid)
   return (mainloop, master)
 
 
@@ -623,7 +704,7 @@ def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
     try:
       master.setup_queue()
       try:
-        mainloop.Run()
+        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
       finally:
         master.server_cleanup()
     finally:
@@ -631,6 +712,8 @@ def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
   finally:
     utils.RemoveFile(constants.MASTER_SOCKET)
 
+  logging.info("Clean master daemon shutdown")
+
 
 def Main():
   """Main function"""