self.server.request_workers.AddTask((self.server, message, self))
+class _MasterShutdownCheck:
+ """Logic for master daemon shutdown.
+
+ """
+ #: How long to wait between checks
+ _CHECK_INTERVAL = 5.0
+
+ #: How long to wait after all jobs are done (e.g. to give clients time to
+ #: retrieve the job status)
+ _SHUTDOWN_LINGER = 5.0
+
+ def __init__(self):
+ """Initializes this class.
+
+ """
+ self._had_active_jobs = None
+ self._linger_timeout = None
+
+ def __call__(self, jq_prepare_result):
+ """Determines if master daemon is ready for shutdown.
+
+ @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
+ @rtype: None or number
+ @return: None if master daemon is ready, timeout if the check must be
+ repeated
+
+ """
+ if jq_prepare_result:
+ # Check again shortly
+ logging.info("Job queue has been notified for shutdown but is still"
+ " busy; next check in %s seconds", self._CHECK_INTERVAL)
+ self._had_active_jobs = True
+ return self._CHECK_INTERVAL
+
+ if not self._had_active_jobs:
+ # Can shut down as there were no active jobs on the first check
+ return None
+
+ # No jobs are running anymore, but maybe some clients want to collect some
+ # information. Give them a short amount of time.
+ if self._linger_timeout is None:
+ self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
+
+ remaining = self._linger_timeout.Remaining()
+
+ logging.info("Job queue no longer busy; shutting down master daemon"
+ " in %s seconds", remaining)
+
+ # TODO: Should the master daemon socket be closed at this point? Doing so
+ # wouldn't affect existing connections.
+
+ if remaining < 0:
+ return None
+ else:
+ return remaining
+
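# Illustrative sketch, not part of the patch itself: _MasterShutdownCheck is a
# callable that is meant to be polled until it returns None. A hypothetical
# driver loop (the "jobqueue" name is a stand-in for the real job queue object)
# could look like this:

import time

def _wait_until_ready(jobqueue):
  """Polls a _MasterShutdownCheck until the daemon may shut down."""
  check = _MasterShutdownCheck()
  while True:
    timeout = check(jobqueue.PrepareShutdown())
    if timeout is None:
      return                    # no active jobs and linger period has passed
    time.sleep(timeout)         # re-check after the suggested delay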
+
class MasterServer(daemon.AsyncStreamServer):
"""Master Server.
"""
family = socket.AF_UNIX
- def __init__(self, mainloop, address, uid, gid):
+ def __init__(self, address, uid, gid):
"""MasterServer constructor
- @type mainloop: ganeti.daemon.Mainloop
- @param mainloop: Mainloop used to poll for I/O events
@param address: the unix socket address to bind the MasterServer to
@param uid: The uid of the owner of the socket
@param gid: The gid of the owner of the socket
os.chown(temp_name, uid, gid)
os.rename(temp_name, address)
- self.mainloop = mainloop
self.awaker = daemon.AsyncAwaker()
# We'll only start threads once we've forked.
self.context = None
self.request_workers = None
+ self._shutdown_check = None
+
def handle_connection(self, connected_socket, client_address):
# TODO: add connection count and limit the number of open connections to a
# maximum number to avoid breaking for lack of file descriptors or memory.
CLIENT_REQUEST_WORKERS,
ClientRequestWorker)
+ def WaitForShutdown(self):
+ """Prepares server for shutdown.
+
+ """
+ if self._shutdown_check is None:
+ self._shutdown_check = _MasterShutdownCheck()
+
+ return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
+
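# Illustrative behaviour, not part of the patch itself: WaitForShutdown() is
# later handed to mainloop.Run() as its shutdown_wait_fn callback; assuming the
# mainloop keeps re-invoking that callback after a termination signal,
# successive calls would return roughly:
#
#   master.WaitForShutdown()   # -> 5.0 while the job queue is still busy
#   master.WaitForShutdown()   # -> remaining linger time once the queue drains
#   master.WaitForShutdown()   # -> None after ~5s of lingering; safe to stop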
def server_cleanup(self):
"""Cleanup the server.
queue = self.server.context.jobqueue
# TODO: Parameter validation
+ if not isinstance(args, (tuple, list)):
+ logging.info("Received invalid arguments of type '%s'", type(args))
+ raise ValueError("Invalid arguments type '%s'" % type(args))
# TODO: Rewrite to not exit in each 'if/elif' branch
return queue.SubmitManyJobs(jobs)
elif method == luxi.REQ_CANCEL_JOB:
- job_id = args
+ (job_id, ) = args
logging.info("Received job cancel request for %s", job_id)
return queue.CancelJob(job_id)
elif method == luxi.REQ_ARCHIVE_JOB:
- job_id = args
+ (job_id, ) = args
logging.info("Received job archive request for %s", job_id)
return queue.ArchiveJob(job_id)
prev_log_serial, timeout)
elif method == luxi.REQ_QUERY:
- req = objects.QueryRequest.FromDict(args)
+ (what, fields, qfilter) = args
+ req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)
if req.what in constants.QR_VIA_OP:
result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
- filter=req.filter))
+ qfilter=req.qfilter))
elif req.what == constants.QR_LOCK:
- if req.filter is not None:
+ if req.qfilter is not None:
raise errors.OpPrereqError("Lock queries can't be filtered")
return self.server.context.glm.QueryLocks(req.fields)
elif req.what in constants.QR_VIA_LUXI:
return result
elif method == luxi.REQ_QUERY_FIELDS:
- req = objects.QueryFieldsRequest.FromDict(args)
+ (what, fields) = args
+ req = objects.QueryFieldsRequest(what=what, fields=fields)
try:
fielddefs = query.ALL_FIELDS[req.what]
return self._Query(op)
elif method == luxi.REQ_QUERY_EXPORTS:
- nodes, use_locking = args
+ (nodes, use_locking) = args
if use_locking:
raise errors.OpPrereqError("Sync queries are not allowed",
errors.ECODE_INVAL)
return self._Query(op)
elif method == luxi.REQ_QUERY_CONFIG_VALUES:
- fields = args
+ (fields, ) = args
logging.info("Received config values query request for %s", fields)
op = opcodes.OpClusterConfigQuery(output_fields=fields)
return self._Query(op)
return self._Query(op)
elif method == luxi.REQ_QUERY_TAGS:
- kind, name = args
+ (kind, name) = args
logging.info("Received tags query request")
op = opcodes.OpTagsGet(kind=kind, name=name)
return self._Query(op)
return self.server.context.glm.OldStyleQueryLocks(fields)
elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
- drain_flag = args
+ (drain_flag, ) = args
logging.info("Received queue drain flag change request to %s",
drain_flag)
return queue.SetDrainFlag(drain_flag)
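# Illustrative note, not part of the patch itself: the "(x, ) = args" pattern
# used above makes single-argument LUXI calls strict about their payload shape:
# a one-element sequence unpacks cleanly, while a bare string now fails loudly
# instead of being silently misinterpreted, e.g.:

def _demo_strict_unpacking():
  """Hypothetical helper showing the unpacking behaviour."""
  (job_id, ) = ["job/1234"]        # OK: job_id == "job/1234"
  try:
    (job_id, ) = "job/1234"        # an 8-character string is not a 1-tuple
  except ValueError as err:
    logging.info("Rejected malformed arguments: %s", err)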
self.cfg.GetNodeGroupList(),
self.cfg.GetInstanceList())
+ self.cfg.SetContext(self)
+
+ # RPC runner
+ self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
+
# Job queue
self.jobqueue = jqueue.JobQueue(self)
# Add the new node to the Ganeti Lock Manager
self.glm.add(locking.LEVEL_NODE, node.name)
+ self.glm.add(locking.LEVEL_NODE_RES, node.name)
def ReaddNode(self, node):
"""Updates a node that's already in the configuration
# Remove the node from the Ganeti Lock Manager
self.glm.remove(locking.LEVEL_NODE, name)
+ self.glm.remove(locking.LEVEL_NODE_RES, name)
def _SetWatcherPause(until):
@rpc.RunWithRPC
def ActivateMasterIP():
# activate ip
- master_node = ssconf.SimpleStore().GetMasterNode()
- result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
+ cfg = config.ConfigWriter()
+ master_params = cfg.GetMasterNetworkParameters()
+ ems = cfg.GetUseExternalMipScript()
+ runner = rpc.BootstrapRunner()
+ result = runner.call_node_activate_master_ip(master_params.name,
+ master_params, ems)
+
msg = result.fail_msg
if msg:
logging.error("Can't activate master IP address: %s", msg)
utils.RemoveFile(constants.MASTER_SOCKET)
mainloop = daemon.Mainloop()
- master = MasterServer(mainloop, constants.MASTER_SOCKET,
- options.uid, options.gid)
+ master = MasterServer(constants.MASTER_SOCKET, options.uid, options.gid)
return (mainloop, master)
try:
master.setup_queue()
try:
- mainloop.Run()
+ mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
finally:
master.server_cleanup()
finally:
finally:
utils.RemoveFile(constants.MASTER_SOCKET)
+ logging.info("Clean master daemon shutdown")
+
def Main():
"""Main function"""